You are viewing a plain-text version of this content. (The link to the canonical version was lost in this plain-text copy.)
Posted to commits@pig.apache.org by da...@apache.org on 2014/09/17 00:18:55 UTC
svn commit: r1625418 - in /pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/
src/org/apache/pig/backend/hadoop/execut...
Author: daijy
Date: Tue Sep 16 22:18:54 2014
New Revision: 1625418
URL: http://svn.apache.org/r1625418
Log:
PIG-4141: Ship UDF/LoadFunc/StoreFunc dependent jar automatically
Added:
pig/trunk/src/org/apache/pig/StoreResources.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/UdfCacheShipFilesVisitor.java
pig/trunk/src/org/apache/pig/builtin/FuncUtils.java
pig/trunk/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/ivy.xml
pig/trunk/src/org/apache/pig/EvalFunc.java
pig/trunk/src/org/apache/pig/LoadFunc.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOUserFuncVisitor.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java
pig/trunk/src/org/apache/pig/builtin/AvroStorage.java
pig/trunk/src/org/apache/pig/builtin/JsonLoader.java
pig/trunk/src/org/apache/pig/builtin/JsonStorage.java
pig/trunk/src/org/apache/pig/builtin/OrcStorage.java
pig/trunk/src/org/apache/pig/impl/PigContext.java
pig/trunk/src/org/apache/pig/impl/util/JarManager.java
pig/trunk/src/org/apache/pig/impl/util/Utils.java
pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
pig/trunk/src/org/apache/pig/scripting/Pig.java
pig/trunk/test/e2e/pig/tests/orc.conf
pig/trunk/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java
pig/trunk/test/org/apache/pig/parser/TestQueryParserUtils.java
pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java
pig/trunk/test/org/apache/pig/test/TestPigRunner.java
pig/trunk/test/org/apache/pig/test/TestPredeployedJar.java
pig/trunk/test/org/apache/pig/test/Util.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Sep 16 22:18:54 2014
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-4141: Ship UDF/LoadFunc/StoreFunc dependent jar automatically (daijy)
+
PIG-4146: Create a target to run mr and tez unit test in one shot (daijy)
PIG-4144: Make pigunit.PigTest work in tez mode (daijy)
Modified: pig/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/pig/trunk/ivy.xml?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/ivy.xml (original)
+++ pig/trunk/ivy.xml Tue Sep 16 22:18:54 2014
@@ -411,9 +411,9 @@
<dependency org="org.apache.hive" name="hive-common" rev="${hive.version}" changing="true"
conf="compile->master" />
<dependency org="org.apache.hive.shims" name="hive-shims-common" rev="${hive.version}" changing="true"
- conf="test->master" />
+ conf="compile->master" />
<dependency org="org.apache.hive.shims" name="hive-shims-common-secure" rev="${hive.version}" changing="true"
- conf="test->master" />
+ conf="compile->master" />
<dependency org="org.apache.hive.shims" name="hive-shims-0.23" rev="${hive.version}" changing="true"
conf="hadoop23->master" />
<dependency org="org.apache.hive.shims" name="hive-shims-0.20S" rev="${hive.version}" changing="true"
Modified: pig/trunk/src/org/apache/pig/EvalFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/EvalFunc.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/EvalFunc.java (original)
+++ pig/trunk/src/org/apache/pig/EvalFunc.java Tue Sep 16 22:18:54 2014
@@ -276,7 +276,7 @@ public abstract class EvalFunc<T> {
}
/**
- * Allow a UDF to specify a list of files it would like placed in the distributed
+ * Allow a UDF to specify a list of hdfs files it would like placed in the distributed
* cache. These files will be put in the cache for every job the UDF is used in.
* The default implementation returns null.
* @return A list of files
@@ -285,6 +285,17 @@ public abstract class EvalFunc<T> {
return null;
}
+ /**
+ * Allow a UDF to specify a list of local files it would like placed in the distributed
+ * cache. These files will be put in the cache for every job the UDF is used in. See
+ * {@link org.apache.pig.builtin.FuncUtils} for utility functions that help build this list.
+ * The default implementation returns null.
+ * @return A list of files
+ */
+ public List<String> getShipFiles() {
+ return null;
+ }
+
public PigLogger getPigLogger() {
return pigLogger;
}
Modified: pig/trunk/src/org/apache/pig/LoadFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/LoadFunc.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/LoadFunc.java (original)
+++ pig/trunk/src/org/apache/pig/LoadFunc.java Tue Sep 16 22:18:54 2014
@@ -29,7 +29,6 @@ import org.apache.hadoop.mapreduce.Count
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
-
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
import org.apache.pig.LoadPushDown.RequiredFieldList;
@@ -305,4 +304,24 @@ public abstract class LoadFunc {
public final void warn(String msg, Enum warningEnum) {
PigHadoopLogger.getInstance().warn(this, msg, warningEnum);
}
+
+ /**
+ * Allow a LoadFunc to specify a list of files it would like placed in the distributed
+ * cache.
+ * The default implementation returns null.
+ * @return A list of files
+ */
+ public List<String> getCacheFiles() {
+ return null;
+ }
+
+ /**
+ * Allow a LoadFunc to specify a list of local files it would like shipped to the backend
+ * (through the distributed cache). See {@link org.apache.pig.builtin.FuncUtils} for helpers.
+ * The default implementation returns null.
+ * @return A list of files
+ */
+ public List<String> getShipFiles() {
+ return null;
+ }
}
Added: pig/trunk/src/org/apache/pig/StoreResources.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/StoreResources.java?rev=1625418&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/StoreResources.java (added)
+++ pig/trunk/src/org/apache/pig/StoreResources.java Tue Sep 16 22:18:54 2014
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import java.util.List;
+
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+
+/**
+ * This interface allows a StoreFunc to specify resources needed
+ * in the distributed cache. The resources can be on DFS (getCacheFiles)
+ * or local (getShipFiles).
+ * @since Pig 0.14
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface StoreResources {
+ /**
+ * Allow a StoreFunc to specify a list of files it would like placed in the distributed
+ * cache.
+ * Implementations may return null if no files are needed.
+ * @return A list of files
+ */
+ public List<String> getCacheFiles();
+
+ /**
+ * Allow a StoreFunc to specify a list of local files it would like shipped to the backend
+ * (through the distributed cache). See {@link org.apache.pig.builtin.FuncUtils} for helpers.
+ * Implementations may return null if no files need to be shipped.
+ * @return A list of files
+ */
+ public List<String> getShipFiles();
+}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Tue Sep 16 22:18:54 2014
@@ -18,8 +18,6 @@
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -77,9 +75,9 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.UdfCacheShipFilesVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
@@ -546,42 +544,6 @@ public class JobControlCompiler{
nwJob.setNumReduceTasks(0);
}
- for (String udf : mro.UDFs) {
- if (udf.contains("GFCross")) {
- Object func = pigContext.instantiateFuncFromSpec(new FuncSpec(udf));
- if (func instanceof GFCross) {
- String crossKey = ((GFCross)func).getCrossKey();
- // If non GFCross has been processed yet
- if (pigContext.getProperties().get(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey)==null) {
- pigContext.getProperties().setProperty(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey,
- Integer.toString(nwJob.getNumReduceTasks()));
- }
- conf.set(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey,
- (String)pigContext.getProperties().get(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey));
- }
- }
- }
-
- if(lds!=null && lds.size()>0){
- for (POLoad ld : lds) {
- //Store the target operators for tuples read
- //from this input
- List<PhysicalOperator> ldSucs = mro.mapPlan.getSuccessors(ld);
- List<OperatorKey> ldSucKeys = new ArrayList<OperatorKey>();
- if(ldSucs!=null){
- for (PhysicalOperator operator2 : ldSucs) {
- ldSucKeys.add(operator2.getOperatorKey());
- }
- }
- inpTargets.add(ldSucKeys);
- inpSignatureLists.add(ld.getSignature());
- inpLimits.add(ld.getLimit());
- //Remove the POLoad from the plan
- if (!pigContext.inIllustrator)
- mro.mapPlan.remove(ld);
- }
- }
-
if (!pigContext.inIllustrator && ! pigContext.getExecType().isLocal())
{
if (okToRunLocal(nwJob, mro, lds)) {
@@ -610,6 +572,22 @@ public class JobControlCompiler{
conf.setBoolean(PigImplConstants.CONVERTED_TO_LOCAL, true);
} else {
log.info(BIG_JOB_LOG_MSG);
+ // Search to see if we have any UDF/LoadFunc/StoreFunc that need to pack things into the
+ // distributed cache.
+ List<String> cacheFiles = new ArrayList<String>();
+ List<String> shipFiles = new ArrayList<String>();
+ UdfCacheShipFilesVisitor mapUdfCacheFileVisitor = new UdfCacheShipFilesVisitor(mro.mapPlan);
+ mapUdfCacheFileVisitor.visit();
+ cacheFiles.addAll(mapUdfCacheFileVisitor.getCacheFiles());
+ shipFiles.addAll(mapUdfCacheFileVisitor.getShipFiles());
+
+ UdfCacheShipFilesVisitor reduceUdfCacheFileVisitor = new UdfCacheShipFilesVisitor(mro.reducePlan);
+ reduceUdfCacheFileVisitor.visit();
+ cacheFiles.addAll(reduceUdfCacheFileVisitor.getCacheFiles());
+ shipFiles.addAll(reduceUdfCacheFileVisitor.getShipFiles());
+
+ setupDistributedCache(pigContext, conf, cacheFiles.toArray(new String[]{}), false);
+
// Setup the DistributedCache for this job
List<URL> allJars = new ArrayList<URL>();
@@ -626,6 +604,13 @@ public class JobControlCompiler{
}
}
+ for (String shipFile : shipFiles) {
+ URL jar = new File(shipFile).toURI().toURL();
+ if (!allJars.contains(jar)) {
+ allJars.add(jar);
+ }
+ }
+
for (String defaultJar : JarManager.getDefaultJars()) {
URL jar = new File(defaultJar).toURI().toURL();
if (!allJars.contains(jar)) {
@@ -653,6 +638,42 @@ public class JobControlCompiler{
}
}
+ for (String udf : mro.UDFs) {
+ if (udf.contains("GFCross")) {
+ Object func = pigContext.instantiateFuncFromSpec(new FuncSpec(udf));
+ if (func instanceof GFCross) {
+ String crossKey = ((GFCross)func).getCrossKey();
+ // If non GFCross has been processed yet
+ if (pigContext.getProperties().get(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey)==null) {
+ pigContext.getProperties().setProperty(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey,
+ Integer.toString(nwJob.getNumReduceTasks()));
+ }
+ conf.set(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey,
+ (String)pigContext.getProperties().get(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey));
+ }
+ }
+ }
+
+ if(lds!=null && lds.size()>0){
+ for (POLoad ld : lds) {
+ //Store the target operators for tuples read
+ //from this input
+ List<PhysicalOperator> ldSucs = mro.mapPlan.getSuccessors(ld);
+ List<OperatorKey> ldSucKeys = new ArrayList<OperatorKey>();
+ if(ldSucs!=null){
+ for (PhysicalOperator operator2 : ldSucs) {
+ ldSucKeys.add(operator2.getOperatorKey());
+ }
+ }
+ inpTargets.add(ldSucKeys);
+ inpSignatureLists.add(ld.getSignature());
+ inpLimits.add(ld.getLimit());
+ //Remove the POLoad from the plan
+ if (!pigContext.inIllustrator)
+ mro.mapPlan.remove(ld);
+ }
+ }
+
if(Utils.isLocal(pigContext, conf)) {
ConfigurationUtil.replaceConfigForLocalMode(conf);
}
@@ -779,10 +800,6 @@ public class JobControlCompiler{
// serialized
setupDistributedCacheForJoin(mro, pigContext, conf);
- // Search to see if we have any UDFs that need to pack things into the
- // distributed cache.
- setupDistributedCacheForUdfs(mro, pigContext, conf);
-
SchemaTupleFrontend.copyAllGeneratedToDistributedCache(pigContext, conf);
POPackage pack = null;
@@ -1478,13 +1495,6 @@ public class JobControlCompiler{
.visit();
}
- private void setupDistributedCacheForUdfs(MapReduceOper mro,
- PigContext pigContext,
- Configuration conf) throws IOException {
- new UdfDistributedCacheVisitor(mro.mapPlan, pigContext, conf).visit();
- new UdfDistributedCacheVisitor(mro.reducePlan, pigContext, conf).visit();
- }
-
private static void setupDistributedCache(PigContext pigContext,
Configuration conf,
Properties properties, String key,
@@ -1637,7 +1647,6 @@ public class JobControlCompiler{
Path pathInHDFS = shipToHDFS(pigContext, conf, url);
// and add to the DistributedCache
DistributedCache.addFileToClassPath(pathInHDFS, conf);
- pigContext.addSkipJar(url.getPath());
}
private static Path getCacheStagingDir(Configuration conf) throws IOException {
@@ -1854,41 +1863,6 @@ public class JobControlCompiler{
}
}
- private static class UdfDistributedCacheVisitor extends PhyPlanVisitor {
-
- private PigContext pigContext = null;
- private Configuration conf = null;
-
- public UdfDistributedCacheVisitor(PhysicalPlan plan,
- PigContext pigContext,
- Configuration conf) {
- super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
- plan));
- this.pigContext = pigContext;
- this.conf = conf;
- }
-
- @Override
- public void visitUserFunc(POUserFunc func) throws VisitorException {
-
- // XXX Hadoop currently doesn't support distributed cache in local mode.
- // This line will be removed after the support is added
- if (Utils.isLocal(pigContext, conf)) return;
-
- // set up distributed cache for files indicated by the UDF
- String[] files = func.getCacheFiles();
- if (files == null) return;
-
- try {
- setupDistributedCache(pigContext, conf, files, false);
- } catch (IOException e) {
- String msg = "Internal error. Distributed cache could not " +
- "be set up for the requested files";
- throw new VisitorException(msg, e);
- }
- }
- }
-
private static class ParallelConstantVisitor extends PhyPlanVisitor {
private int rp;
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Tue Sep 16 22:18:54 2014
@@ -67,7 +67,8 @@ public class POUserFunc extends Expressi
private transient String counterGroup;
private transient EvalFunc func;
- private transient String[] cacheFiles = null;
+ private transient List<String> cacheFiles = null;
+ private transient List<String> shipFiles = null;
FuncSpec funcSpec;
FuncSpec origFSpec;
@@ -556,14 +557,22 @@ public class POUserFunc extends Expressi
instantiateFunc(funcSpec);
}
- public String[] getCacheFiles() {
+ public List<String> getCacheFiles() {
return cacheFiles;
}
- public void setCacheFiles(String[] cf) {
+ public void setCacheFiles(List<String> cf) {
cacheFiles = cf;
}
+ public List<String> getShipFiles() {
+ return shipFiles;
+ }
+
+ public void setShipFiles(List<String> sf) {
+ shipFiles = sf;
+ }
+
public boolean combinable() {
return (func instanceof Algebraic);
}
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/UdfCacheShipFilesVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/UdfCacheShipFilesVisitor.java?rev=1625418&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/UdfCacheShipFilesVisitor.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/UdfCacheShipFilesVisitor.java Tue Sep 16 22:18:54 2014
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+public class UdfCacheShipFilesVisitor extends PhyPlanVisitor {
+ private Set<String> cacheFiles = new HashSet<String>();
+ private Set<String> shipFiles = new HashSet<String>();
+
+ public UdfCacheShipFilesVisitor(PhysicalPlan plan) {
+ super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+ }
+
+ @Override
+ public void visitLoad(POLoad ld) throws VisitorException {
+ if (ld.getCacheFiles() != null) {
+ cacheFiles.addAll(ld.getCacheFiles());
+ }
+ if (ld.getShipFiles() != null) {
+ shipFiles.addAll(ld.getShipFiles());
+ }
+ }
+
+ @Override
+ public void visitStore(POStore st) throws VisitorException {
+ if (st.getCacheFiles() != null) {
+ cacheFiles.addAll(st.getCacheFiles());
+ }
+ if (st.getShipFiles() != null) {
+ shipFiles.addAll(st.getShipFiles());
+ }
+ }
+
+ public void visitUserFunc(POUserFunc udf) throws VisitorException {
+ if (udf.getCacheFiles() != null) {
+ cacheFiles.addAll(udf.getCacheFiles());
+ }
+ if (udf.getShipFiles() != null) {
+ shipFiles.addAll(udf.getShipFiles());
+ }
+ }
+
+ public Set<String> getCacheFiles() {
+ return cacheFiles;
+ }
+
+ public Set<String> getShipFiles() {
+ return shipFiles;
+ }
+}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java Tue Sep 16 22:18:54 2014
@@ -18,6 +18,7 @@
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
import java.io.IOException;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -65,6 +66,9 @@ public class POLoad extends PhysicalOper
private boolean isTmpLoad;
private long limit=-1;
+
+ private transient List<String> cacheFiles = null;
+ private transient List<String> shipFiles = null;
public POLoad(OperatorKey k) {
this(k,-1, null);
@@ -252,4 +256,20 @@ public class POLoad extends PhysicalOper
public void setLimit(long limit) {
this.limit = limit;
}
+
+ public List<String> getCacheFiles() {
+ return cacheFiles;
+ }
+
+ public void setCacheFiles(List<String> cf) {
+ cacheFiles = cf;
+ }
+
+ public List<String> getShipFiles() {
+ return shipFiles;
+ }
+
+ public void setShipFiles(List<String> sf) {
+ shipFiles = sf;
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java Tue Sep 16 22:18:54 2014
@@ -81,6 +81,9 @@ public class POStore extends PhysicalOpe
private String signature;
+ private transient List<String> cacheFiles = null;
+ private transient List<String> shipFiles = null;
+
public POStore(OperatorKey k) {
this(k, -1, null);
}
@@ -313,4 +316,20 @@ public class POStore extends PhysicalOpe
public void setStoreFunc(StoreFuncInterface storeFunc) {
this.storer = storeFunc;
}
+
+ public List<String> getCacheFiles() {
+ return cacheFiles;
+ }
+
+ public void setCacheFiles(List<String> cf) {
+ cacheFiles = cf;
+ }
+
+ public List<String> getShipFiles() {
+ return shipFiles;
+ }
+
+ public void setShipFiles(List<String> sf) {
+ shipFiles = sf;
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java Tue Sep 16 22:18:54 2014
@@ -54,6 +54,8 @@ public class POStoreTez extends POStore
public POStoreTez(POStore copy) {
super(copy);
this.outputKey = copy.getOperatorKey().toString();
+ this.setCacheFiles(copy.getCacheFiles());
+ this.setShipFiles(copy.getShipFiles());
}
public String getOutputKey() {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java Tue Sep 16 22:18:54 2014
@@ -107,6 +107,7 @@ public class TezOperPlan extends Operato
TezPOUserFuncVisitor udfVisitor = new TezPOUserFuncVisitor(this);
udfVisitor.visit();
+ addShipResources(udfVisitor.getShipFiles());
addCacheResources(udfVisitor.getCacheFiles());
return TezResourceManager.getInstance().getTezResources(extraResources.keySet());
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOUserFuncVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOUserFuncVisitor.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOUserFuncVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOUserFuncVisitor.java Tue Sep 16 22:18:54 2014
@@ -17,19 +17,16 @@
*/
package org.apache.pig.backend.hadoop.executionengine.tez;
-import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.UdfCacheShipFilesVisitor;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.VisitorException;
public class TezPOUserFuncVisitor extends TezOpPlanVisitor {
private Set<String> cacheFiles = new HashSet<String>();
+ private Set<String> shipFiles = new HashSet<String>();
public TezPOUserFuncVisitor(TezOperPlan plan) {
super(plan, new DepthFirstWalker<TezOperator, TezOperPlan>(plan));
@@ -38,26 +35,18 @@ public class TezPOUserFuncVisitor extend
@Override
public void visitTezOp(TezOperator tezOp) throws VisitorException {
if(!tezOp.plan.isEmpty()) {
- UdfCacheFileVisitor udfCacheFileVisitor = new UdfCacheFileVisitor(tezOp.plan);
+ UdfCacheShipFilesVisitor udfCacheFileVisitor = new UdfCacheShipFilesVisitor(tezOp.plan);
udfCacheFileVisitor.visit();
- }
- }
-
- class UdfCacheFileVisitor extends PhyPlanVisitor {
-
- public UdfCacheFileVisitor(PhysicalPlan plan) {
- super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
- }
-
- public void visitUserFunc(POUserFunc udf) throws VisitorException {
- String[] files = udf.getCacheFiles();
- if (files != null) {
- cacheFiles.addAll(Arrays.asList(files));
- }
+ cacheFiles.addAll(udfCacheFileVisitor.getCacheFiles());
+ shipFiles.addAll(udfCacheFileVisitor.getShipFiles());
}
}
public Set<String> getCacheFiles() {
return cacheFiles;
}
+
+ public Set<String> getShipFiles() {
+ return shipFiles;
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java Tue Sep 16 22:18:54 2014
@@ -117,6 +117,8 @@ public class LoaderProcessor extends Tez
POSimpleTezLoad tezLoad = new POSimpleTezLoad(ld.getOperatorKey(), ld.getLFile());
tezLoad.setInputKey(ld.getOperatorKey().toString());
tezLoad.copyAliasFrom(ld);
+ tezLoad.setCacheFiles(ld.getCacheFiles());
+ tezLoad.setShipFiles(ld.getShipFiles());
tezOp.plan.add(tezLoad);
for (PhysicalOperator sucs : ldSucs) {
tezOp.plan.connect(tezLoad, sucs);
Modified: pig/trunk/src/org/apache/pig/builtin/AvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/AvroStorage.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/AvroStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/AvroStorage.java Tue Sep 16 22:18:54 2014
@@ -31,6 +31,7 @@ import org.apache.avro.Schema.Type;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericData;
+import org.apache.avro.mapred.AvroInputFormat;
import org.apache.avro.mapred.AvroOutputFormat;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@@ -63,9 +64,11 @@ import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.StoreFunc;
import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.StoreResources;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.JarManager;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.impl.util.avro.AvroArrayReader;
@@ -82,7 +85,7 @@ import com.google.common.collect.Maps;
*
*/
public class AvroStorage extends LoadFunc
- implements StoreFuncInterface, LoadMetadata, LoadPushDown {
+ implements StoreFuncInterface, LoadMetadata, LoadPushDown, StoreResources {
/**
* Creates new instance of Pig Storage function, without specifying
@@ -674,4 +677,9 @@ public class AvroStorage extends LoadFun
}
+ @Override
+ public List<String> getShipFiles() {
+ // Ship the jars that contain Avro's Schema and AvroInputFormat classes.
+ return FuncUtils.getShipFiles(new Class[] {Schema.class, AvroInputFormat.class});
+ }
}
Added: pig/trunk/src/org/apache/pig/builtin/FuncUtils.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/FuncUtils.java?rev=1625418&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/FuncUtils.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/FuncUtils.java Tue Sep 16 22:18:54 2014
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.impl.util.JarManager;
+
+/**
+ * Helpers shared by the builtin load/store functions.
+ */
+public class FuncUtils {
+ /**
+ * Utility function to get a list of containing jars via classes.
+ * A class for which JarManager.findContainingJar finds no containing
+ * jar (returns null) is skipped, so the returned list never contains
+ * null entries.
+ *
+ * @param classesIdentifyingJars classes used to identify containing jars
+ * @return list of containing jars, possibly empty
+ */
+ public static List<String> getShipFiles(Class<?>... classesIdentifyingJars) {
+ List<String> shipFiles = new ArrayList<String>(classesIdentifyingJars.length);
+ for (Class<?> clz : classesIdentifyingJars) {
+ String jar = JarManager.findContainingJar(clz);
+ if (jar != null) {
+ shipFiles.add(jar);
+ }
+ }
+ return shipFiles;
+ }
+}
Modified: pig/trunk/src/org/apache/pig/builtin/JsonLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/JsonLoader.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/JsonLoader.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/JsonLoader.java Tue Sep 16 22:18:54 2014
@@ -19,28 +19,23 @@ package org.apache.pig.builtin;
import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
import org.joda.time.format.ISODateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
-
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.JsonToken;
-
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-
import org.apache.pig.Expression;
import org.apache.pig.LoadCaster;
import org.apache.pig.LoadFunc;
@@ -56,6 +51,7 @@ import org.apache.pig.data.DataByteArray
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.util.JarManager;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.parser.ParserException;
@@ -372,4 +368,14 @@ public class JsonLoader extends LoadFunc
throws IOException {
// We don't have partitions
}
+
+ /**
+ * Returns the jars to ship with the job: the one containing Jackson's
+ * JsonFactory, which this loader uses to parse its input.
+ */
+ @Override
+ public List<String> getShipFiles() {
+ Class<?>[] classList = new Class<?>[] {JsonFactory.class};
+ return FuncUtils.getShipFiles(classList);
+ }
}
Modified: pig/trunk/src/org/apache/pig/builtin/JsonStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/JsonStorage.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/JsonStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/JsonStorage.java Tue Sep 16 22:18:54 2014
@@ -19,17 +19,15 @@ package org.apache.pig.builtin;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.math.BigDecimal;
import java.math.BigInteger;
-import org.joda.time.DateTime;
-
import org.codehaus.jackson.JsonEncoding;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerator;
-
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
@@ -38,25 +36,18 @@ import org.apache.hadoop.mapreduce.Outpu
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.StoreMetadata;
import org.apache.pig.StoreFunc;
+import org.apache.pig.StoreResources;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.DataBag;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.TreeMap;
-
/**
* A JSON Pig store function. Each Pig tuple is stored on one line (as one
* value for TextOutputFormat) so that it can be read easily using
@@ -66,7 +57,7 @@ import java.util.TreeMap;
* with mapping between JSON and Pig types. The schema file share the same format
* as the one we use in PigStorage.
*/
-public class JsonStorage extends StoreFunc implements StoreMetadata {
+public class JsonStorage extends StoreFunc implements StoreMetadata, StoreResources {
protected RecordWriter writer = null;
protected ResourceSchema schema = null;
@@ -318,4 +309,14 @@ public class JsonStorage extends StoreFu
return s;
}
+ @Override
+ public List<String> getShipFiles() {
+ return FuncUtils.getShipFiles(new Class[] {JsonFactory.class});
+ }
+
+ @Override
+ public List<String> getCacheFiles() {
+ // This storer declares no cache files.
+ return null;
+ }
}
Modified: pig/trunk/src/org/apache/pig/builtin/OrcStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/OrcStorage.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/OrcStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/OrcStorage.java Tue Sep 16 22:18:54 2014
@@ -49,11 +49,13 @@ import org.apache.hadoop.hive.ql.io.orc.
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.shims.HadoopShimsSecure;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
@@ -80,6 +82,7 @@ import org.apache.pig.Expression.BinaryE
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.StoreResources;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.DataType;
@@ -109,7 +112,7 @@ import com.google.common.annotations.Vis
* <li><code>-v, --version</code> Sets the version of the file that will be written
* </ul>
**/
-public class OrcStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata, LoadPushDown, LoadPredicatePushdown {
+public class OrcStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata, LoadPushDown, LoadPredicatePushdown, StoreResources {
//TODO Make OrcInputFormat.SARG_PUSHDOWN visible
private static final String SARG_PUSHDOWN = "sarg.pushdown";
@@ -382,6 +385,33 @@ public class OrcStorage extends LoadFunc
}
}
+ /**
+ * Returns the jars to ship with the job: those containing ORC, HiveConf,
+ * the Hive serde base class and the Hive shims classes, including the
+ * shims implementation for the running Hadoop version.
+ *
+ * @throws RuntimeException if that version-specific shims class is not
+ * on the classpath
+ */
+ @Override
+ public List<String> getShipFiles() {
+ // Hadoop 0.23.x and 2.x use Hadoop23Shims; anything else Hadoop20SShims.
+ String hadoopVersion = "20S";
+ if (Utils.isHadoop23() || Utils.isHadoop2()) {
+ hadoopVersion = "23";
+ }
+ String shimsClassName = "org.apache.hadoop.hive.shims.Hadoop" + hadoopVersion + "Shims";
+ Class<?> hadoopVersionShimsClass;
+ try {
+ hadoopVersionShimsClass = Class.forName(shimsClassName);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("Cannot find " + shimsClassName + " in classpath", e);
+ }
+ Class<?>[] classList = new Class<?>[] {OrcFile.class, HiveConf.class, AbstractSerDe.class,
+ org.apache.hadoop.hive.shims.HadoopShims.class, HadoopShimsSecure.class, hadoopVersionShimsClass};
+ return FuncUtils.getShipFiles(classList);
+ }
+
private static Path getFirstFile(String location, FileSystem fs) throws IOException {
String[] locations = getPathStrings(location);
Path[] paths = new Path[locations.length];
Modified: pig/trunk/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/PigContext.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/trunk/src/org/apache/pig/impl/PigContext.java Tue Sep 16 22:18:54 2014
@@ -105,7 +105,9 @@ public class PigContext implements Seria
* Resources for the job (jars, scripting udf files, cached macro abstract syntax trees)
*/
- // extra jar files that are needed to run a job
+ // Jar files that are global to the whole Pig script, includes
+ // 1. registered jars
+ // 2. Jars defined in -Dpig.additional.jars
transient public List<URL> extraJars = new LinkedList<URL>();
// original paths each extra jar came from
@@ -115,10 +117,6 @@ public class PigContext implements Seria
// jars needed for scripting udfs - jython.jar etc
transient public List<String> scriptJars = new ArrayList<String>(2);
- // jars that should not be merged in.
- // (some functions may come from pig.jar and we don't want the whole jar file.)
- transient public Vector<String> skipJars = new Vector<String>(2);
-
// jars that are predeployed to the cluster and thus should not be merged in at all (even subsets).
transient public Vector<String> predeployedJars = new Vector<String>(2);
@@ -260,13 +258,6 @@ public class PigContext implements Seria
this.properties = properties;
this.properties.setProperty("exectype", this.execType.name());
- String pigJar = JarManager.findContainingJar(Main.class);
- String hadoopJar = JarManager.findContainingJar(FileSystem.class);
- if (pigJar != null) {
- addSkipJar(pigJar);
- if (!pigJar.equals(hadoopJar))
- addSkipJar(hadoopJar);
- }
this.executionEngine = execType.getExecutionEngine(this);
@@ -345,12 +336,6 @@ public class PigContext implements Seria
}
}
- public void addSkipJar(String path) {
- if (path != null && !skipJars.contains(path)) {
- skipJars.add(path);
- }
- }
-
public void addJar(String path) throws MalformedURLException {
if (path != null) {
URL resource = (new File(path)).toURI().toURL();
Modified: pig/trunk/src/org/apache/pig/impl/util/JarManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/JarManager.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/JarManager.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/JarManager.java Tue Sep 16 22:18:54 2014
@@ -68,8 +68,6 @@ public class JarManager {
AUTOMATON(Automaton.class),
ANTLR(CommonTokenStream.class),
GUAVA(Multimaps.class),
- JACKSON_CORE(JsonPropertyOrder.class),
- JACKSON_MAPPER(JacksonStdImpl.class),
JODATIME(DateTime.class);
private final Class pkgClass;
Modified: pig/trunk/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/Utils.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/Utils.java Tue Sep 16 22:18:54 2014
@@ -94,8 +94,21 @@ public class Utils {
public static boolean isVendorIBM() {
return System.getProperty("java.vendor").contains("IBM");
}
-
-
+
+ /** Returns true if VersionInfo reports a 0.23.x Hadoop version. */
+ public static boolean isHadoop23() {
+ String version = org.apache.hadoop.util.VersionInfo.getVersion();
+ return version.matches("\\b0\\.23\\..+\\b");
+ }
+
+ /** Returns true if VersionInfo reports a 2.x Hadoop version. */
+ public static boolean isHadoop2() {
+ // matches() must cover the whole string, so "2." has to be the prefix.
+ String version = org.apache.hadoop.util.VersionInfo.getVersion();
+ return version.matches("\\b2\\.\\d+\\..+");
+ }
+ }
+
/**
* This method is a helper for classes to implement {@link java.lang.Object#equals(java.lang.Object)}
* checks if two objects are equals - two levels of checks are
Modified: pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java Tue Sep 16 22:18:54 2014
@@ -512,7 +512,7 @@ public class ExpToPhyTranslationVisitor
}
List<String> cacheFiles = ((EvalFunc)f).getCacheFiles();
if (cacheFiles != null) {
- ((POUserFunc)p).setCacheFiles(cacheFiles.toArray(new String[cacheFiles.size()]));
+ ((POUserFunc)p).setCacheFiles(cacheFiles);
}
} else {
p = new POUserComparisonFunc(new OperatorKey(DEFAULT_SCOPE, nodeGen
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Tue Sep 16 22:18:54 2014
@@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFac
import org.apache.pig.FuncSpec;
import org.apache.pig.PigException;
import org.apache.pig.ResourceSchema;
+import org.apache.pig.StoreResources;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogicalToPhysicalTranslatorException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -142,6 +143,8 @@ public class LogToPhyTranslationVisitor
load.setSignature(loLoad.getSignature());
load.setLimit(loLoad.getLimit());
load.setIsTmpLoad(loLoad.isTmpLoad());
+ load.setCacheFiles(loLoad.getLoadFunc().getCacheFiles());
+ load.setShipFiles(loLoad.getLoadFunc().getShipFiles());
currentPlan.add(load);
logToPhyMap.put(loLoad, load);
@@ -953,8 +956,11 @@ public class LogToPhyTranslationVisitor
store.setSortInfo(loStore.getSortInfo());
store.setIsTmpStore(loStore.isTmpStore());
store.setStoreFunc(loStore.getStoreFunc());
-
store.setSchema(Util.translateSchema( loStore.getSchema() ));
+ if (loStore.getStoreFunc() instanceof StoreResources) {
+ store.setCacheFiles(((StoreResources)loStore.getStoreFunc()).getCacheFiles());
+ store.setShipFiles(((StoreResources)loStore.getStoreFunc()).getShipFiles());
+ }
currentPlan.add(store);
Modified: pig/trunk/src/org/apache/pig/scripting/Pig.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/Pig.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/Pig.java (original)
+++ pig/trunk/src/org/apache/pig/scripting/Pig.java Tue Sep 16 22:18:54 2014
@@ -120,7 +120,7 @@ public class Pig {
*/
public static void registerUDF(String udffile, String namespace)
throws IOException {
- LOG.info("Register script UFD file: "+ udffile);
+ LOG.info("Register script UDF file: "+ udffile);
ScriptPigContext ctx = getScriptContext();
ScriptEngine engine = ctx.getScriptEngine();
// script file contains only functions, no need to separate
Modified: pig/trunk/test/e2e/pig/tests/orc.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/orc.conf?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/orc.conf (original)
+++ pig/trunk/test/e2e/pig/tests/orc.conf Tue Sep 16 22:18:54 2014
@@ -14,13 +14,6 @@ $cfg = {
'num' => 1,
'notmq' => 1,
'pig' => q\
-register :HIVELIBDIR:/hive-common-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-serde-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-exec-:HIVEVERSION:-core.jar;
-register :HIVELIBDIR:/hive-shims-common-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-shims-common-secure-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-shims-:HIVESHIMSVERSION:-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/kryo*.jar;
a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:float);
store a into ':OUTPATH:.intermediate' using OrcStorage();
exec
@@ -36,13 +29,6 @@ store b into ':OUTPATH:';\,
'num' => 2,
'notmq' => 1,
'pig' => q\
-register :HIVELIBDIR:/hive-common-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-serde-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-exec-:HIVEVERSION:-core.jar;
-register :HIVELIBDIR:/hive-shims-common-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-shims-common-secure-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-shims-:HIVESHIMSVERSION:-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/kryo*.jar;
a = load ':INPATH:/singlefile/studentcomplextab10k' as (nameagegpamap:map[], nameagegpatuple:tuple(tname:chararray, tage:int, tgpa:float), nameagegpabag:bag{t:tuple(bname:chararray, bage:int, bgpa:float)});
store a into ':OUTPATH:.intermediate' using OrcStorage();
exec
@@ -56,13 +42,7 @@ store a into ':OUTPATH:';\,
{
'num' => 3,
'notmq' => 1,
- 'pig' => q\register :HIVELIBDIR:/hive-common-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-serde-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-exec-:HIVEVERSION:-core.jar;
-register :HIVELIBDIR:/hive-shims-common-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-shims-common-secure-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-shims-:HIVESHIMSVERSION:-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/kryo*.jar;
+ 'pig' => q\
a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age:int, gpa:float);
store a into ':OUTPATH:.simple.intermediate' using OrcStorage();
exec
@@ -93,13 +73,6 @@ store h into ':OUTPATH:';\,
'num' => 4,
'notmq' => 1,
'pig' => q\
-register :HIVELIBDIR:/hive-common-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-serde-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-exec-:HIVEVERSION:-core.jar;
-register :HIVELIBDIR:/hive-shims-common-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-shims-common-secure-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-shims-:HIVESHIMSVERSION:-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/kryo*.jar;
a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:float);
store a into ':OUTPATH:.orc_params.intermediate' using OrcStorage('-c ZLIB -s 67108864 -r 100000 -b 1048576 -p true -v 0.12');
exec
@@ -119,13 +92,6 @@ store a into ':OUTPATH:';\,
'num' => 1,
'notmq' => 1,
'pig' => q\
-register :HIVELIBDIR:/hive-common-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-serde-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-exec-:HIVEVERSION:-core.jar;
-register :HIVELIBDIR:/hive-shims-common-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-shims-common-secure-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-shims-:HIVESHIMSVERSION:-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/kryo*.jar;
a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:float);
b = order a by name parallel 4;
store b into ':OUTPATH:.intermediate' using OrcStorage();
@@ -141,13 +107,6 @@ store b into ':OUTPATH:';\,
'num' => 2,
'notmq' => 1,
'pig' => q\
-register :HIVELIBDIR:/hive-common-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-serde-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-exec-:HIVEVERSION:-core.jar;
-register :HIVELIBDIR:/hive-shims-common-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-shims-common-secure-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-shims-:HIVESHIMSVERSION:-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/kryo*.jar;
a = load ':INPATH:/singlefile/studenttab20m' as (name:chararray, age:int, gpa:float);
b = order a by age desc parallel 4;
store b into ':OUTPATH:.intermediate' using OrcStorage('-s 10000000');
@@ -163,13 +122,6 @@ store b into ':OUTPATH:';\,
'num' => 3,
'notmq' => 1,
'pig' => q\
-register :HIVELIBDIR:/hive-common-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-serde-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-exec-:HIVEVERSION:-core.jar;
-register :HIVELIBDIR:/hive-shims-common-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-shims-common-secure-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-shims-:HIVESHIMSVERSION:-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/kryo*.jar;
a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:float);
b = order a by gpa parallel 4;
store b into ':OUTPATH:.intermediate' using OrcStorage();
@@ -185,13 +137,6 @@ store b into ':OUTPATH:';\,
'num' => 4,
'notmq' => 1,
'pig' => q\
-register :HIVELIBDIR:/hive-common-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-serde-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-exec-:HIVEVERSION:-core.jar;
-register :HIVELIBDIR:/hive-shims-common-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-shims-common-secure-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-shims-:HIVESHIMSVERSION:-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/kryo*.jar;
a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:bigdecimal);
b = order a by gpa parallel 4;
store b into ':OUTPATH:.intermediate' using OrcStorage();
@@ -209,13 +154,6 @@ store b into ':OUTPATH:';\,
'num' => 5,
'notmq' => 1,
'pig' => q\
-register :HIVELIBDIR:/hive-common-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-serde-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-exec-:HIVEVERSION:-core.jar;
-register :HIVELIBDIR:/hive-shims-common-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-shims-common-secure-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-shims-:HIVESHIMSVERSION:-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/kryo*.jar;
a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
b = foreach a generate name, age, gpa, (age>35 ? ToDate('20100101', 'yyyyMMdd', 'UTC') : ToDate('20100105', 'yyyyMMdd', 'UTC')) as d;
c = order b by d parallel 4;
Modified: pig/trunk/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java (original)
+++ pig/trunk/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java Tue Sep 16 22:18:54 2014
@@ -351,7 +351,7 @@ public class TestLoadStoreFuncLifeCycle
// result, the number of StoreFunc instances is greater by 1 in
// Hadoop-2.0.x.
assertTrue("storer instanciation count increasing: " + Storer.count,
- Storer.count <= (Util.isHadoop2_0() ? 5 : 4));
+ Storer.count <= (org.apache.pig.impl.util.Utils.isHadoop2() ? 5 : 4));
}
Modified: pig/trunk/test/org/apache/pig/parser/TestQueryParserUtils.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestQueryParserUtils.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestQueryParserUtils.java (original)
+++ pig/trunk/test/org/apache/pig/parser/TestQueryParserUtils.java Tue Sep 16 22:18:54 2014
@@ -72,7 +72,7 @@ public class TestQueryParserUtils {
QueryParserUtils.setHdfsServers("hello://nn1/tmp", pc);
assertEquals(null, props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
- if(Util.isHadoop23() || Util.isHadoop2_0()) {
+ if(org.apache.pig.impl.util.Utils.isHadoop23() || org.apache.pig.impl.util.Utils.isHadoop2()) {
// webhdfs
props.remove(MRConfiguration.JOB_HDFS_SERVERS);
QueryParserUtils.setHdfsServers("webhdfs://nn1/tmp", pc);
Modified: pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java Tue Sep 16 22:18:54 2014
@@ -126,7 +126,7 @@ public class TestJobControlCompiler {
// verifying the jar gets on distributed cache
Path[] fileClassPaths = DistributedCache.getFileClassPaths(jobConf);
- Assert.assertEquals("size for "+Arrays.toString(fileClassPaths), 8, fileClassPaths.length);
+ Assert.assertEquals("size for "+Arrays.toString(fileClassPaths), 6, fileClassPaths.length);
Path distributedCachePath = fileClassPaths[0];
Assert.assertEquals("ends with jar name: "+distributedCachePath, distributedCachePath.getName(), tmpFile.getName());
// hadoop bug requires path to not contain hdfs://hotname in front
Added: pig/trunk/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java?rev=1625418&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java Tue Sep 16 22:18:54 2014
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.UdfCacheShipFilesVisitor;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezLauncher;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezPOUserFuncVisitor;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezPlanContainer;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Utils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks that the builtin OrcStorage, AvroStorage and JsonStorage functions
+ * contribute their dependency jars as ship files to both the MapReduce and
+ * the Tez plans.
+ */
+public class TestLoaderStorerShipCacheFiles {
+ private PigServer pigServer;
+
+ @Before
+ public void setUp() throws Exception {
+ pigServer = new PigServer(ExecType.LOCAL);
+ pigServer.getPigContext().inExplain = true;
+ }
+
+ @Test
+ public void testShipOrcLoader() throws Exception {
+ String query = "a = load 'test/org/apache/pig/builtin/orc/orc-file-11-format.orc' using OrcStorage();" +
+ "store a into 'ooo';";
+ PhysicalPlan pp = Util.buildPp(pigServer, query);
+
+ String hadoopVersion = "20S";
+ if (Utils.isHadoop23() || Utils.isHadoop2()) {
+ hadoopVersion = "23";
+ }
+ String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde",
+ "hive-shims-0." + hadoopVersion, "hive-shims-common-0", "hive-shims-common-secure"};
+
+ MROperPlan mrPlan = Util.buildMRPlan(pp, pigServer.getPigContext());
+ assertMRPlanContains(mrPlan.getRoots().get(0), expectedJars, 6);
+
+ TezLauncher launcher = new TezLauncher();
+ TezPlanContainer tezPlanContainer = launcher.compile(pp, pigServer.getPigContext());
+ assertTezPlanContains(tezPlanContainer.getRoots().get(0).getNode(), expectedJars, 6);
+ }
+
+ @Test
+ public void testShipOrcStorer() throws Exception {
+ String query = "a = load '1.txt' as (name:chararray, age:int, gpa:double);" +
+ "store a into 'ooo' using OrcStorage;";
+ PhysicalPlan pp = Util.buildPp(pigServer, query);
+
+ String hadoopVersion = "20S";
+ if (Utils.isHadoop23() || Utils.isHadoop2()) {
+ hadoopVersion = "23";
+ }
+ String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde",
+ "hive-shims-0." + hadoopVersion, "hive-shims-common-0", "hive-shims-common-secure"};
+
+ MROperPlan mrPlan = Util.buildMRPlan(pp, pigServer.getPigContext());
+ assertMRPlanContains(mrPlan.getRoots().get(0), expectedJars, 6);
+
+ TezLauncher launcher = new TezLauncher();
+ TezPlanContainer tezPlanContainer = launcher.compile(pp, pigServer.getPigContext());
+ assertTezPlanContains(tezPlanContainer.getRoots().get(0).getNode(), expectedJars, 6);
+ }
+
+ @Test
+ public void testShipAvroLoader() throws Exception {
+ String query = "a = load '1.txt' as (name:chararray, age:int, gpa:double);" +
+ "store a into 'ooo' using AvroStorage();";
+ PhysicalPlan pp = Util.buildPp(pigServer, query);
+
+ String[] expectedJars = new String[] {"avro-1", "avro-mapred-"};
+
+ MROperPlan mrPlan = Util.buildMRPlan(pp, pigServer.getPigContext());
+ assertMRPlanContains(mrPlan.getRoots().get(0), expectedJars, 2);
+
+ TezLauncher launcher = new TezLauncher();
+ TezPlanContainer tezPlanContainer = launcher.compile(pp, pigServer.getPigContext());
+ assertTezPlanContains(tezPlanContainer.getRoots().get(0).getNode(), expectedJars, 2);
+ }
+
+ @Test
+ public void testShipJsonLoader() throws Exception {
+ String query = "a = load '1.txt' as (name:chararray, age:int, gpa:double);" +
+ "b = order a by name;" +
+ "store b into 'ooo' using JsonStorage();";
+ PhysicalPlan pp = Util.buildPp(pigServer, query);
+
+ String[] expectedJars = new String[] {"jackson-core-"};
+
+ MROperPlan mrPlan = Util.buildMRPlan(pp, pigServer.getPigContext());
+ assertMRPlanContains(mrPlan.getLeaves().get(0), expectedJars, 1);
+
+ TezLauncher launcher = new TezLauncher();
+ TezPlanContainer tezPlanContainer = launcher.compile(pp, pigServer.getPigContext());
+ assertTezPlanContains(tezPlanContainer.getRoots().get(0).getNode(), expectedJars, 1);
+ }
+
+ /**
+ * Collects the ship files of the map and reduce plans of {@code mro}, then
+ * checks their count and that every expected jar name fragment is present.
+ */
+ private void assertMRPlanContains(MapReduceOper mro, String[] expectedFiles, int size) throws VisitorException {
+ List<String> shipFiles = new ArrayList<String>();
+ UdfCacheShipFilesVisitor mapUdfCacheFileVisitor = new UdfCacheShipFilesVisitor(mro.mapPlan);
+ mapUdfCacheFileVisitor.visit();
+ shipFiles.addAll(mapUdfCacheFileVisitor.getShipFiles());
+
+ UdfCacheShipFilesVisitor reduceUdfCacheFileVisitor = new UdfCacheShipFilesVisitor(mro.reducePlan);
+ reduceUdfCacheFileVisitor.visit();
+ shipFiles.addAll(reduceUdfCacheFileVisitor.getShipFiles());
+
+ Assert.assertEquals("ship files: " + shipFiles, size, shipFiles.size());
+ assertContains(shipFiles, expectedFiles);
+ }
+
+ /**
+ * Collects the ship files reported by TezPOUserFuncVisitor for {@code plan},
+ * then checks their count and that every expected jar name fragment is present.
+ */
+ private void assertTezPlanContains(TezOperPlan plan, String[] expectedFiles, int size) throws VisitorException {
+ TezPOUserFuncVisitor udfVisitor = new TezPOUserFuncVisitor(plan);
+ udfVisitor.visit();
+
+ List<String> shipFiles = new ArrayList<String>();
+ shipFiles.addAll(udfVisitor.getShipFiles());
+
+ Assert.assertEquals("ship files: " + shipFiles, size, shipFiles.size());
+ assertContains(shipFiles, expectedFiles);
+ }
+
+ /** Asserts that every expected name is a substring of at least one collected file. */
+ private void assertContains(List<String> collectedFiles, String[] expectedFiles) {
+ for (String expectedFile : expectedFiles) {
+ boolean found = false;
+ for (String collectedFile : collectedFiles) {
+ if (collectedFile.contains(expectedFile)) {
+ found = true;
+ break;
+ }
+ }
+ Assert.assertTrue("no ship file containing '" + expectedFile + "' in " + collectedFiles, found);
+ }
+ }
+}
Modified: pig/trunk/test/org/apache/pig/test/TestPigRunner.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigRunner.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigRunner.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigRunner.java Tue Sep 16 22:18:54 2014
@@ -633,7 +633,7 @@ public class TestPigRunner {
@Test
public void classLoaderTest() throws Exception {
// Skip in hadoop 23 test, see PIG-2449
- if (Util.isHadoop23() || Util.isHadoop2_0())
+ if (org.apache.pig.impl.util.Utils.isHadoop23() || org.apache.pig.impl.util.Utils.isHadoop2())
return;
PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
w.println("register test/org/apache/pig/test/data/pigtestloader.jar");
Modified: pig/trunk/test/org/apache/pig/test/TestPredeployedJar.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPredeployedJar.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPredeployedJar.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPredeployedJar.java Tue Sep 16 22:18:54 2014
@@ -34,7 +34,6 @@ import org.apache.pig.backend.executione
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.JarManager;
-import org.apache.pig.newplan.logical.rules.ColumnPruneVisitor;
import org.junit.Assert;
import org.junit.Test;
@@ -58,22 +57,22 @@ public class TestPredeployedJar {
pigServer.getPigContext().getProperties().put(PigConfiguration.OPT_FETCH, "false");
String[] inputData = new String[] { "hello", "world" };
Util.createInputFile(cluster, "a.txt", inputData);
- String jacksonJar = JarManager.findContainingJar(org.codehaus.jackson.JsonParser.class);
+ String guavaJar = JarManager.findContainingJar(com.google.common.collect.Multimaps.class);
pigServer.registerQuery("a = load 'a.txt' as (line:chararray);");
Iterator<Tuple> it = pigServer.openIterator("a");
String content = FileUtils.readFileToString(logFile);
- Assert.assertTrue(content.contains(jacksonJar));
+ Assert.assertTrue(content.contains(guavaJar));
logFile = File.createTempFile("log", "");
- // Now let's mark the jackson jar as predeployed.
- pigServer.getPigContext().markJarAsPredeployed(jacksonJar);
+ // Now let's mark the guava jar as predeployed.
+ pigServer.getPigContext().markJarAsPredeployed(guavaJar);
it = pigServer.openIterator("a");
content = FileUtils.readFileToString(logFile);
- Assert.assertFalse(content.contains(jacksonJar));
+ Assert.assertFalse(content.contains(guavaJar));
}
@Test
Modified: pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/Util.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ pig/trunk/test/org/apache/pig/test/Util.java Tue Sep 16 22:18:54 2014
@@ -611,7 +611,7 @@ public class Util {
}
static private String getMkDirCommandForHadoop2_0(String fileName) {
- if (Util.isHadoop23() || Util.isHadoop2_0()) {
+ if (org.apache.pig.impl.util.Utils.isHadoop23() || org.apache.pig.impl.util.Utils.isHadoop2()) {
Path parentDir = new Path(fileName).getParent();
String mkdirCommand = parentDir.getName().isEmpty() ? "" : "fs -mkdir -p " + parentDir + "\n";
return mkdirCommand;
@@ -1231,13 +1231,6 @@ public class Util {
return plan.replaceAll("','','[^']*','scope','true'\\)\\)", "','','','scope','true'))");
}
- public static boolean isHadoop23() {
- String version = org.apache.hadoop.util.VersionInfo.getVersion();
- if (version.matches("\\b0\\.23\\..+\\b"))
- return true;
- return false;
- }
-
public static boolean isHadoop203plus() {
String version = org.apache.hadoop.util.VersionInfo.getVersion();
if (version.matches("\\b0\\.20\\.2\\b"))
@@ -1274,13 +1267,6 @@ public class Util {
assertEquals("Unexpected value found in configs for " + param, expected, conf.getLong(param, -1));
}
- public static boolean isHadoop2_0() {
- String version = org.apache.hadoop.util.VersionInfo.getVersion();
- if (version.matches("\\b2\\.\\d\\..+"))
- return true;
- return false;
- }
-
/**
* Returns a PathFilter that filters out filenames that start with _.
* @return PathFilter