You are viewing a plain-text version of this content. The canonical link to the original (a hyperlink labeled "here") was not preserved in this copy.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/10/24 17:09:12 UTC

svn commit: r1634073 - in /pig/branches/branch-0.14: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java src/org/apache/pig/builtin/FuncUtils.java

Author: rohini
Date: Fri Oct 24 15:09:11 2014
New Revision: 1634073

URL: http://svn.apache.org/r1634073
Log:
PIG-4246: HBaseStorage should implement getShipFiles (rohini)

Modified:
    pig/branches/branch-0.14/CHANGES.txt
    pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
    pig/branches/branch-0.14/src/org/apache/pig/builtin/FuncUtils.java

Modified: pig/branches/branch-0.14/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/CHANGES.txt?rev=1634073&r1=1634072&r2=1634073&view=diff
==============================================================================
--- pig/branches/branch-0.14/CHANGES.txt (original)
+++ pig/branches/branch-0.14/CHANGES.txt Fri Oct 24 15:09:11 2014
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
  
 IMPROVEMENTS
 
+PIG-4246: HBaseStorage should implement getShipFiles (rohini)
+
 PIG-3456: Reduce threadlocal conf access in backend for each record (rohini)
 
 PIG-3861: duplicate jars get added to distributed cache (chitnis via rohini)

Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1634073&r1=1634072&r2=1634073&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Fri Oct 24 15:09:11 2014
@@ -639,7 +639,6 @@ public class JobControlCompiler{
                             }
                         }
                         if (!predeployed) {
-                            log.info("Adding jar to DistributedCache: " + jar);
                             putJarOnClassPathThroughDistributedCache(pigContext, conf, jar);
                         }
                     }
