You are viewing a plain-text version of this content; the hyperlink to its canonical version is not included in this plain-text copy.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/02/18 17:58:24 UTC

svn commit: r1569428 - in /pig/branches/tez: ./ ivy/ shims/test/hadoop23/org/apache/pig/test/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/ src/org/apache/pig/backend/hadoop/executionengine/tez/ src/org/apache/pig/impl/builtin/

Author: rohini
Date: Tue Feb 18 16:58:24 2014
New Revision: 1569428

URL: http://svn.apache.org/r1569428
Log:
PIG-3767: Work with TEZ-668 which allows starting and closing of inputs and outputs (rohini)

Modified:
    pig/branches/tez/build.xml
    pig/branches/tez/ivy.xml
    pig/branches/tez/ivy/libraries.properties
    pig/branches/tez/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLoad.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
    pig/branches/tez/src/org/apache/pig/impl/builtin/ReadScalarsTez.java

Modified: pig/branches/tez/build.xml
URL: http://svn.apache.org/viewvc/pig/branches/tez/build.xml?rev=1569428&r1=1569427&r2=1569428&view=diff
==============================================================================
--- pig/branches/tez/build.xml (original)
+++ pig/branches/tez/build.xml Tue Feb 18 16:58:24 2014
@@ -182,6 +182,13 @@
         <equals arg1="${hadoopversion}" arg2="23"/>
     </condition>
 
+    <!-- Tez needs guava 15.0 whereas Hadoop 1.x depends on guava 11 -->
+    <script language="javascript">
+        if (project.getProperty('hadoopversion').equals('23')) {
+            project.setProperty('guava.version', project.getProperty('guava-hadoop2.version'));
+        }
+    </script>
+
     <!--
       HBase master version
       Denotes how the HBase dependencies are layout. Value "94" denotes older

Modified: pig/branches/tez/ivy.xml
URL: http://svn.apache.org/viewvc/pig/branches/tez/ivy.xml?rev=1569428&r1=1569427&r2=1569428&view=diff
==============================================================================
--- pig/branches/tez/ivy.xml (original)
+++ pig/branches/tez/ivy.xml Tue Feb 18 16:58:24 2014
@@ -388,15 +388,12 @@
     <dependency org="org.apache.tez" name="tez-runtime-library" rev="${tez.version}"
        conf="hadoop23->master"/>
     <dependency org="org.apache.tez" name="tez-mapreduce" rev="${tez.version}"
-       conf="hadoop23->master">
-      <artifact name="tez-mapreduce" ext="jar" />
-      <artifact name="tez-mapreduce" type="tests" ext="jar" m:classifier="tests" />
-    </dependency>
-    <dependency org="org.apache.tez" name="tez-tests" rev="${tez.version}"
-       conf="hadoop23->master">
-      <artifact name="tez-tests" ext="jar" />
-      <artifact name="tez-tests" type="tests" ext="jar" m:classifier="tests" />
-    </dependency>
+       conf="hadoop23->master"/>
+    <dependency org="org.apache.commons" name="commons-collections4" rev="${commons-collections4.version}"
+      conf="hadoop23->master"/>
+    <dependency org="org.codehaus.jettison" name="jettison" rev="${jettison.version}"
+      conf="hadoop23->master"/>
+
   </dependencies>
 </ivy-module>
 

Modified: pig/branches/tez/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/pig/branches/tez/ivy/libraries.properties?rev=1569428&r1=1569427&r2=1569428&view=diff
==============================================================================
--- pig/branches/tez/ivy/libraries.properties (original)
+++ pig/branches/tez/ivy/libraries.properties Tue Feb 18 16:58:24 2014
@@ -27,6 +27,7 @@ commons-logging.version=1.1.1
 commons-lang.version=2.4
 commons-configuration.version=1.6
 commons-collections.version=3.2.1
+commons-collections4.version=4.0
 commons-httpclient.version=3.1
 xmlenc.version=0.52
 jersey.version=1.8
@@ -35,6 +36,7 @@ ivy.version=2.2.0
 jasper.version=6.1.14
 groovy.version=1.8.6
 guava.version=11.0
+guava-hadoop2.version=15.0
 jersey-core.version=1.8
 hadoop-core.version=1.0.0
 hadoop-test.version=1.0.0
@@ -54,6 +56,7 @@ jaxb-api.version=2.2.2
 jaxb-impl.version=2.2.3-1
 jdeb.version=0.8
 jdiff.version=1.0.9
+jettison.version=1.3.4
 jetty.version=6.1.26
 jetty-util.version=6.1.26
 jline.version=0.9.94

