You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/02/18 17:58:24 UTC
svn commit: r1569428 - in /pig/branches/tez: ./ ivy/
shims/test/hadoop23/org/apache/pig/test/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/
src/org/apache/pig/backend/hadoop/executionengine/tez/
src/org/apache/pig/impl/builtin/
Author: rohini
Date: Tue Feb 18 16:58:24 2014
New Revision: 1569428
URL: http://svn.apache.org/r1569428
Log:
PIG-3767: Work with TEZ-668 which allows starting and closing of inputs and outputs (rohini)
Modified:
pig/branches/tez/build.xml
pig/branches/tez/ivy.xml
pig/branches/tez/ivy/libraries.properties
pig/branches/tez/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLoad.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
pig/branches/tez/src/org/apache/pig/impl/builtin/ReadScalarsTez.java
Modified: pig/branches/tez/build.xml
URL: http://svn.apache.org/viewvc/pig/branches/tez/build.xml?rev=1569428&r1=1569427&r2=1569428&view=diff
==============================================================================
--- pig/branches/tez/build.xml (original)
+++ pig/branches/tez/build.xml Tue Feb 18 16:58:24 2014
@@ -182,6 +182,13 @@
<equals arg1="${hadoopversion}" arg2="23"/>
</condition>
+ <!-- Tez needs guava 15.0 whereas Hadoop 1.x depends on guava 11 -->
+ <script language="javascript">
+ if (project.getProperty('hadoopversion').equals('23')) {
+ project.setProperty('guava.version', project.getProperty('guava-hadoop2.version'));
+ }
+ </script>
+
<!--
HBase master version
Denotes how the HBase dependencies are laid out. Value "94" denotes older
Modified: pig/branches/tez/ivy.xml
URL: http://svn.apache.org/viewvc/pig/branches/tez/ivy.xml?rev=1569428&r1=1569427&r2=1569428&view=diff
==============================================================================
--- pig/branches/tez/ivy.xml (original)
+++ pig/branches/tez/ivy.xml Tue Feb 18 16:58:24 2014
@@ -388,15 +388,12 @@
<dependency org="org.apache.tez" name="tez-runtime-library" rev="${tez.version}"
conf="hadoop23->master"/>
<dependency org="org.apache.tez" name="tez-mapreduce" rev="${tez.version}"
- conf="hadoop23->master">
- <artifact name="tez-mapreduce" ext="jar" />
- <artifact name="tez-mapreduce" type="tests" ext="jar" m:classifier="tests" />
- </dependency>
- <dependency org="org.apache.tez" name="tez-tests" rev="${tez.version}"
- conf="hadoop23->master">
- <artifact name="tez-tests" ext="jar" />
- <artifact name="tez-tests" type="tests" ext="jar" m:classifier="tests" />
- </dependency>
+ conf="hadoop23->master"/>
+ <dependency org="org.apache.commons" name="commons-collections4" rev="${commons-collections4.version}"
+ conf="hadoop23->master"/>
+ <dependency org="org.codehaus.jettison" name="jettison" rev="${jettison.version}"
+ conf="hadoop23->master"/>
+
</dependencies>
</ivy-module>
Modified: pig/branches/tez/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/pig/branches/tez/ivy/libraries.properties?rev=1569428&r1=1569427&r2=1569428&view=diff
==============================================================================
--- pig/branches/tez/ivy/libraries.properties (original)
+++ pig/branches/tez/ivy/libraries.properties Tue Feb 18 16:58:24 2014
@@ -27,6 +27,7 @@ commons-logging.version=1.1.1
commons-lang.version=2.4
commons-configuration.version=1.6
commons-collections.version=3.2.1
+commons-collections4.version=4.0
commons-httpclient.version=3.1
xmlenc.version=0.52
jersey.version=1.8
@@ -35,6 +36,7 @@ ivy.version=2.2.0
jasper.version=6.1.14
groovy.version=1.8.6
guava.version=11.0
+guava-hadoop2.version=15.0
jersey-core.version=1.8
hadoop-core.version=1.0.0
hadoop-test.version=1.0.0
@@ -54,6 +56,7 @@ jaxb-api.version=2.2.2
jaxb-impl.version=2.2.3-1
jdeb.version=0.8
jdiff.version=1.0.9
+jettison.version=1.3.4
jetty.version=6.1.26
jetty-util.version=6.1.26
jline.version=0.9.94
Modified: pig/branches/tez/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java?rev=1569428&r1=1569427&r2=1569428&view=diff
==============================================================================
--- pig/branches/tez/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java (original)
+++ pig/branches/tez/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java Tue Feb 18 16:58:24 2014
@@ -60,7 +60,10 @@ public class TezMiniCluster extends Mini
CONF_DIR.mkdirs();
// Build mini DFS cluster
- m_dfs = new MiniDFSCluster.Builder(new Configuration())
+ Configuration hdfsConf = new Configuration(false);
+ hdfsConf.addResource("core-default.xml");
+ hdfsConf.addResource("hdfs-default.xml");
+ m_dfs = new MiniDFSCluster.Builder(hdfsConf)
.numDataNodes(2)
.format(true)
.racks(null)
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java?rev=1569428&r1=1569427&r2=1569428&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java Tue Feb 18 16:58:24 2014
@@ -123,7 +123,7 @@ public class PlanHelper {
* @return a LinkedList of operators contained within the plan which implement the supplied class; empty if no such ops exist.
* @throws VisitorException
*/
- public static <C extends PhysicalOperator> LinkedList<C> getPhysicalOperators(PhysicalPlan plan,
+ public static <C> LinkedList<C> getPhysicalOperators(PhysicalPlan plan,
Class<C> opClass) throws VisitorException {
OpFinder<C> finder = new OpFinder<C>(plan, opClass);
finder.visit();
@@ -149,7 +149,7 @@ public class PlanHelper {
return plan;
}
- private static class OpFinder<C extends PhysicalOperator> extends PhyPlanVisitor {
+ private static class OpFinder<C> extends PhyPlanVisitor {
final Class<C> opClass;
private LinkedList<C> foundOps = Lists.newLinkedList();
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java?rev=1569428&r1=1569427&r2=1569428&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java Tue Feb 18 16:58:24 2014
@@ -21,6 +21,7 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -56,16 +57,30 @@ public class POFRJoinTez extends POFRJoi
@SuppressWarnings("rawtypes")
private List<BroadcastKVReader> replReaders = Lists.newArrayList();
private List<String> inputKeys;
+ private transient boolean isInputCached;
public POFRJoinTez(POFRJoin copy, List<String> inputKeys) throws ExecException {
super(copy);
this.inputKeys = inputKeys;
}
+ @Override
+ public void addInputsToSkip(Set<String> inputsToSkip) {
+ String cacheKey = "replicatemap-" + getOperatorKey().toString();
+ Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+ if (cacheValue != null) {
+ isInputCached = true;
+ inputsToSkip.addAll(inputKeys);
+ }
+ }
+
@SuppressWarnings("rawtypes")
@Override
public void attachInputs(Map<String, LogicalInput> inputs, Configuration conf)
throws ExecException {
+ if (isInputCached) {
+ return;
+ }
try {
for (String key : inputKeys) {
LogicalInput input = inputs.get(key);
@@ -89,8 +104,8 @@ public class POFRJoinTez extends POFRJoi
protected void setUpHashMap() throws ExecException {
String cacheKey = "replicatemap-" + getOperatorKey().toString();
- Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
- if (cacheValue != null) {
+ if (isInputCached) {
+ Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
replicates = (TupleToMapKey[]) cacheValue;
log.info("Found " + (replicates.length - 1) + " replication hash tables in Tez cache. cachekey=" + cacheKey);
return;
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java?rev=1569428&r1=1569427&r2=1569428&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java Tue Feb 18 16:58:24 2014
@@ -20,6 +20,7 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@@ -61,6 +62,10 @@ public class POIdentityInOutTez extends
}
@Override
+ public void addInputsToSkip(Set<String> inputsToSkip) {
+ }
+
+ @Override
public void attachInputs(Map<String, LogicalInput> inputs,
Configuration conf) throws ExecException {
LogicalInput input = inputs.get(inputKey);
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java?rev=1569428&r1=1569427&r2=1569428&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java Tue Feb 18 16:58:24 2014
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableComparator;
@@ -59,6 +60,10 @@ public class POShuffleTezLoad extends PO
}
@Override
+ public void addInputsToSkip(Set<String> inputsToSkip) {
+ }
+
+ @Override
public void attachInputs(Map<String, LogicalInput> inputs, Configuration conf)
throws ExecException {
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java?rev=1569428&r1=1569427&r2=1569428&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java Tue Feb 18 16:58:24 2014
@@ -20,6 +20,7 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.backend.executionengine.ExecException;
@@ -47,6 +48,10 @@ public class POSimpleTezLoad extends POL
}
@Override
+ public void addInputsToSkip(Set<String> inputsToSkip) {
+ }
+
+ @Override
public void attachInputs(Map<String, LogicalInput> inputs,
Configuration conf)
throws ExecException {
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java?rev=1569428&r1=1569427&r2=1569428&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java Tue Feb 18 16:58:24 2014
@@ -20,6 +20,7 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -53,6 +54,10 @@ public class POValueInputTez extends Phy
}
@Override
+ public void addInputsToSkip(Set<String> inputsToSkip) {
+ }
+
+ @Override
public void attachInputs(Map<String, LogicalInput> inputs,
Configuration conf)
throws ExecException {
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1569428&r1=1569427&r2=1569428&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Tue Feb 18 16:58:24 2014
@@ -25,6 +25,8 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
@@ -53,6 +55,8 @@ import org.apache.tez.runtime.api.TezPro
import org.apache.tez.runtime.library.broadcast.input.BroadcastKVReader;
public class PigProcessor implements LogicalIOProcessor {
+
+ private static final Log LOG = LogFactory.getLog(PigProcessor.class);
// Names of the properties that store serialized physical plans
public static final String PLAN = "pig.exec.tez.plan";
public static final String COMBINE_PLAN = "pig.exec.tez.combine.plan";
@@ -108,90 +112,101 @@ public class PigProcessor implements Log
@Override
public void run(Map<String, LogicalInput> inputs,
Map<String, LogicalOutput> outputs) throws Exception {
- initializeInputs(inputs);
- initializeOutputs(outputs);
+ try {
+ initializeInputs(inputs);
- sampleVertex = conf.get("pig.sampleVertex");
- if (sampleVertex != null) {
- collectSample(sampleVertex, inputs.get(sampleVertex));
- }
+ initializeOutputs(outputs);
- List<PhysicalOperator> leaves = null;
- if (!execPlan.isEmpty()) {
- leaves = execPlan.getLeaves();
- // TODO: Pull from all leaves when there are multiple leaves/outputs
- leaf = leaves.get(0);
- }
+ List<PhysicalOperator> leaves = null;
- runPipeline(leaf);
+ if (!execPlan.isEmpty()) {
+ leaves = execPlan.getLeaves();
+ // TODO: Pull from all leaves when there are multiple leaves/outputs
+ leaf = leaves.get(0);
+ }
- // For certain operators (such as STREAM), we could still have some work
- // to do even after seeing the last input. These operators set a flag that
- // says all input has been sent and to run the pipeline one more time.
- if (Boolean.valueOf(conf.get(JobControlCompiler.END_OF_INP_IN_MAP, "false"))) {
- execPlan.endOfAllInput = true;
runPipeline(leaf);
- }
- for (MROutput fileOutput : fileOutputs){
- fileOutput.commit();
+ // For certain operators (such as STREAM), we could still have some work
+ // to do even after seeing the last input. These operators set a flag that
+ // says all input has been sent and to run the pipeline one more time.
+ if (Boolean.valueOf(conf.get(JobControlCompiler.END_OF_INP_IN_MAP, "false"))) {
+ execPlan.endOfAllInput = true;
+ runPipeline(leaf);
+ }
+
+ for (MROutput fileOutput : fileOutputs){
+ fileOutput.commit();
+ }
+ } catch (Exception e) {
+ LOG.error("Encountered exception while processing: ", e);
+ throw e;
}
}
private void initializeInputs(Map<String, LogicalInput> inputs)
- throws IOException {
- // getPhysicalOperators only accept C extends PhysicalOperator, so we can't change it to look for TezLoad
- // TODO: Change that.
- LinkedList<POSimpleTezLoad> tezLds = PlanHelper.getPhysicalOperators(execPlan, POSimpleTezLoad.class);
- for (POSimpleTezLoad tezLd : tezLds){
- tezLd.attachInputs(inputs, conf);
+ throws Exception {
+
+ Set<String> inputsToSkip = new HashSet<String>();
+
+ sampleVertex = conf.get("pig.sampleVertex");
+ if (sampleVertex != null) {
+ collectSample(sampleVertex, inputs.get(sampleVertex));
+ inputsToSkip.add(sampleVertex);
}
- LinkedList<POShuffleTezLoad> shuffles = PlanHelper.getPhysicalOperators(execPlan, POShuffleTezLoad.class);
- for (POShuffleTezLoad shuffle : shuffles){
- shuffle.attachInputs(inputs, conf);
- }
- LinkedList<POIdentityInOutTez> identityInOuts = PlanHelper.getPhysicalOperators(execPlan, POIdentityInOutTez.class);
- for (POIdentityInOutTez identityInOut : identityInOuts){
- identityInOut.attachInputs(inputs, conf);
- }
- LinkedList<POValueInputTez> valueInputs = PlanHelper.getPhysicalOperators(execPlan, POValueInputTez.class);
- for (POValueInputTez input : valueInputs){
- input.attachInputs(inputs, conf);
+
+ LinkedList<TezLoad> tezLds = PlanHelper.getPhysicalOperators(execPlan, TezLoad.class);
+ for (TezLoad tezLd : tezLds){
+ tezLd.addInputsToSkip(inputsToSkip);
}
- LinkedList<POUserFunc> scalarInputs = PlanHelper.getPhysicalOperators(execPlan, POUserFunc.class);
- for (POUserFunc userFunc : scalarInputs ) {
+
+ LinkedList<ReadScalarsTez> scalarInputs = new LinkedList<ReadScalarsTez>();
+ for (POUserFunc userFunc : PlanHelper.getPhysicalOperators(execPlan, POUserFunc.class) ) {
if (userFunc.getFunc() instanceof ReadScalarsTez) {
- ((ReadScalarsTez)userFunc.getFunc()).attachInputs(inputs, conf);
+ scalarInputs.add((ReadScalarsTez)userFunc.getFunc());
+ }
+ }
+
+ for (ReadScalarsTez scalarInput: scalarInputs) {
+ scalarInput.addInputsToSkip(inputsToSkip);
+ }
+
+ for (Entry<String, LogicalInput> entry : inputs.entrySet()) {
+ if (inputsToSkip.contains(entry.getKey())) {
+ LOG.info("Skipping fetch of input " + entry.getValue() + " from vertex " + entry.getKey());
+ } else {
+ LOG.info("Starting fetch of input " + entry.getValue() + " from vertex " + entry.getKey());
+ entry.getValue().start();
}
}
- LinkedList<POFRJoinTez> broadcasts = PlanHelper.getPhysicalOperators(execPlan, POFRJoinTez.class);
- for (POFRJoinTez broadcast : broadcasts){
- broadcast.attachInputs(inputs, conf);
+
+ for (TezLoad tezLd : tezLds){
+ tezLd.attachInputs(inputs, conf);
+ }
+
+ for (ReadScalarsTez scalarInput: scalarInputs) {
+ scalarInput.attachInputs(inputs, conf);
}
+
}
private void initializeOutputs(Map<String, LogicalOutput> outputs) throws Exception {
- LinkedList<POStoreTez> stores = PlanHelper.getPhysicalOperators(execPlan, POStoreTez.class);
- for (POStoreTez store : stores){
- store.attachOutputs(outputs, conf);
- }
- LinkedList<POLocalRearrangeTez> rearranges = PlanHelper.getPhysicalOperators(execPlan, POLocalRearrangeTez.class);
- for (POLocalRearrangeTez lr : rearranges){
- lr.attachOutputs(outputs, conf);
- }
- LinkedList<POValueOutputTez> valueOutputs = PlanHelper.getPhysicalOperators(execPlan, POValueOutputTez.class);
- for (POValueOutputTez output : valueOutputs){
- output.attachOutputs(outputs, conf);
- }
- for (Entry<String, LogicalOutput> entry : outputs.entrySet()){
- LogicalOutput logicalOutput = entry.getValue();
- if (logicalOutput instanceof MROutput){
- MROutput mrOut = (MROutput) logicalOutput;
+
+ for (Entry<String, LogicalOutput> entry : outputs.entrySet()) {
+ LogicalOutput output = entry.getValue();
+ LOG.info("Starting output " + output + " to vertex " + entry.getKey());
+ output.start();
+ if (output instanceof MROutput){
+ MROutput mrOut = (MROutput) output;
fileOutputs.add(mrOut);
}
}
+ LinkedList<TezOutput> tezOuts = PlanHelper.getPhysicalOperators(execPlan, TezOutput.class);
+ for (TezOutput tezOut : tezOuts){
+ tezOut.attachOutputs(outputs, conf);
+ }
}
protected void runPipeline(PhysicalOperator leaf) throws IOException, InterruptedException {
@@ -231,6 +246,8 @@ public class PigProcessor implements Log
if (cached == Boolean.TRUE) {
return;
}
+ LOG.info("Starting fetch of input " + logicalInput + " from vertex " + sampleVertex);
+ logicalInput.start();
BroadcastKVReader reader = (BroadcastKVReader) logicalInput.getReader();
reader.next();
Object val = reader.getCurrentValue();
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLoad.java?rev=1569428&r1=1569427&r2=1569428&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLoad.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLoad.java Tue Feb 18 16:58:24 2014
@@ -19,6 +19,7 @@
package org.apache.pig.backend.hadoop.executionengine.tez;
import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.backend.executionengine.ExecException;
@@ -30,6 +31,15 @@ import org.apache.tez.runtime.api.Logica
*/
public interface TezLoad {
+
+ /**
+ * Add to the given set the keys of inputs whose fetch can be skipped because their data is already available in the vertex cache
+ *
+ * @param inputsToSkip
+ */
+ public void addInputsToSkip(Set<String> inputsToSkip);
+
public void attachInputs(Map<String, LogicalInput> inputs,
Configuration conf) throws ExecException;
+
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java?rev=1569428&r1=1569427&r2=1569428&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java Tue Feb 18 16:58:24 2014
@@ -50,6 +50,7 @@ public class TezResourceManager {
}
public static void initialize(Path stagingDir, PigContext pigContext, Configuration conf) throws IOException {
+ resources.clear();
TezResourceManager.stagingDir = stagingDir;
TezResourceManager.pigContext = pigContext;
TezResourceManager.conf = conf;
Modified: pig/branches/tez/src/org/apache/pig/impl/builtin/ReadScalarsTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/builtin/ReadScalarsTez.java?rev=1569428&r1=1569427&r2=1569428&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/builtin/ReadScalarsTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/builtin/ReadScalarsTez.java Tue Feb 18 16:58:24 2014
@@ -19,43 +19,75 @@ package org.apache.pig.impl.builtin;
import java.io.IOException;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.tez.ObjectCache;
import org.apache.pig.backend.hadoop.executionengine.tez.TezLoad;
import org.apache.pig.data.Tuple;
import org.apache.tez.runtime.api.LogicalInput;
-import org.apache.tez.runtime.library.broadcast.input.BroadcastKVReader;
+import org.apache.tez.runtime.library.api.KeyValueReader;
public class ReadScalarsTez extends EvalFunc<Object> implements TezLoad {
private static final Log LOG = LogFactory.getLog(ReadScalarsTez.class);
- String inputKey;
- Tuple t;
+ private String inputKey;
+ private transient Tuple t;
+ private transient LogicalInput input;
+
public ReadScalarsTez(String inputKey) {
this.inputKey = inputKey;
}
- public void attachInputs(Map<String, LogicalInput> inputs, Configuration conf)
- throws ExecException {
- LogicalInput input = inputs.get(inputKey);
+
+ @Override
+ public void addInputsToSkip(Set<String> inputsToSkip) {
+ String cacheKey = "scalar-" + inputKey;
+ Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+ if (cacheValue != null) {
+ inputsToSkip.add(inputKey);
+ }
+ }
+
+ @Override
+ public void attachInputs(Map<String, LogicalInput> inputs,
+ Configuration conf) throws ExecException {
+ String cacheKey = "scalar-" + inputKey;
+ Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+ if (cacheValue != null) {
+ t = (Tuple) cacheValue;
+ return;
+ }
+ input = inputs.get(inputKey);
if (input == null) {
throw new ExecException("Input from vertex " + inputKey + " is missing");
}
try {
- BroadcastKVReader reader = (BroadcastKVReader)input.getReader();
- reader.next();
- t = (Tuple)reader.getCurrentValue();
+ KeyValueReader reader = (KeyValueReader) input.getReader();
+ if (reader.next()) {
+ t = (Tuple) reader.getCurrentValue();
+ if (reader.next()) {
+ String msg = "Scalar has more than one row in the output. "
+ + "1st : " + t + ", 2nd :"
+ + reader.getCurrentValue();
+ throw new ExecException(msg);
+ }
+ } else {
+ LOG.info("Scalar input from vertex " + inputKey + " is null");
+ }
LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + reader);
} catch (Exception e) {
throw new ExecException(e);
}
+ ObjectCache.getInstance().cache(cacheKey, t);
+ log.info("Cached scalar in Tez ObjectRegistry with vertex scope. cachekey=" + cacheKey);
}
@Override
public Object exec(Tuple input) throws IOException {
- int pos = (Integer)input.get(0);
+ int pos = (Integer) input.get(0);
Object obj = t.get(pos);
return obj;
}