You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by kn...@apache.org on 2022/01/11 21:29:22 UTC

svn commit: r1896929 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java

Author: knoguchi
Date: Tue Jan 11 21:29:22 2022
New Revision: 1896929

URL: http://svn.apache.org/viewvc?rev=1896929&view=rev
Log:
PIG-5413: [spark] TestStreaming.testInputCacheSpecs failing with "File script1.pl was already registered" (knoguchi)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1896929&r1=1896928&r2=1896929&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Jan 11 21:29:22 2022
@@ -104,6 +104,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-5413: [spark] TestStreaming.testInputCacheSpecs failing with "File script1.pl was already registered" (knoguchi)
+
 PIG-5415: [spark] TestScriptLanguage conflict between multiple SparkContext (after spark2.4 upgrade) (knoguchi)
 
 PIG-5412: testSkewedJoinOuter spark unit-test failing with ClassNotFoundException (knoguchi)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1896929&r1=1896928&r2=1896929&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Tue Jan 11 21:29:22 2022
@@ -165,6 +165,9 @@ public class SparkLauncher extends Launc
     private SparkEngineConf sparkEngineConf = new SparkEngineConf();
     private static final String PIG_WARNING_FQCN = PigWarning.class.getCanonicalName();
 
+    // Files already added to the current SparkContext, tracked so each is registered only once; this set is unnecessary once PIG-5241 is fixed
+    private static Set<String> allCachedFiles = null;
+
     @Override
     public PigStats launchPig(PhysicalPlan physicalPlan, String grpName,
                               PigContext pigContext) throws Exception {
@@ -418,8 +421,11 @@ public class SparkLauncher extends Launc
                         fs.copyToLocalFile(src, tmpFilePath);
                         tmpFile.deleteOnExit();
                         LOG.info(String.format("CacheFile:%s", fileName));
-                        addResourceToSparkJobWorkingDirectory(tmpFile, fileName,
-                                ResourceType.FILE);
+                        if(!allCachedFiles.contains(file.trim())) {
+                            allCachedFiles.add(file.trim());
+                            addResourceToSparkJobWorkingDirectory(tmpFile, fileName,
+                                     ResourceType.FILE);
+                        }
                     }
                 }
             }
@@ -641,6 +647,7 @@ public class SparkLauncher extends Launc
             sparkContext = new JavaSparkContext(sparkConf);
             SparkShims.getInstance().addSparkListener(sparkContext.sc(), jobStatisticCollector.getSparkListener());
             SparkShims.getInstance().addSparkListener(sparkContext.sc(), new StatsReportListener());
+            allCachedFiles = new HashSet<String>();
         }
         jobConf.set(SPARK_VERSION, sparkContext.version());
     }