You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2013/11/25 00:44:43 UTC

svn commit: r1545114 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java

Author: cheolsoo
Date: Sun Nov 24 23:44:43 2013
New Revision: 1545114

URL: http://svn.apache.org/r1545114
Log:
PIG-3285: Jobs using HBaseStorage fail to ship dependency jars (ndimiduk via cheolsoo)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1545114&r1=1545113&r2=1545114&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sun Nov 24 23:44:43 2013
@@ -54,6 +54,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-3285: Jobs using HBaseStorage fail to ship dependency jars (ndimiduk via cheolsoo)
+
 PIG-3582: Document SUM, MIN, MAX, and AVG functions for BigInteger and BigDecimal (harichinnan via cheolsoo)
 
 PIG-3576: NPE due to PIG-3549 when job never gets submitted (lbendig via cheolsoo)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1545114&r1=1545113&r2=1545114&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Sun Nov 24 23:44:43 2013
@@ -21,6 +21,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.util.ArrayList;
@@ -700,7 +701,7 @@ public class HBaseStorage extends LoadFu
         Properties udfProps = getUDFProperties();
         job.getConfiguration().setBoolean("pig.noSplitCombination", true);
 
-        initialiseHBaseClassLoaderResources(job);
+        initializeHBaseClassLoaderResources(job);
         m_conf = initializeLocalJobConfig(job);
         String delegationTokenSet = udfProps.getProperty(HBASE_TOKEN_SET);
         if (delegationTokenSet == null) {
@@ -729,19 +730,38 @@ public class HBaseStorage extends LoadFu
         }
     }
 
-    private void initialiseHBaseClassLoaderResources(Job job) throws IOException {
+    private void initializeHBaseClassLoaderResources(Job job) throws IOException {
+        // Depend on HBase to do the right thing when available, as of HBASE-9165
+        try {
+            Method addHBaseDependencyJars =
+              TableMapReduceUtil.class.getMethod("addHBaseDependencyJars", Configuration.class);
+            if (addHBaseDependencyJars != null) {
+                addHBaseDependencyJars.invoke(null, job.getConfiguration());
+                return;
+            }
+        } catch (NoSuchMethodException e) {
+            LOG.debug("TableMapReduceUtils#addHBaseDependencyJars not available."
+              + " Falling back to previous logic.", e);
+        } catch (IllegalAccessException e) {
+            LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation"
+              + " not permitted. Falling back to previous logic.", e);
+        } catch (InvocationTargetException e) {
+            LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation"
+              + " failed. Falling back to previous logic.", e);
+        }
+        // fall back to manual class handling.
         // Make sure the HBase, ZooKeeper, and Guava jars get shipped.
         TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
             org.apache.hadoop.hbase.client.HTable.class, // main hbase jar or hbase-client
             org.apache.hadoop.hbase.mapreduce.TableSplit.class, // main hbase jar or hbase-server
-            com.google.common.collect.Lists.class,
-            org.apache.zookeeper.ZooKeeper.class);
-
-          // Additional jars that are specific to only some HBase versions
-          // HBase 0.95+
-          addClassToJobIfExists(job, "org.cloudera.htrace.Trace");
-          addClassToJobIfExists(job, "org.apache.hadoop.hbase.protobuf.generated.HBaseProtos"); // hbase-protocol
-          addClassToJobIfExists(job, "org.apache.hadoop.hbase.TableName"); // hbase-common
+            com.google.common.collect.Lists.class, // guava
+            org.apache.zookeeper.ZooKeeper.class); // zookeeper
+        // Additional jars that are specific to v0.95.0+
+        addClassToJobIfExists(job, "org.cloudera.htrace.Trace"); // htrace
+        addClassToJobIfExists(job, "org.apache.hadoop.hbase.protobuf.generated.HBaseProtos"); // hbase-protocol
+        addClassToJobIfExists(job, "org.apache.hadoop.hbase.TableName"); // hbase-common
+        addClassToJobIfExists(job, "org.apache.hadoop.hbase.CompatibilityFactory"); // hbase-hadoop-compat
+        addClassToJobIfExists(job, "org.jboss.netty.channel.ChannelFactory"); // netty
     }
 
     private void addClassToJobIfExists(Job job, String className) throws IOException {
@@ -894,7 +914,7 @@ public class HBaseStorage extends LoadFu
             if (LOG.isDebugEnabled()) {
                 LOG.debug("putNext - tuple: " + i + ", value=" + t.get(i) +
                         ", cf:column=" + columnInfo);
-        }
+            }
 
             if (!columnInfo.isColumnMap()) {
                 put.add(columnInfo.getColumnFamily(), columnInfo.getColumnName(),
@@ -993,7 +1013,7 @@ public class HBaseStorage extends LoadFu
             schema_ = (ResourceSchema) ObjectSerializer.deserialize(serializedSchema);
         }
 
-        initialiseHBaseClassLoaderResources(job);
+        initializeHBaseClassLoaderResources(job);
         m_conf = initializeLocalJobConfig(job);
         // Not setting a udf property and getting the hbase delegation token
         // only once like in setLocation as setStoreLocation gets different Job
@@ -1060,7 +1080,7 @@ public class HBaseStorage extends LoadFu
                 ( requiredFields.size() < 1 || requiredFields.get(0).getIndex() != 0)) {
                 loadRowKey_ = false;
             projOffset = 0;
-            }
+        }
 
         for (int i = projOffset; i < requiredFields.size(); i++) {
             int fieldIndex = requiredFields.get(i).getIndex();
@@ -1071,7 +1091,7 @@ public class HBaseStorage extends LoadFu
             LOG.debug("pushProjection After Projection: loadRowKey is " + loadRowKey_) ;
             for (ColumnInfo colInfo : newColumns) {
                 LOG.debug("pushProjection -- col: " + colInfo);
-        }
+            }
         }
         setColumnInfoList(newColumns);
         return new RequiredFieldResponse(true);
@@ -1086,7 +1106,7 @@ public class HBaseStorage extends LoadFu
             @Override
             public void readFields(DataInput in) throws IOException {
                 tsplit.readFields(in);
-}
+            }
 
             @Override
             public void write(DataOutput out) throws IOException {