You are viewing a plain-text version of this content. (The hyperlink to the canonical version was lost in the plain-text conversion.)
Posted to commits@pig.apache.org by ro...@apache.org on 2014/10/24 17:09:12 UTC
svn commit: r1634073 - in /pig/branches/branch-0.14: CHANGES.txt
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
src/org/apache/pig/builtin/FuncUtils.java
Author: rohini
Date: Fri Oct 24 15:09:11 2014
New Revision: 1634073
URL: http://svn.apache.org/r1634073
Log:
PIG-4246: HBaseStorage should implement getShipFiles (rohini)
Modified:
pig/branches/branch-0.14/CHANGES.txt
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
pig/branches/branch-0.14/src/org/apache/pig/builtin/FuncUtils.java
Modified: pig/branches/branch-0.14/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/CHANGES.txt?rev=1634073&r1=1634072&r2=1634073&view=diff
==============================================================================
--- pig/branches/branch-0.14/CHANGES.txt (original)
+++ pig/branches/branch-0.14/CHANGES.txt Fri Oct 24 15:09:11 2014
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-4246: HBaseStorage should implement getShipFiles (rohini)
+
PIG-3456: Reduce threadlocal conf access in backend for each record (rohini)
PIG-3861: duplicate jars get added to distributed cache (chitnis via rohini)
Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1634073&r1=1634072&r2=1634073&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Fri Oct 24 15:09:11 2014
@@ -639,7 +639,6 @@ public class JobControlCompiler{
}
}
if (!predeployed) {
- log.info("Adding jar to DistributedCache: " + jar);
putJarOnClassPathThroughDistributedCache(pigContext, conf, jar);
}
}
@@ -1653,6 +1652,8 @@ public class JobControlCompiler{
Path distCachePath = getExistingDistCacheFilePath(conf, url);
if (distCachePath != null) {
+ log.info("Jar file " + url + " already in DistributedCache as "
+ + distCachePath + ". Not copying to hdfs and adding again");
// Path already in dist cache
if (!HadoopShims.isHadoopYARN()) {
// Mapreduce in YARN includes $PWD/* which will add all *.jar files in classapth.
@@ -1665,8 +1666,8 @@ public class JobControlCompiler{
else {
// REGISTER always copies locally the jar file. see PigServer.registerJar()
Path pathInHDFS = shipToHDFS(pigContext, conf, url);
- // and add to the DistributedCache
DistributedCache.addFileToClassPath(pathInHDFS, conf, FileSystem.get(conf));
+ log.info("Added jar " + url + " to DistributedCache through " + pathInHDFS);
}
}
Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1634073&r1=1634072&r2=1634073&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Fri Oct 24 15:09:11 2014
@@ -24,6 +24,8 @@ import java.lang.reflect.Method;
import java.lang.reflect.UndeclaredThrowableException;
import java.math.BigDecimal;
import java.math.BigInteger;
+import java.net.MalformedURLException;
+import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -82,8 +84,10 @@ import org.apache.pig.OrderedLoadFunc;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.StoreResources;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.hbase.HBaseTableInputFormat.HBaseTableIFBuilder;
+import org.apache.pig.builtin.FuncUtils;
import org.apache.pig.builtin.Utf8StorageConverter;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
@@ -135,7 +139,7 @@ import com.google.common.collect.Lists;
* <code>buddies</code> column family in the <code>SampleTableCopy</code> table.
*
*/
-public class HBaseStorage extends LoadFunc implements StoreFuncInterface, LoadPushDown, OrderedLoadFunc {
+public class HBaseStorage extends LoadFunc implements StoreFuncInterface, LoadPushDown, OrderedLoadFunc, StoreResources {
private static final Log LOG = LogFactory.getLog(HBaseStorage.class);
@@ -317,7 +321,7 @@ public class HBaseStorage extends LoadFu
if ("true".equalsIgnoreCase(value) || "".equalsIgnoreCase(value) || value == null) {//the empty string and null check is for backward compat.
noWAL_ = true;
}
- }
+ }
if (configuredOptions_.hasOption("minTimestamp")){
minTimestamp_ = Long.parseLong(configuredOptions_.getOptionValue("minTimestamp"));
@@ -719,7 +723,6 @@ public class HBaseStorage extends LoadFu
Properties udfProps = getUDFProperties();
job.getConfiguration().setBoolean("pig.noSplitCombination", true);
- initializeHBaseClassLoaderResources(job);
m_conf = initializeLocalJobConfig(job);
String delegationTokenSet = udfProps.getProperty(HBASE_TOKEN_SET);
if (delegationTokenSet == null) {
@@ -748,14 +751,23 @@ public class HBaseStorage extends LoadFu
}
}
- private void initializeHBaseClassLoaderResources(Job job) throws IOException {
+ @Override
+ public List<String> getShipFiles() {
// Depend on HBase to do the right thing when available, as of HBASE-9165
try {
Method addHBaseDependencyJars =
TableMapReduceUtil.class.getMethod("addHBaseDependencyJars", Configuration.class);
if (addHBaseDependencyJars != null) {
- addHBaseDependencyJars.invoke(null, job.getConfiguration());
- return;
+ Configuration conf = new Configuration();
+ addHBaseDependencyJars.invoke(null, conf);
+ if (conf.get("tmpjars") != null) {
+ String[] tmpjars = conf.getStrings("tmpjars");
+ List<String> shipFiles = new ArrayList<String>(tmpjars.length);
+ for (String tmpjar : tmpjars) {
+ shipFiles.add(new URL(tmpjar).getPath());
+ }
+ return shipFiles;
+ }
}
} catch (NoSuchMethodException e) {
LOG.debug("TableMapReduceUtils#addHBaseDependencyJars not available."
@@ -766,32 +778,32 @@ public class HBaseStorage extends LoadFu
} catch (InvocationTargetException e) {
LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation"
+ " failed. Falling back to previous logic.", e);
+ } catch (MalformedURLException e) {
+ LOG.debug("TableMapReduceUtils#addHBaseDependencyJars tmpjars"
+ + " had malformed url. Falling back to previous logic.", e);
}
- // fall back to manual class handling.
- // Make sure the HBase, ZooKeeper, and Guava jars get shipped.
- TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
- org.apache.hadoop.hbase.client.HTable.class, // main hbase jar or hbase-client
- org.apache.hadoop.hbase.mapreduce.TableSplit.class, // main hbase jar or hbase-server
- com.google.common.collect.Lists.class, // guava
- org.apache.zookeeper.ZooKeeper.class); // zookeeper
+
+ List<Class> classList = new ArrayList<Class>();
+ classList.add(org.apache.hadoop.hbase.client.HTable.class); // main hbase jar or hbase-client
+ classList.add(org.apache.hadoop.hbase.mapreduce.TableSplit.class); // main hbase jar or hbase-server
+ classList.add(com.google.common.collect.Lists.class); // guava
+ classList.add(org.apache.zookeeper.ZooKeeper.class); // zookeeper
// Additional jars that are specific to v0.95.0+
- addClassToJobIfExists(job, "org.cloudera.htrace.Trace"); // htrace
- addClassToJobIfExists(job, "org.apache.hadoop.hbase.protobuf.generated.HBaseProtos"); // hbase-protocol
- addClassToJobIfExists(job, "org.apache.hadoop.hbase.TableName"); // hbase-common
- addClassToJobIfExists(job, "org.apache.hadoop.hbase.CompatibilityFactory"); // hbase-hadoop-compar
- addClassToJobIfExists(job, "org.jboss.netty.channel.ChannelFactory"); // netty
- }
-
- private void addClassToJobIfExists(Job job, String className) throws IOException {
- Class klass = null;
- try {
- klass = Class.forName(className);
- } catch (ClassNotFoundException e) {
- LOG.debug("Skipping adding jar for class: " + className);
- return;
- }
+ addClassToList("org.cloudera.htrace.Trace", classList); // htrace
+ addClassToList("org.apache.hadoop.hbase.protobuf.generated.HBaseProtos", classList); // hbase-protocol
+ addClassToList("org.apache.hadoop.hbase.TableName", classList); // hbase-common
+ addClassToList("org.apache.hadoop.hbase.CompatibilityFactory", classList); // hbase-hadoop-compar
+ addClassToList("org.jboss.netty.channel.ChannelFactory", classList); // netty
+ return FuncUtils.getShipFiles(classList);
+ }
- TableMapReduceUtil.addDependencyJars(job.getConfiguration(), klass);
+ private void addClassToList(String className, List<Class> classList) {
+ try {
+ Class klass = Class.forName(className);
+ classList.add(klass);
+ } catch (ClassNotFoundException e) {
+ LOG.debug("Skipping adding jar for class: " + className);
+ }
}
private JobConf initializeLocalJobConfig(Job job) {
@@ -1035,7 +1047,6 @@ public class HBaseStorage extends LoadFu
schema_ = (ResourceSchema) ObjectSerializer.deserialize(serializedSchema);
}
- initializeHBaseClassLoaderResources(job);
m_conf = initializeLocalJobConfig(job);
// Not setting a udf property and getting the hbase delegation token
// only once like in setLocation as setStoreLocation gets different Job
Modified: pig/branches/branch-0.14/src/org/apache/pig/builtin/FuncUtils.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/builtin/FuncUtils.java?rev=1634073&r1=1634072&r2=1634073&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/builtin/FuncUtils.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/builtin/FuncUtils.java Fri Oct 24 15:09:11 2014
@@ -18,6 +18,7 @@
package org.apache.pig.builtin;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import org.apache.pig.impl.util.JarManager;
@@ -29,6 +30,10 @@ public class FuncUtils {
* @return list of containing jars
*/
public static List<String> getShipFiles(Class[] classesIdentifyingJars) {
+ return getShipFiles(Arrays.asList(classesIdentifyingJars));
+ }
+
+ public static List<String> getShipFiles(List<Class> classesIdentifyingJars) {
List<String> cacheFiles = new ArrayList<String>();
for (Class clz : classesIdentifyingJars) {