@@ -1653,6 +1652,8 @@ public class JobControlCompiler{
 
         Path distCachePath = getExistingDistCacheFilePath(conf, url);
         if (distCachePath != null) {
+            log.info("Jar file " + url + " already in DistributedCache as "
+                    + distCachePath + ". Not copying to hdfs and adding again");
             // Path already in dist cache
             if (!HadoopShims.isHadoopYARN()) {
                 // Mapreduce in YARN includes $PWD/* which will add all *.jar files in classapth.
@@ -1665,8 +1666,8 @@ public class JobControlCompiler{
         else {
             // REGISTER always copies locally the jar file. see PigServer.registerJar()
             Path pathInHDFS = shipToHDFS(pigContext, conf, url);
-            // and add to the DistributedCache
             DistributedCache.addFileToClassPath(pathInHDFS, conf, FileSystem.get(conf));
+            log.info("Added jar " + url + " to DistributedCache through " + pathInHDFS);
         }
 
     }

Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1634073&r1=1634072&r2=1634073&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Fri Oct 24 15:09:11 2014
@@ -24,6 +24,8 @@ import java.lang.reflect.Method;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.net.MalformedURLException;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -82,8 +84,10 @@ import org.apache.pig.OrderedLoadFunc;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.StoreResources;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.backend.hadoop.hbase.HBaseTableInputFormat.HBaseTableIFBuilder;
+import org.apache.pig.builtin.FuncUtils;
 import org.apache.pig.builtin.Utf8StorageConverter;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
@@ -135,7 +139,7 @@ import com.google.common.collect.Lists;
  * <code>buddies</code> column family in the <code>SampleTableCopy</code> table.
  *
  */
-public class HBaseStorage extends LoadFunc implements StoreFuncInterface, LoadPushDown, OrderedLoadFunc {
+public class HBaseStorage extends LoadFunc implements StoreFuncInterface, LoadPushDown, OrderedLoadFunc, StoreResources {
 
     private static final Log LOG = LogFactory.getLog(HBaseStorage.class);
 
@@ -317,7 +321,7 @@ public class HBaseStorage extends LoadFu
 			if ("true".equalsIgnoreCase(value) || "".equalsIgnoreCase(value) || value == null) {//the empty string and null check is for backward compat.
 				noWAL_ = true;
 			}
-		}        
+		}
 
         if (configuredOptions_.hasOption("minTimestamp")){
             minTimestamp_ = Long.parseLong(configuredOptions_.getOptionValue("minTimestamp"));
@@ -719,7 +723,6 @@ public class HBaseStorage extends LoadFu
         Properties udfProps = getUDFProperties();
         job.getConfiguration().setBoolean("pig.noSplitCombination", true);
 
-        initializeHBaseClassLoaderResources(job);
         m_conf = initializeLocalJobConfig(job);
         String delegationTokenSet = udfProps.getProperty(HBASE_TOKEN_SET);
         if (delegationTokenSet == null) {
@@ -748,14 +751,23 @@ public class HBaseStorage extends LoadFu
         }
     }
 
-    private void initializeHBaseClassLoaderResources(Job job) throws IOException {
+    @Override
+    public List<String> getShipFiles() {
         // Depend on HBase to do the right thing when available, as of HBASE-9165
         try {
             Method addHBaseDependencyJars =
               TableMapReduceUtil.class.getMethod("addHBaseDependencyJars", Configuration.class);
             if (addHBaseDependencyJars != null) {
-                addHBaseDependencyJars.invoke(null, job.getConfiguration());
-                return;
+                Configuration conf = new Configuration();
+                addHBaseDependencyJars.invoke(null, conf);
+                if (conf.get("tmpjars") != null) {
+                    String[] tmpjars = conf.getStrings("tmpjars");
+                    List<String> shipFiles = new ArrayList<String>(tmpjars.length);
+                    for (String tmpjar : tmpjars) {
+                        shipFiles.add(new URL(tmpjar).getPath());
+                    }
+                    return shipFiles;
+                }
             }
         } catch (NoSuchMethodException e) {
             LOG.debug("TableMapReduceUtils#addHBaseDependencyJars not available."
@@ -766,32 +778,32 @@ public class HBaseStorage extends LoadFu
         } catch (InvocationTargetException e) {
             LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation"
               + " failed. Falling back to previous logic.", e);
+        } catch (MalformedURLException e) {
+            LOG.debug("TableMapReduceUtils#addHBaseDependencyJars tmpjars"
+                    + " had malformed url. Falling back to previous logic.", e);
         }
-        // fall back to manual class handling.
-        // Make sure the HBase, ZooKeeper, and Guava jars get shipped.
-        TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
-            org.apache.hadoop.hbase.client.HTable.class, // main hbase jar or hbase-client
-            org.apache.hadoop.hbase.mapreduce.TableSplit.class, // main hbase jar or hbase-server
-            com.google.common.collect.Lists.class, // guava
-            org.apache.zookeeper.ZooKeeper.class); // zookeeper
+
+        List<Class> classList = new ArrayList<Class>();
+        classList.add(org.apache.hadoop.hbase.client.HTable.class); // main hbase jar or hbase-client
+        classList.add(org.apache.hadoop.hbase.mapreduce.TableSplit.class); // main hbase jar or hbase-server
+        classList.add(com.google.common.collect.Lists.class); // guava
+        classList.add(org.apache.zookeeper.ZooKeeper.class); // zookeeper
         // Additional jars that are specific to v0.95.0+
-        addClassToJobIfExists(job, "org.cloudera.htrace.Trace"); // htrace
-        addClassToJobIfExists(job, "org.apache.hadoop.hbase.protobuf.generated.HBaseProtos"); // hbase-protocol
-        addClassToJobIfExists(job, "org.apache.hadoop.hbase.TableName"); // hbase-common
-        addClassToJobIfExists(job, "org.apache.hadoop.hbase.CompatibilityFactory"); // hbase-hadoop-compar
-        addClassToJobIfExists(job, "org.jboss.netty.channel.ChannelFactory"); // netty
-    }
-
-    private void addClassToJobIfExists(Job job, String className) throws IOException {
-      Class klass = null;
-      try {
-          klass = Class.forName(className);
-      } catch (ClassNotFoundException e) {
-          LOG.debug("Skipping adding jar for class: " + className);
-          return;
-      }
+        addClassToList("org.cloudera.htrace.Trace", classList); // htrace
+        addClassToList("org.apache.hadoop.hbase.protobuf.generated.HBaseProtos", classList); // hbase-protocol
+        addClassToList("org.apache.hadoop.hbase.TableName", classList); // hbase-common
+        addClassToList("org.apache.hadoop.hbase.CompatibilityFactory", classList); // hbase-hadoop-compar
+        addClassToList("org.jboss.netty.channel.ChannelFactory", classList); // netty
+        return FuncUtils.getShipFiles(classList);
+    }
 
-      TableMapReduceUtil.addDependencyJars(job.getConfiguration(), klass);
+    private void addClassToList(String className, List<Class> classList) {
+        try {
+            Class klass = Class.forName(className);
+            classList.add(klass);
+        } catch (ClassNotFoundException e) {
+            LOG.debug("Skipping adding jar for class: " + className);
+        }
     }
 
     private JobConf initializeLocalJobConfig(Job job) {
@@ -1035,7 +1047,6 @@ public class HBaseStorage extends LoadFu
             schema_ = (ResourceSchema) ObjectSerializer.deserialize(serializedSchema);
         }
 
-        initializeHBaseClassLoaderResources(job);
         m_conf = initializeLocalJobConfig(job);
         // Not setting a udf property and getting the hbase delegation token
         // only once like in setLocation as setStoreLocation gets different Job

Modified: pig/branches/branch-0.14/src/org/apache/pig/builtin/FuncUtils.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/builtin/FuncUtils.java?rev=1634073&r1=1634072&r2=1634073&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/builtin/FuncUtils.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/builtin/FuncUtils.java Fri Oct 24 15:09:11 2014
@@ -18,6 +18,7 @@
 package org.apache.pig.builtin;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.pig.impl.util.JarManager;
@@ -29,6 +30,10 @@ public class FuncUtils {
      * @return list of containing jars
      */
     public static List<String> getShipFiles(Class[] classesIdentifyingJars) {
+        return getShipFiles(Arrays.asList(classesIdentifyingJars));
+    }
+
+    public static List<String> getShipFiles(List<Class> classesIdentifyingJars) {
         List<String> cacheFiles = new ArrayList<String>();
 
         for (Class clz : classesIdentifyingJars) {