You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2013/11/25 00:44:43 UTC
svn commit: r1545114 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
Author: cheolsoo
Date: Sun Nov 24 23:44:43 2013
New Revision: 1545114
URL: http://svn.apache.org/r1545114
Log:
PIG-3285: Jobs using HBaseStorage fail to ship dependency jars (ndimiduk via cheolsoo)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1545114&r1=1545113&r2=1545114&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sun Nov 24 23:44:43 2013
@@ -54,6 +54,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-3285: Jobs using HBaseStorage fail to ship dependency jars (ndimiduk via cheolsoo)
+
PIG-3582: Document SUM, MIN, MAX, and AVG functions for BigInteger and BigDecimal (harichinnan via cheolsoo)
PIG-3576: NPE due to PIG-3549 when job never gets submitted (lbendig via cheolsoo)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1545114&r1=1545113&r2=1545114&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Sun Nov 24 23:44:43 2013
@@ -21,6 +21,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
+import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.ArrayList;
@@ -700,7 +701,7 @@ public class HBaseStorage extends LoadFu
Properties udfProps = getUDFProperties();
job.getConfiguration().setBoolean("pig.noSplitCombination", true);
- initialiseHBaseClassLoaderResources(job);
+ initializeHBaseClassLoaderResources(job);
m_conf = initializeLocalJobConfig(job);
String delegationTokenSet = udfProps.getProperty(HBASE_TOKEN_SET);
if (delegationTokenSet == null) {
@@ -729,19 +730,38 @@ public class HBaseStorage extends LoadFu
}
}
- private void initialiseHBaseClassLoaderResources(Job job) throws IOException {
+ private void initializeHBaseClassLoaderResources(Job job) throws IOException {
+ // Depend on HBase to do the right thing when available, as of HBASE-9165
+ try {
+ Method addHBaseDependencyJars =
+ TableMapReduceUtil.class.getMethod("addHBaseDependencyJars", Configuration.class);
+ if (addHBaseDependencyJars != null) {
+ addHBaseDependencyJars.invoke(null, job.getConfiguration());
+ return;
+ }
+ } catch (NoSuchMethodException e) {
+ LOG.debug("TableMapReduceUtils#addHBaseDependencyJars not available."
+ + " Falling back to previous logic.", e);
+ } catch (IllegalAccessException e) {
+ LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation"
+ + " not permitted. Falling back to previous logic.", e);
+ } catch (InvocationTargetException e) {
+ LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation"
+ + " failed. Falling back to previous logic.", e);
+ }
+ // fall back to manual class handling.
// Make sure the HBase, ZooKeeper, and Guava jars get shipped.
TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
org.apache.hadoop.hbase.client.HTable.class, // main hbase jar or hbase-client
org.apache.hadoop.hbase.mapreduce.TableSplit.class, // main hbase jar or hbase-server
- com.google.common.collect.Lists.class,
- org.apache.zookeeper.ZooKeeper.class);
-
- // Additional jars that are specific to only some HBase versions
- // HBase 0.95+
- addClassToJobIfExists(job, "org.cloudera.htrace.Trace");
- addClassToJobIfExists(job, "org.apache.hadoop.hbase.protobuf.generated.HBaseProtos"); // hbase-protocol
- addClassToJobIfExists(job, "org.apache.hadoop.hbase.TableName"); // hbase-common
+ com.google.common.collect.Lists.class, // guava
+ org.apache.zookeeper.ZooKeeper.class); // zookeeper
+ // Additional jars that are specific to v0.95.0+
+ addClassToJobIfExists(job, "org.cloudera.htrace.Trace"); // htrace
+ addClassToJobIfExists(job, "org.apache.hadoop.hbase.protobuf.generated.HBaseProtos"); // hbase-protocol
+ addClassToJobIfExists(job, "org.apache.hadoop.hbase.TableName"); // hbase-common
+ addClassToJobIfExists(job, "org.apache.hadoop.hbase.CompatibilityFactory"); // hbase-hadoop-compat
+ addClassToJobIfExists(job, "org.jboss.netty.channel.ChannelFactory"); // netty
}
private void addClassToJobIfExists(Job job, String className) throws IOException {
@@ -894,7 +914,7 @@ public class HBaseStorage extends LoadFu
if (LOG.isDebugEnabled()) {
LOG.debug("putNext - tuple: " + i + ", value=" + t.get(i) +
", cf:column=" + columnInfo);
- }
+ }
if (!columnInfo.isColumnMap()) {
put.add(columnInfo.getColumnFamily(), columnInfo.getColumnName(),
@@ -993,7 +1013,7 @@ public class HBaseStorage extends LoadFu
schema_ = (ResourceSchema) ObjectSerializer.deserialize(serializedSchema);
}
- initialiseHBaseClassLoaderResources(job);
+ initializeHBaseClassLoaderResources(job);
m_conf = initializeLocalJobConfig(job);
// Not setting a udf property and getting the hbase delegation token
// only once like in setLocation as setStoreLocation gets different Job
@@ -1060,7 +1080,7 @@ public class HBaseStorage extends LoadFu
( requiredFields.size() < 1 || requiredFields.get(0).getIndex() != 0)) {
loadRowKey_ = false;
projOffset = 0;
- }
+ }
for (int i = projOffset; i < requiredFields.size(); i++) {
int fieldIndex = requiredFields.get(i).getIndex();
@@ -1071,7 +1091,7 @@ public class HBaseStorage extends LoadFu
LOG.debug("pushProjection After Projection: loadRowKey is " + loadRowKey_) ;
for (ColumnInfo colInfo : newColumns) {
LOG.debug("pushProjection -- col: " + colInfo);
- }
+ }
}
setColumnInfoList(newColumns);
return new RequiredFieldResponse(true);
@@ -1086,7 +1106,7 @@ public class HBaseStorage extends LoadFu
@Override
public void readFields(DataInput in) throws IOException {
tsplit.readFields(in);
-}
+ }
@Override
public void write(DataOutput out) throws IOException {