You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by kn...@apache.org on 2022/01/11 21:29:22 UTC
svn commit: r1896929 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
Author: knoguchi
Date: Tue Jan 11 21:29:22 2022
New Revision: 1896929
URL: http://svn.apache.org/viewvc?rev=1896929&view=rev
Log:
PIG-5413: [spark] TestStreaming.testInputCacheSpecs failing with "File script1.pl was already registered" (knoguchi)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1896929&r1=1896928&r2=1896929&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Jan 11 21:29:22 2022
@@ -104,6 +104,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-5413: [spark] TestStreaming.testInputCacheSpecs failing with "File script1.pl was already registered" (knoguchi)
+
PIG-5415: [spark] TestScriptLanguage conflict between multiple SparkContext (after spark2.4 upgrade) (knoguchi)
PIG-5412: testSkewedJoinOuter spark unit-test failing with ClassNotFoundException (knoguchi)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1896929&r1=1896928&r2=1896929&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Tue Jan 11 21:29:22 2022
@@ -165,6 +165,9 @@ public class SparkLauncher extends Launc
private SparkEngineConf sparkEngineConf = new SparkEngineConf();
private static final String PIG_WARNING_FQCN = PigWarning.class.getCanonicalName();
+ // this set is unnecessary once PIG-5241 is fixed
+ private static Set<String> allCachedFiles = null;
+
@Override
public PigStats launchPig(PhysicalPlan physicalPlan, String grpName,
PigContext pigContext) throws Exception {
@@ -418,8 +421,11 @@ public class SparkLauncher extends Launc
fs.copyToLocalFile(src, tmpFilePath);
tmpFile.deleteOnExit();
LOG.info(String.format("CacheFile:%s", fileName));
- addResourceToSparkJobWorkingDirectory(tmpFile, fileName,
- ResourceType.FILE);
+ if(!allCachedFiles.contains(file.trim())) {
+ allCachedFiles.add(file.trim());
+ addResourceToSparkJobWorkingDirectory(tmpFile, fileName,
+ ResourceType.FILE);
+ }
}
}
}
@@ -641,6 +647,7 @@ public class SparkLauncher extends Launc
sparkContext = new JavaSparkContext(sparkConf);
SparkShims.getInstance().addSparkListener(sparkContext.sc(), jobStatisticCollector.getSparkListener());
SparkShims.getInstance().addSparkListener(sparkContext.sc(), new StatsReportListener());
+ allCachedFiles = new HashSet<String>();
}
jobConf.set(SPARK_VERSION, sparkContext.version());
}