You are viewing a plain-text version of this content. The canonical link to it (the word "here" in the original page) was not preserved in this copy.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/10/23 22:48:43 UTC
svn commit: r1633945 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
test/org/apache/pig/test/TestJobControlCompiler.java
Author: rohini
Date: Thu Oct 23 20:48:42 2014
New Revision: 1633945
URL: http://svn.apache.org/r1633945
Log:
PIG-3861: duplicate jars get added to distributed cache (chitnis via rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1633945&r1=1633944&r2=1633945&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Oct 23 20:48:42 2014
@@ -34,6 +34,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-3861: duplicate jars get added to distributed cache (chitnis via rohini)
+
PIG-4039: New interface for resetting static variables for jvm reuse (rohini)
PIG-3870: STRSPLITTOBAG UDF (cryptoe via daijy)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1633945&r1=1633944&r2=1633945&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Thu Oct 23 20:48:42 2014
@@ -1651,10 +1651,48 @@ public class JobControlCompiler{
// Turn on the symlink feature
DistributedCache.createSymlink(conf);
- // REGISTER always copies locally the jar file. see PigServer.registerJar()
- Path pathInHDFS = shipToHDFS(pigContext, conf, url);
- // and add to the DistributedCache
- DistributedCache.addFileToClassPath(pathInHDFS, conf);
+ Path distCachePath = getExistingDistCacheFilePath(conf, url);
+ if (distCachePath != null) {
+ // Path already in dist cache
+ if (!HadoopShims.isHadoopYARN()) {
+ // MapReduce on YARN includes $PWD/* in the classpath, which puts every *.jar file in the working directory on the classpath.
+ // So the jar does not need to be added separately to mapreduce.job.classpath.files.
+ // In Hadoop 1.x, however, the path may be only in 'mapred.cache.files' and not in
+ // 'mapreduce.job.classpath.files', so add it there.
+ DistributedCache.addFileToClassPath(distCachePath, conf, distCachePath.getFileSystem(conf));
+ }
+ }
+ else {
+ // REGISTER always copies locally the jar file. see PigServer.registerJar()
+ Path pathInHDFS = shipToHDFS(pigContext, conf, url);
+ // and add to the DistributedCache
+ DistributedCache.addFileToClassPath(pathInHDFS, conf, FileSystem.get(conf));
+ }
+
+ }
+ /**
+ * Looks for an entry already in the distributed cache whose name matches the
+ * name of the given URL.
+ *
+ * A name is the URI fragment (the symlink name after '#') when one is present,
+ * otherwise the last component of the path. This rule applies both to {@code url}
+ * and to every cache-file URI returned by DistributedCache.getCacheFiles(conf).
+ *
+ * @param conf configuration whose distributed-cache file list is searched
+ * @param url location of the file that is about to be added to the cache
+ * @return a Path built from the first matching cache-file URI, or null when the
+ *         configuration has no cache files or none of them matches
+ * @throws IOException if reading the cache file list fails
+ */
+ private static Path getExistingDistCacheFilePath(Configuration conf, URL url) throws IOException {
+ URI[] cacheFileUris = DistributedCache.getCacheFiles(conf);
+ if (cacheFileUris != null) {
+ String fileName = url.getRef() == null ? FilenameUtils.getName(url.getPath()) : url.getRef();
+ for (URI cacheFileUri : cacheFileUris) {
+ Path path = new Path(cacheFileUri);
+ String cacheFileName = cacheFileUri.getFragment() == null ? path.getName() : cacheFileUri.getFragment();
+ // The names compared are "fragment if present, else file name", so this matches when:
+ // - both file names are the same and neither has a symlink, or
+ // - both symlink names are the same, or
+ // - the symlink of the existing cache file equals the name of the new file.
+ // The last case occurs when hbase-0.98.4.jar#hbase.jar is configured via Oozie
+ // and the Pig script does "register hbase.jar".
+ // If two different files are symlinked to the same name, they conflict anyway:
+ // Hadoop itself does not guarantee which file will be linked to that name,
+ // so returning the first match is acceptable.
+ if (fileName.equals(cacheFileName)) {
+ return path;
+ }
+ }
+ }
+ return null;
}
private static Path getCacheStagingDir(Configuration conf) throws IOException {
Modified: pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java?rev=1633945&r1=1633944&r2=1633945&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java Thu Oct 23 20:48:42 2014
@@ -31,8 +31,10 @@ import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Enumeration;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.jar.JarOutputStream;
@@ -47,6 +49,7 @@ import javax.tools.ToolProvider;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
@@ -55,11 +58,12 @@ import org.apache.pig.ExecType;
import org.apache.pig.FuncSpec;
import org.apache.pig.LoadFunc;
import org.apache.pig.PigServer;
+import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
@@ -72,7 +76,7 @@ public class TestJobControlCompiler {
private static final Configuration CONF = new Configuration();
-
+
@BeforeClass
public static void setupClass() throws Exception {
// creating a hadoop-site.xml and making it visible to Pig
@@ -130,9 +134,9 @@ public class TestJobControlCompiler {
Path distributedCachePath = fileClassPaths[0];
Assert.assertEquals("ends with jar name: "+distributedCachePath, distributedCachePath.getName(), tmpFile.getName());
// a Hadoop bug requires the path not to be prefixed with hdfs://hostname
- Assert.assertTrue("starts with /: "+distributedCachePath,
+ Assert.assertTrue("starts with /: "+distributedCachePath,
distributedCachePath.toString().startsWith("/"));
- Assert.assertTrue("jar pushed to distributed cache should contain testUDF",
+ Assert.assertTrue("jar pushed to distributed cache should contain testUDF",
jarContainsFileNamed(new File(fileClassPaths[0].toUri().getPath()), testUDFFileName));
}
@@ -171,15 +175,9 @@ public class TestJobControlCompiler {
StringUtils.join(zipArchives, ","));
pigContext.getProperties().put("pig.streaming.cache.files",
StringUtils.join(tarArchives, ","));
- final JobControlCompiler jobControlCompiler = new JobControlCompiler(
- pigContext, CONF);
- final MROperPlan plan = new MROperPlan();
- plan.add(new MapReduceOper(new OperatorKey()));
+ final JobConf jobConf = compileTestJob(pigContext, CONF);
- final JobControl jobControl = jobControlCompiler.compile(plan, "test");
- final JobConf jobConf = jobControl.getWaitingJobs().get(0).getJobConf();
-
URI[] uris = DistributedCache.getCacheFiles(jobConf);
int sizeTxt = 0;
for (int i = 0; i < uris.length; i++) {
@@ -193,6 +191,95 @@ public class TestJobControlCompiler {
".tar.gz", ".tar");
}
+ private JobConf compileTestJob(final PigContext pigContext, Configuration conf)
+ throws JobCreationException {
+ final JobControlCompiler jobControlCompiler = new JobControlCompiler(
+ pigContext, conf);
+ // Minimal plan: a single, empty MapReduceOper with a fresh OperatorKey.
+ final MROperPlan plan = new MROperPlan();
+ plan.add(new MapReduceOper(new OperatorKey()));
+ // Compile it and return the JobConf of the first waiting job (assumed to exist).
+ final JobControl jobControl = jobControlCompiler.compile(plan, "test");
+ final JobConf jobConf = jobControl.getWaitingJobs().get(0).getJobConf();
+ return jobConf;
+ }
+
+ /**
+ * Tests that no duplicate jars are added to the distributed cache, since duplicates
+ * might cause conflicts. Jars are placed in the Configuration up front (simulating
+ * Oozie), both symlinked (path#name) and plain, and are then registered again
+ * through the PigContext by name, together with jars that are not yet in the cache.
+ */
+ @Test
+ public void testNoDuplicateJarsInDistributedCache() throws Exception {
+
+ // JobControlCompiler setup
+ final PigServer pigServer = new PigServer(ExecType.MAPREDUCE);
+ PigContext pigContext = pigServer.getPigContext();
+ pigContext.connect();
+ // Jars already in the cache before compilation: two symlinked, one plain.
+ Configuration conf = new Configuration();
+ DistributedCache.addFileToClassPath(new Path(new URI("/lib/udf-0.jar#udf.jar")), conf, FileSystem.get(conf));
+ DistributedCache.addFileToClassPath(new Path(new URI("/lib/udf1.jar#diffname.jar")), conf, FileSystem.get(conf));
+ DistributedCache.addFileToClassPath(new Path(new URI("/lib/udf2.jar")), conf, FileSystem.get(conf));
+ createAndAddResource("udf.jar", pigContext);
+ createAndAddResource("udf1.jar", pigContext);
+ createAndAddResource("udf2.jar", pigContext);
+ createAndAddResource("another.jar", pigContext);
+ // Compile a job so that the registered jars are processed.
+ final JobConf jobConf = compileTestJob(pigContext, conf);
+
+ // Collect what ended up in the distributed cache and on the classpath
+ URI[] cacheURIs = DistributedCache.getCacheFiles(jobConf);
+ Path[] fileClassPaths = DistributedCache.getFileClassPaths(jobConf);
+ // expected - 1. udf-0.jar#udf.jar, 2. udf1.jar#diffname.jar, 3. udf2.jar (registered again, not re-shipped),
+ // 4. another.jar and 5. udf1.jar (both shipped); the registered udf.jar is not duplicated
+ System.out.println("cache.files= " + Arrays.toString(cacheURIs));
+ System.out.println("classpath.files= " + Arrays.toString(fileClassPaths));
+ if (HadoopShims.isHadoopYARN()) {
+ // Default jars - 5 (pig, antlr, joda-time, guava, automaton)
+ // Other jars - 5 (udf-0.jar#udf.jar, udf1.jar#diffname.jar, udf2.jar, udf1.jar, another.jar), 10 in total
+ Assert.assertEquals("size 10 for " + Arrays.toString(cacheURIs), 10,
+ Arrays.asList(StringUtils.join(cacheURIs, ",").split(",")).size());
+ Assert.assertEquals("size 10 for " + Arrays.toString(fileClassPaths), 10,
+ Arrays.asList(StringUtils.join(fileClassPaths, ",").split(",")).size());
+ } else {
+ // On Hadoop 1.x the existing udf.jar and udf2.jar entries are re-added via addFileToClassPath, so they appear twice (12 in total)
+ Assert.assertEquals("size 12 for " + Arrays.toString(cacheURIs), 12,
+ Arrays.asList(StringUtils.join(cacheURIs, ",").split(",")).size());
+ Assert.assertEquals("size 12 for " + Arrays.toString(fileClassPaths), 12,
+ Arrays.asList(StringUtils.join(fileClassPaths, ",").split(",")).size());
+ }
+
+ // Count occurrences of each cache URI
+ Map<String, Integer> occurrences = new HashMap<String, Integer>();
+ // 10 distinct URIs are expected in both cases (5 default + 5 other jars).
+ for (URI cacheURI : cacheURIs) {
+ Integer val = occurrences.get(cacheURI.toString());
+ val = (val == null) ? 1 : ++val;
+ occurrences.put(cacheURI.toString(), val);
+ }
+ Assert.assertEquals(10, occurrences.size());
+ // Check the per-URI counts.
+ for (String file : occurrences.keySet()) {
+ if (!HadoopShims.isHadoopYARN() && (file.endsWith("udf.jar") || file.endsWith("udf2.jar"))) {
+ // The same path added twice is OK; it must be the original path, not one shipped to an HDFS temp dir.
+ // That the path is identical is verified by counting exactly two occurrences of the same URI.
+ Assert.assertEquals("Two occurrences for " + file, 2, (int) occurrences.get(file));
+ } else {
+ // Exactly one occurrence, even when the jar was added once directly to the dist cache (simulating Oozie)
+ // and a second time through Pig's register jar under a matching (symlink) name.
+ Assert.assertEquals("One occurrence for " + file, 1, (int) occurrences.get(file));
+ }
+ }
+ }
+ /**
+ * Creates an empty file with the given name (resolved against the current working
+ * directory when relative), schedules it for deletion on JVM exit and registers it
+ * with the PigContext via addJar(name).
+ * The result of createNewFile() is ignored, so an already existing file is reused.
+ *
+ * @param name file name to create and register as a jar
+ * @param pigContext context the jar is registered with
+ * @return the File handle for {@code name}
+ * @throws IOException e.g. if the file cannot be created
+ */
+ private File createAndAddResource(String name, PigContext pigContext) throws IOException {
+ File f = new File(name);
+ f.createNewFile();
+ f.deleteOnExit();
+ pigContext.addJar(name);
+ return f;
+ }
+
@Test
public void testEstimateNumberOfReducers() throws Exception {
Assert.assertEquals(2, JobControlCompiler.estimateNumberOfReducers(
@@ -229,7 +316,7 @@ public class TestJobControlCompiler {
}
/**
- * checks if the given file name is in the jar
+ * checks if the given file name is in the jar
* @param jarFile the jar to check
* @param name the name to find (full path in the jar)
* @return true if the name was found