You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/10/23 22:48:43 UTC

svn commit: r1633945 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java test/org/apache/pig/test/TestJobControlCompiler.java

Author: rohini
Date: Thu Oct 23 20:48:42 2014
New Revision: 1633945

URL: http://svn.apache.org/r1633945
Log:
PIG-3861: duplicate jars get added to distributed cache (chitnis via rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1633945&r1=1633944&r2=1633945&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Oct 23 20:48:42 2014
@@ -34,6 +34,8 @@ INCOMPATIBLE CHANGES
  
 IMPROVEMENTS
 
+PIG-3861: duplicate jars get added to distributed cache (chitnis via rohini)
+
 PIG-4039: New interface for resetting static variables for jvm reuse (rohini)
 
 PIG-3870: STRSPLITTOBAG UDF (cryptoe via daijy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1633945&r1=1633944&r2=1633945&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Thu Oct 23 20:48:42 2014
@@ -1651,10 +1651,48 @@ public class JobControlCompiler{
         // Turn on the symlink feature
         DistributedCache.createSymlink(conf);
 
-        // REGISTER always copies locally the jar file. see PigServer.registerJar()
-        Path pathInHDFS = shipToHDFS(pigContext, conf, url);
-        // and add to the DistributedCache
-        DistributedCache.addFileToClassPath(pathInHDFS, conf);
+        Path distCachePath = getExistingDistCacheFilePath(conf, url);
+        if (distCachePath != null) {
+            // Path already in dist cache
+            if (!HadoopShims.isHadoopYARN()) {
+                // Mapreduce in YARN includes $PWD/* which will add all *.jar files to the classpath.
+                // So don't have to ensure that the jar is separately added to mapreduce.job.classpath.files
+                // But path may only be in 'mapred.cache.files' and not be in
+                // 'mapreduce.job.classpath.files' in Hadoop 1.x. So adding it there
+                DistributedCache.addFileToClassPath(distCachePath, conf, distCachePath.getFileSystem(conf));
+            }
+        }
+        else {
+            // REGISTER always copies locally the jar file. see PigServer.registerJar()
+            Path pathInHDFS = shipToHDFS(pigContext, conf, url);
+            // and add to the DistributedCache
+            DistributedCache.addFileToClassPath(pathInHDFS, conf, FileSystem.get(conf));
+        }
+
+    }
+
+    private static Path getExistingDistCacheFilePath(Configuration conf, URL url) throws IOException {
+        URI[] cacheFileUris = DistributedCache.getCacheFiles(conf);
+        if (cacheFileUris != null) {
+            String fileName = url.getRef() == null ? FilenameUtils.getName(url.getPath()) : url.getRef();
+            for (URI cacheFileUri : cacheFileUris) {
+                Path path = new Path(cacheFileUri);
+                String cacheFileName = cacheFileUri.getFragment() == null ? path.getName() : cacheFileUri.getFragment();
+                // Match
+                //     - if both filenames are same and no symlinks (or)
+                //     - if both symlinks are same (or)
+                //     - symlink of existing cache file is same as the name of the new file to be added.
+                //         That would be the case when hbase-0.98.4.jar#hbase.jar is configured via Oozie
+                // and register hbase.jar is done in the pig script.
+                // If two different files are symlinked to the same name, then there is a conflict
+                // and hadoop itself does not guarantee which file will be symlinked to that name.
+                // So we are good.
+                if (fileName.equals(cacheFileName)) {
+                    return path;
+                }
+            }
+        }
+        return null;
     }
 
     private static Path getCacheStagingDir(Configuration conf) throws IOException {

Modified: pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java?rev=1633945&r1=1633944&r2=1633945&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java Thu Oct 23 20:48:42 2014
@@ -31,8 +31,10 @@ import java.net.URLClassLoader;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Enumeration;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.jar.JarEntry;
 import java.util.jar.JarFile;
 import java.util.jar.JarOutputStream;
@@ -47,6 +49,7 @@ import javax.tools.ToolProvider;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
@@ -55,11 +58,12 @@ import org.apache.pig.ExecType;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.PigServer;
+import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
@@ -72,7 +76,7 @@ public class TestJobControlCompiler {
 
     private static final Configuration CONF = new Configuration();
 
-    
+
     @BeforeClass
     public static void setupClass() throws Exception {
         // creating a hadoop-site.xml and making it visible to Pig
@@ -130,9 +134,9 @@ public class TestJobControlCompiler {
     Path distributedCachePath = fileClassPaths[0];
     Assert.assertEquals("ends with jar name: "+distributedCachePath, distributedCachePath.getName(), tmpFile.getName());
     // hadoop bug requires path to not contain hdfs://hotname in front
-    Assert.assertTrue("starts with /: "+distributedCachePath, 
+    Assert.assertTrue("starts with /: "+distributedCachePath,
         distributedCachePath.toString().startsWith("/"));
-    Assert.assertTrue("jar pushed to distributed cache should contain testUDF", 
+    Assert.assertTrue("jar pushed to distributed cache should contain testUDF",
         jarContainsFileNamed(new File(fileClassPaths[0].toUri().getPath()), testUDFFileName));
   }
 
@@ -171,15 +175,9 @@ public class TestJobControlCompiler {
                 StringUtils.join(zipArchives, ","));
         pigContext.getProperties().put("pig.streaming.cache.files",
                 StringUtils.join(tarArchives, ","));
-        final JobControlCompiler jobControlCompiler = new JobControlCompiler(
-                pigContext, CONF);
 
-        final MROperPlan plan = new MROperPlan();
-        plan.add(new MapReduceOper(new OperatorKey()));
+        final JobConf jobConf = compileTestJob(pigContext, CONF);
 
-        final JobControl jobControl = jobControlCompiler.compile(plan, "test");
-        final JobConf jobConf = jobControl.getWaitingJobs().get(0).getJobConf();
-        
         URI[] uris = DistributedCache.getCacheFiles(jobConf);
         int sizeTxt = 0;
         for (int i = 0; i < uris.length; i++) {
@@ -193,6 +191,95 @@ public class TestJobControlCompiler {
                 ".tar.gz", ".tar");
     }
 
+    private JobConf compileTestJob(final PigContext pigContext, Configuration conf)
+            throws JobCreationException {
+        final JobControlCompiler jobControlCompiler = new JobControlCompiler(
+                pigContext, conf);
+
+        final MROperPlan plan = new MROperPlan();
+        plan.add(new MapReduceOper(new OperatorKey()));
+
+        final JobControl jobControl = jobControlCompiler.compile(plan, "test");
+        final JobConf jobConf = jobControl.getWaitingJobs().get(0).getJobConf();
+        return jobConf;
+    }
+
+    /**
+     * Tests that no duplicate jars are added to distributed cache, which might cause conflicts
+     * and tests with both symlinked and normal jar specification
+     */
+      @Test
+      public void testNoDuplicateJarsInDistributedCache() throws Exception {
+
+          // JobControlCompiler setup
+          final PigServer pigServer = new PigServer(ExecType.MAPREDUCE);
+          PigContext pigContext = pigServer.getPigContext();
+          pigContext.connect();
+
+          Configuration conf = new Configuration();
+          DistributedCache.addFileToClassPath(new Path(new URI("/lib/udf-0.jar#udf.jar")), conf, FileSystem.get(conf));
+          DistributedCache.addFileToClassPath(new Path(new URI("/lib/udf1.jar#diffname.jar")), conf, FileSystem.get(conf));
+          DistributedCache.addFileToClassPath(new Path(new URI("/lib/udf2.jar")), conf, FileSystem.get(conf));
+          createAndAddResource("udf.jar", pigContext);
+          createAndAddResource("udf1.jar", pigContext);
+          createAndAddResource("udf2.jar", pigContext);
+          createAndAddResource("another.jar", pigContext);
+
+          final JobConf jobConf = compileTestJob(pigContext, conf);
+
+          // verifying the jar gets on distributed cache
+          URI[] cacheURIs = DistributedCache.getCacheFiles(jobConf);
+          Path[] fileClassPaths = DistributedCache.getFileClassPaths(jobConf);
+          // expected - 1. udf-0.jar#udf.jar, 2. udf1.jar#diffname.jar, 3. udf2.jar (same added twice),
+          // 4. another.jar and 5. udf1.jar, and no duplicate udf.jar
+          System.out.println("cache.files= " + Arrays.toString(cacheURIs));
+          System.out.println("classpath.files= " + Arrays.toString(fileClassPaths));
+          if (HadoopShims.isHadoopYARN()) {
+              // Default jars - 5 (pig, antlr, joda-time, guava, automaton)
+              // Other jars - 5 (udf-0.jar#udf.jar, udf1.jar#diffname.jar, udf2.jar, udf1.jar, another.jar)
+              Assert.assertEquals("size 10 for " + Arrays.toString(cacheURIs), 10,
+                      Arrays.asList(StringUtils.join(cacheURIs, ",").split(",")).size());
+              Assert.assertEquals("size 10 for " + Arrays.toString(fileClassPaths), 10,
+                      Arrays.asList(StringUtils.join(fileClassPaths, ",").split(",")).size());
+          } else {
+              // There will be same entries duplicated for udf.jar and udf2.jar
+              Assert.assertEquals("size 12 for " + Arrays.toString(cacheURIs), 12,
+                      Arrays.asList(StringUtils.join(cacheURIs, ",").split(",")).size());
+              Assert.assertEquals("size 12 for " + Arrays.toString(fileClassPaths), 12,
+                      Arrays.asList(StringUtils.join(fileClassPaths, ",").split(",")).size());
+          }
+
+          // Count occurrences of the resources
+          Map<String, Integer> occurrences = new HashMap<String, Integer>();
+
+          for (URI cacheURI : cacheURIs) {
+              Integer val = occurrences.get(cacheURI.toString());
+              val = (val == null) ? 1 : ++val;
+              occurrences.put(cacheURI.toString(), val);
+          }
+          Assert.assertEquals(10, occurrences.size());
+
+          for (String file : occurrences.keySet()) {
+              if (!HadoopShims.isHadoopYARN() && (file.endsWith("udf.jar") || file.endsWith("udf2.jar"))) {
+                  // Same path added twice, which is ok. It should not be a temp path shipped to hdfs.
+                  // We assert path is same by checking count
+                  Assert.assertEquals("Two occurrences for " + file, 2, (int) occurrences.get(file));
+              } else {
+                  // check that there is only a single occurrence even though we added it once to the dist cache
+                  // (simulating via Oozie) and a second time through pig register jar when there is a symlink
+                  Assert.assertEquals("One occurrence for " + file, 1, (int) occurrences.get(file));
+              }
+          }
+      }
+
+      private File createAndAddResource(String name, PigContext pigContext) throws IOException {
+          File f = new File(name);
+          f.createNewFile();
+          f.deleteOnExit();
+          pigContext.addJar(name);
+          return f;
+      }
+
     @Test
     public void testEstimateNumberOfReducers() throws Exception {
         Assert.assertEquals(2, JobControlCompiler.estimateNumberOfReducers(
@@ -229,7 +316,7 @@ public class TestJobControlCompiler {
     }
 
   /**
-   * checks if the given file name is in the jar 
+   * checks if the given file name is in the jar
    * @param jarFile the jar to check
    * @param name the name to find (full path in the jar)
    * @return true if the name was found