Modified: pig/branches/tez/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java?rev=1569428&r1=1569427&r2=1569428&view=diff
==============================================================================
--- pig/branches/tez/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java (original)
+++ pig/branches/tez/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java Tue Feb 18 16:58:24 2014
@@ -60,7 +60,10 @@ public class TezMiniCluster extends Mini
             CONF_DIR.mkdirs();
 
             // Build mini DFS cluster
-            m_dfs = new MiniDFSCluster.Builder(new Configuration())
+            Configuration hdfsConf = new Configuration(false);
+            hdfsConf.addResource("core-default.xml");
+            hdfsConf.addResource("hdfs-default.xml");
+            m_dfs = new MiniDFSCluster.Builder(hdfsConf)
                     .numDataNodes(2)
                     .format(true)
                     .racks(null)

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java?rev=1569428&r1=1569427&r2=1569428&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java Tue Feb 18 16:58:24 2014
@@ -123,7 +123,7 @@ public class PlanHelper {
      * @return a LinkedList of operators contained within the plan which implement the supplied class; empty if no such ops exist.
      * @throws VisitorException
      */
-    public static <C extends PhysicalOperator> LinkedList<C> getPhysicalOperators(PhysicalPlan plan,
+    public static <C> LinkedList<C> getPhysicalOperators(PhysicalPlan plan,
             Class<C> opClass) throws VisitorException {
         OpFinder<C> finder = new OpFinder<C>(plan, opClass);
         finder.visit();
@@ -149,7 +149,7 @@ public class PlanHelper {
         return plan;
     }
 
-    private static class OpFinder<C extends PhysicalOperator> extends PhyPlanVisitor {
+    private static class OpFinder<C> extends PhyPlanVisitor {
 
         final Class<C> opClass;
         private LinkedList<C> foundOps = Lists.newLinkedList();

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java?rev=1569428&r1=1569427&r2=1569428&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java Tue Feb 18 16:58:24 2014
@@ -21,6 +21,7 @@ package org.apache.pig.backend.hadoop.ex
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -56,16 +57,30 @@ public class POFRJoinTez extends POFRJoi
     @SuppressWarnings("rawtypes")
     private List<BroadcastKVReader> replReaders = Lists.newArrayList();
     private List<String> inputKeys;
+    private transient boolean isInputCached;
 
     public POFRJoinTez(POFRJoin copy, List<String> inputKeys) throws ExecException {
        super(copy);
        this.inputKeys = inputKeys;
     }
 
+    @Override
+    public void addInputsToSkip(Set<String> inputsToSkip) {
+        String cacheKey = "replicatemap-" + getOperatorKey().toString();
+        Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+        if (cacheValue != null) {
+            isInputCached = true;
+            inputsToSkip.addAll(inputKeys);
+        }
+    }
+
     @SuppressWarnings("rawtypes")
     @Override
     public void attachInputs(Map<String, LogicalInput> inputs, Configuration conf)
             throws ExecException {
+        if (isInputCached) {
+            return;
+        }
         try {
             for (String key : inputKeys) {
                 LogicalInput input = inputs.get(key);
@@ -89,8 +104,8 @@ public class POFRJoinTez extends POFRJoi
     protected void setUpHashMap() throws ExecException {
         String cacheKey = "replicatemap-" + getOperatorKey().toString();
 
-        Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
-        if (cacheValue != null) {
+        if (isInputCached) {
+            Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
             replicates = (TupleToMapKey[]) cacheValue;
             log.info("Found " + (replicates.length - 1) + " replication hash tables in Tez cache. cachekey=" + cacheKey);
             return;

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java?rev=1569428&r1=1569427&r2=1569428&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java Tue Feb 18 16:58:24 2014
@@ -20,6 +20,7 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -61,6 +62,10 @@ public class POIdentityInOutTez extends 
     }
 
     @Override
+    public void addInputsToSkip(Set<String> inputsToSkip) {
+    }
+
+    @Override
     public void attachInputs(Map<String, LogicalInput> inputs,
             Configuration conf) throws ExecException {
         LogicalInput input = inputs.get(inputKey);

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java?rev=1569428&r1=1569427&r2=1569428&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java Tue Feb 18 16:58:24 2014
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparator;
@@ -59,6 +60,10 @@ public class POShuffleTezLoad extends PO
     }
 
     @Override
+    public void addInputsToSkip(Set<String> inputsToSkip) {
+    }
+
+    @Override
     public void attachInputs(Map<String, LogicalInput> inputs, Configuration conf)
             throws ExecException {
 

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java?rev=1569428&r1=1569427&r2=1569428&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java Tue Feb 18 16:58:24 2014
@@ -20,6 +20,7 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -47,6 +48,10 @@ public class POSimpleTezLoad extends POL
     }
 
     @Override
+    public void addInputsToSkip(Set<String> inputsToSkip) {
+    }
+
+    @Override
     public void attachInputs(Map<String, LogicalInput> inputs,
             Configuration conf)
             throws ExecException {

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java?rev=1569428&r1=1569427&r2=1569428&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java Tue Feb 18 16:58:24 2014
@@ -20,6 +20,7 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -53,6 +54,10 @@ public class POValueInputTez extends Phy
     }
 
     @Override
+    public void addInputsToSkip(Set<String> inputsToSkip) {
+    }
+
+    @Override
     public void attachInputs(Map<String, LogicalInput> inputs,
             Configuration conf)
             throws ExecException {

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1569428&r1=1569427&r2=1569428&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Tue Feb 18 16:58:24 2014
@@ -25,6 +25,8 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -53,6 +55,8 @@ import org.apache.tez.runtime.api.TezPro
 import org.apache.tez.runtime.library.broadcast.input.BroadcastKVReader;
 
 public class PigProcessor implements LogicalIOProcessor {
+
+    private static final Log LOG = LogFactory.getLog(PigProcessor.class);
     // Names of the properties that store serialized physical plans
     public static final String PLAN = "pig.exec.tez.plan";
     public static final String COMBINE_PLAN = "pig.exec.tez.combine.plan";
@@ -108,90 +112,101 @@ public class PigProcessor implements Log
     @Override
     public void run(Map<String, LogicalInput> inputs,
             Map<String, LogicalOutput> outputs) throws Exception {
-        initializeInputs(inputs);
 
-        initializeOutputs(outputs);
+        try {
+            initializeInputs(inputs);
 
-        sampleVertex = conf.get("pig.sampleVertex");
-        if (sampleVertex != null) {
-            collectSample(sampleVertex, inputs.get(sampleVertex));
-        }
+            initializeOutputs(outputs);
 
-        List<PhysicalOperator> leaves = null;
 
-        if (!execPlan.isEmpty()) {
-            leaves = execPlan.getLeaves();
-            // TODO: Pull from all leaves when there are multiple leaves/outputs
-            leaf = leaves.get(0);
-        }
+            List<PhysicalOperator> leaves = null;
 
-        runPipeline(leaf);
+            if (!execPlan.isEmpty()) {
+                leaves = execPlan.getLeaves();
+                // TODO: Pull from all leaves when there are multiple leaves/outputs
+                leaf = leaves.get(0);
+            }
 
-        // For certain operators (such as STREAM), we could still have some work
-        // to do even after seeing the last input. These operators set a flag that
-        // says all input has been sent and to run the pipeline one more time.
-        if (Boolean.valueOf(conf.get(JobControlCompiler.END_OF_INP_IN_MAP, "false"))) {
-            execPlan.endOfAllInput = true;
             runPipeline(leaf);
-        }
 
-        for (MROutput fileOutput : fileOutputs){
-            fileOutput.commit();
+            // For certain operators (such as STREAM), we could still have some work
+            // to do even after seeing the last input. These operators set a flag that
+            // says all input has been sent and to run the pipeline one more time.
+            if (Boolean.valueOf(conf.get(JobControlCompiler.END_OF_INP_IN_MAP, "false"))) {
+                execPlan.endOfAllInput = true;
+                runPipeline(leaf);
+            }
+
+            for (MROutput fileOutput : fileOutputs){
+                fileOutput.commit();
+            }
+        } catch (Exception e) {
+            LOG.error("Encountered exception while processing: ", e);
+            throw e;
         }
     }
 
     private void initializeInputs(Map<String, LogicalInput> inputs)
-            throws IOException {
-        // getPhysicalOperators only accept C extends PhysicalOperator, so we can't change it to look for TezLoad
-        // TODO: Change that.
-        LinkedList<POSimpleTezLoad> tezLds = PlanHelper.getPhysicalOperators(execPlan, POSimpleTezLoad.class);
-        for (POSimpleTezLoad tezLd : tezLds){
-            tezLd.attachInputs(inputs, conf);
+            throws Exception {
+
+        Set<String> inputsToSkip = new HashSet<String>();
+
+        sampleVertex = conf.get("pig.sampleVertex");
+        if (sampleVertex != null) {
+            collectSample(sampleVertex, inputs.get(sampleVertex));
+            inputsToSkip.add(sampleVertex);
         }
-        LinkedList<POShuffleTezLoad> shuffles = PlanHelper.getPhysicalOperators(execPlan, POShuffleTezLoad.class);
-        for (POShuffleTezLoad shuffle : shuffles){
-            shuffle.attachInputs(inputs, conf);
-        }
-        LinkedList<POIdentityInOutTez> identityInOuts = PlanHelper.getPhysicalOperators(execPlan, POIdentityInOutTez.class);
-        for (POIdentityInOutTez identityInOut : identityInOuts){
-            identityInOut.attachInputs(inputs, conf);
-        }
-        LinkedList<POValueInputTez> valueInputs = PlanHelper.getPhysicalOperators(execPlan, POValueInputTez.class);
-        for (POValueInputTez input : valueInputs){
-            input.attachInputs(inputs, conf);
+
+        LinkedList<TezLoad> tezLds = PlanHelper.getPhysicalOperators(execPlan, TezLoad.class);
+        for (TezLoad tezLd : tezLds){
+            tezLd.addInputsToSkip(inputsToSkip);
         }
-        LinkedList<POUserFunc> scalarInputs = PlanHelper.getPhysicalOperators(execPlan, POUserFunc.class);
-        for (POUserFunc userFunc : scalarInputs ) {
+
+        LinkedList<ReadScalarsTez> scalarInputs = new LinkedList<ReadScalarsTez>();
+        for (POUserFunc userFunc : PlanHelper.getPhysicalOperators(execPlan, POUserFunc.class) ) {
             if (userFunc.getFunc() instanceof ReadScalarsTez) {
-                ((ReadScalarsTez)userFunc.getFunc()).attachInputs(inputs, conf);
+                scalarInputs.add((ReadScalarsTez)userFunc.getFunc());
+            }
+        }
+
+        for (ReadScalarsTez scalarInput: scalarInputs) {
+            scalarInput.addInputsToSkip(inputsToSkip);
+        }
+
+        for (Entry<String, LogicalInput> entry : inputs.entrySet()) {
+            if (inputsToSkip.contains(entry.getKey())) {
+                LOG.info("Skipping fetch of input " + entry.getValue() + " from vertex " + entry.getKey());
+            } else {
+                LOG.info("Starting fetch of input " + entry.getValue() + " from vertex " + entry.getKey());
+                entry.getValue().start();
             }
         }
-        LinkedList<POFRJoinTez> broadcasts = PlanHelper.getPhysicalOperators(execPlan, POFRJoinTez.class);
-        for (POFRJoinTez broadcast : broadcasts){
-            broadcast.attachInputs(inputs, conf);
+
+        for (TezLoad tezLd : tezLds){
+            tezLd.attachInputs(inputs, conf);
+        }
+
+        for (ReadScalarsTez scalarInput: scalarInputs) {
+            scalarInput.attachInputs(inputs, conf);
         }
+
     }
 
     private void initializeOutputs(Map<String, LogicalOutput> outputs) throws Exception {
-        LinkedList<POStoreTez> stores = PlanHelper.getPhysicalOperators(execPlan, POStoreTez.class);
-        for (POStoreTez store : stores){
-            store.attachOutputs(outputs, conf);
-        }
-        LinkedList<POLocalRearrangeTez> rearranges = PlanHelper.getPhysicalOperators(execPlan, POLocalRearrangeTez.class);
-        for (POLocalRearrangeTez lr : rearranges){
-            lr.attachOutputs(outputs, conf);
-        }
-        LinkedList<POValueOutputTez> valueOutputs = PlanHelper.getPhysicalOperators(execPlan, POValueOutputTez.class);
-        for (POValueOutputTez output : valueOutputs){
-            output.attachOutputs(outputs, conf);
-        }
-        for (Entry<String, LogicalOutput> entry : outputs.entrySet()){
-            LogicalOutput logicalOutput = entry.getValue();
-            if (logicalOutput instanceof MROutput){
-                MROutput mrOut = (MROutput) logicalOutput;
+
+        for (Entry<String, LogicalOutput> entry : outputs.entrySet()) {
+            LogicalOutput output = entry.getValue();
+            LOG.info("Starting output " + output + " to vertex " + entry.getKey());
+            output.start();
+            if (output instanceof MROutput){
+                MROutput mrOut = (MROutput) output;
                 fileOutputs.add(mrOut);
             }
         }
+        LinkedList<TezOutput> tezOuts = PlanHelper.getPhysicalOperators(execPlan, TezOutput.class);
+        for (TezOutput tezOut : tezOuts){
+            tezOut.attachOutputs(outputs, conf);
+        }
     }
 
     protected void runPipeline(PhysicalOperator leaf) throws IOException, InterruptedException {
@@ -231,6 +246,8 @@ public class PigProcessor implements Log
         if (cached == Boolean.TRUE) {
             return;
         }
+        LOG.info("Starting fetch of input " + logicalInput + " from vertex " + sampleVertex);
+        logicalInput.start();
         BroadcastKVReader reader = (BroadcastKVReader) logicalInput.getReader();
         reader.next();
         Object val = reader.getCurrentValue();

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLoad.java?rev=1569428&r1=1569427&r2=1569428&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLoad.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLoad.java Tue Feb 18 16:58:24 2014
@@ -19,6 +19,7 @@
 package org.apache.pig.backend.hadoop.executionengine.tez;
 
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -30,6 +31,15 @@ import org.apache.tez.runtime.api.Logica
  */
 
 public interface TezLoad {
+
+    /**
+     * Add to the list of inputs to skip download if already available in vertex cache
+     *
+     * @param inputsToSkip
+     */
+    public void addInputsToSkip(Set<String> inputsToSkip);
+
     public void attachInputs(Map<String, LogicalInput> inputs,
             Configuration conf) throws ExecException;
+
 }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java?rev=1569428&r1=1569427&r2=1569428&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java Tue Feb 18 16:58:24 2014
@@ -50,6 +50,7 @@ public class TezResourceManager {
     }
 
     public static void initialize(Path stagingDir, PigContext pigContext, Configuration conf) throws IOException {
+        resources.clear();
         TezResourceManager.stagingDir = stagingDir;
         TezResourceManager.pigContext = pigContext;
         TezResourceManager.conf = conf;

Modified: pig/branches/tez/src/org/apache/pig/impl/builtin/ReadScalarsTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/builtin/ReadScalarsTez.java?rev=1569428&r1=1569427&r2=1569428&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/builtin/ReadScalarsTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/builtin/ReadScalarsTez.java Tue Feb 18 16:58:24 2014
@@ -19,43 +19,75 @@ package org.apache.pig.impl.builtin;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.tez.ObjectCache;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezLoad;
 import org.apache.pig.data.Tuple;
 import org.apache.tez.runtime.api.LogicalInput;
-import org.apache.tez.runtime.library.broadcast.input.BroadcastKVReader;
+import org.apache.tez.runtime.library.api.KeyValueReader;
 
 public class ReadScalarsTez extends EvalFunc<Object> implements TezLoad {
     private static final Log LOG = LogFactory.getLog(ReadScalarsTez.class);
-    String inputKey;
-    Tuple t;
+    private String inputKey;
+    private transient Tuple t;
+    private transient LogicalInput input;
+
     public ReadScalarsTez(String inputKey) {
         this.inputKey = inputKey;
     }
-    public void attachInputs(Map<String, LogicalInput> inputs, Configuration conf)
-            throws ExecException {
-        LogicalInput input = inputs.get(inputKey);
+
+    @Override
+    public void addInputsToSkip(Set<String> inputsToSkip) {
+        String cacheKey = "scalar-" + inputKey;
+        Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+        if (cacheValue != null) {
+            inputsToSkip.add(inputKey);
+        }
+    }
+
+    @Override
+    public void attachInputs(Map<String, LogicalInput> inputs,
+            Configuration conf) throws ExecException {
+        String cacheKey = "scalar-" + inputKey;
+        Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+        if (cacheValue != null) {
+            t = (Tuple) cacheValue;
+            return;
+        }
+        input = inputs.get(inputKey);
         if (input == null) {
             throw new ExecException("Input from vertex " + inputKey + " is missing");
         }
         try {
-            BroadcastKVReader reader = (BroadcastKVReader)input.getReader();
-            reader.next();
-            t = (Tuple)reader.getCurrentValue();
+            KeyValueReader reader = (KeyValueReader) input.getReader();
+            if (reader.next()) {
+                t = (Tuple) reader.getCurrentValue();
+                if (reader.next()) {
+                    String msg = "Scalar has more than one row in the output. "
+                            + "1st : " + t + ", 2nd :"
+                            + reader.getCurrentValue();
+                    throw new ExecException(msg);
+                }
+            } else {
+                LOG.info("Scalar input from vertex " + inputKey + " is null");
+            }
             LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + reader);
         } catch (Exception e) {
             throw new ExecException(e);
         }
+        ObjectCache.getInstance().cache(cacheKey, t);
+        log.info("Cached scalar in Tez ObjectRegistry with vertex scope. cachekey=" + cacheKey);
     }
 
     @Override
     public Object exec(Tuple input) throws IOException {
-        int pos = (Integer)input.get(0);
+        int pos = (Integer) input.get(0);
         Object obj = t.get(pos);
         return obj;
     }