Posted to commits@pig.apache.org by da...@apache.org on 2014/05/30 21:07:29 UTC

svn commit: r1598702 [5/23] - in /pig/trunk: ./ ivy/ shims/src/hadoop23/org/apache/pig/backend/hadoop23/ shims/test/hadoop20/org/apache/pig/test/ shims/test/hadoop23/org/apache/pig/test/ src/META-INF/services/ src/org/apache/pig/ src/org/apache/pig/bac...

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java Fri May 30 19:07:23 2014
@@ -18,15 +18,10 @@
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapOnly.Map;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
 import org.apache.pig.data.Tuple;
@@ -47,16 +42,18 @@ public class PigMapReduceCounter {
         /**
          * Here is set up the task id, in order to be attached to each tuple
          **/
+        @Override
         public void setup(Context context) throws IOException, InterruptedException {
             super.setup(context);
 
-            taskID = String.valueOf(context.getTaskAttemptID().getTaskID().getId());
+            int taskIDInt = context.getTaskAttemptID().getTaskID().getId();
+            taskID = String.valueOf(taskIDInt);
 
             pOperator = mp.getLeaves().get(0);
 
             while(true) {
                 if(pOperator instanceof POCounter){
-                    ((POCounter) pOperator).setTaskId(taskID);
+                    ((POCounter) pOperator).setTaskId(taskIDInt);
                     ((POCounter) pOperator).resetLocalCounter();
                     break;
                 } else {
@@ -103,13 +100,14 @@ public class PigMapReduceCounter {
         protected void setup(Context context) throws IOException, InterruptedException {
             super.setup(context);
 
-            taskID = String.valueOf(context.getTaskAttemptID().getTaskID().getId());
+            int taskIDInt = context.getTaskAttemptID().getTaskID().getId();
+            taskID = String.valueOf(taskIDInt);
 
             leaf = rp.getLeaves().get(0);
 
             while(true) {
                 if(leaf instanceof POCounter){
-                    ((POCounter) leaf).setTaskId(taskID);
+                    ((POCounter) leaf).setTaskId(taskIDInt);
                     ((POCounter) leaf).resetLocalCounter();
                     break;
                 } else {
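
Note on the change above: POCounter.setTaskId now takes the numeric task id
directly, so the String form survives only for the taskID field. A condensed
sketch of the walk both setup methods perform; the predecessor step is an
assumption here, since the hunk cuts off before it:

    private static void wireCounter(PhysicalPlan plan, int taskIDInt) {
        PhysicalOperator op = plan.getLeaves().get(0);  // start at the plan's leaf
        while (op != null) {
            if (op instanceof POCounter) {
                ((POCounter) op).setTaskId(taskIDInt);  // int now, no String parse
                ((POCounter) op).resetLocalCounter();
                break;
            }
            // step upstream (assumed; not shown in the hunk above)
            List<PhysicalOperator> preds = plan.getPredecessors(op);
            op = (preds == null || preds.isEmpty()) ? null : preds.get(0);
        }
    }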

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSecondaryKeyComparator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSecondaryKeyComparator.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSecondaryKeyComparator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSecondaryKeyComparator.java Fri May 30 19:07:23 2014
@@ -34,11 +34,6 @@ public class PigSecondaryKeyComparator e
 
     @Override
     public void setConf(Configuration conf) {
-        if (!(conf instanceof JobConf)) {
-            mLog.warn("Expected jobconf in setConf, got " + conf.getClass().getName());
-            return;
-        }
-        JobConf jconf = (JobConf) conf;
         try {
             Class<? extends TupleRawComparator> mComparatorClass = TupleFactory.getInstance().tupleRawComparatorClass();
             mComparator = mComparatorClass.newInstance();
@@ -47,7 +42,7 @@ public class PigSecondaryKeyComparator e
         } catch (IllegalAccessException e) {
             throw new RuntimeException(e);
         }
-        mComparator.setConf(jconf);
+        mComparator.setConf(conf);
     }
 
     protected PigSecondaryKeyComparator() {
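
This file and the next three apply the same simplification: the JobConf
downcast guard goes away because nothing JobConf-specific is read, so the
comparators can consume the Configuration as handed in. A minimal sketch of
the resulting pattern, with an illustrative class name (the real classes
extend WritableComparator and friends):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configurable;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.pig.impl.util.ObjectSerializer;

    public class SortOrderConfigurable implements Configurable {
        private Configuration conf;
        private boolean[] mAsc;

        @Override
        public void setConf(Configuration conf) {
            this.conf = conf;
            try {
                // any Configuration works here, JobConf or not
                mAsc = (boolean[]) ObjectSerializer.deserialize(
                        conf.get("pig.sortOrder"));
            } catch (IOException ioe) {
                throw new RuntimeException("Unable to deserialize pig.sortOrder", ioe);
            }
        }

        @Override
        public Configuration getConf() {
            return conf;
        }
    }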

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java Fri May 30 19:07:23 2014
@@ -44,14 +44,8 @@ public class PigTextRawComparator extend
 
 
     public void setConf(Configuration conf) {
-        if (!(conf instanceof JobConf)) {
-            mLog.warn("Expected jobconf in setConf, got " +
-                conf.getClass().getName());
-            return;
-        }
-        JobConf jconf = (JobConf)conf;
         try {
-            mAsc = (boolean[])ObjectSerializer.deserialize(jconf.get(
+            mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
                 "pig.sortOrder"));
         } catch (IOException ioe) {
             String msg = "Unable to deserialize pig.sortOrder";

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java Fri May 30 19:07:23 2014
@@ -47,13 +47,8 @@ public class PigTupleDefaultRawComparato
     }
 
     public void setConf(Configuration conf) {
-        if (!(conf instanceof JobConf)) {
-            mLog.warn("Expected jobconf in setConf, got " + conf.getClass().getName());
-            return;
-        }
-        JobConf jconf = (JobConf) conf;
         try {
-            mAsc = (boolean[]) ObjectSerializer.deserialize(jconf.get("pig.sortOrder"));
+            mAsc = (boolean[]) ObjectSerializer.deserialize(conf.get("pig.sortOrder"));
         } catch (IOException ioe) {
             mLog.error("Unable to deserialize pig.sortOrder " + ioe.getMessage());
             throw new RuntimeException(ioe);
@@ -75,7 +70,7 @@ public class PigTupleDefaultRawComparato
     public boolean hasComparedTupleNull() {
         return mHasNullField;
     }
-    
+
     private static final BinInterSedes bis = new BinInterSedes();
 
     /**

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleSortComparator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleSortComparator.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleSortComparator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleSortComparator.java Fri May 30 19:07:23 2014
@@ -47,13 +47,8 @@ public class PigTupleSortComparator exte
 
     @Override
     public void setConf(Configuration conf) {
-        if (!(conf instanceof JobConf)) {
-            mLog.warn("Expected jobconf in setConf, got " + conf.getClass().getName());
-            return;
-        }
-        JobConf jconf = (JobConf) conf;
         try {
-            mAsc = (boolean[]) ObjectSerializer.deserialize(jconf.get("pig.sortOrder"));
+            mAsc = (boolean[]) ObjectSerializer.deserialize(conf.get("pig.sortOrder"));
         } catch (IOException ioe) {
             mLog.error("Unable to deserialize pig.sortOrder " + ioe.getMessage());
             throw new RuntimeException(ioe);
@@ -86,7 +81,7 @@ public class PigTupleSortComparator exte
                 throw new RuntimeException(e);
             }
         }
-        ((Configurable)mComparator).setConf(jconf);
+        ((Configurable)mComparator).setConf(conf);
     }
 
     @Override
@@ -165,4 +160,4 @@ public class PigTupleSortComparator exte
             return 0;
         }
     }
-}
\ No newline at end of file
+}

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizerMR.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizerMR.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizerMR.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizerMR.java Fri May 30 19:07:23 2014
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.backend.hadoop.executionengine.optimizer.SecondaryKeyOptimizer;
+import org.apache.pig.backend.hadoop.executionengine.util.SecondaryKeyOptimizerUtil;
+import org.apache.pig.backend.hadoop.executionengine.util.SecondaryKeyOptimizerUtil.SecondaryKeyOptimizerInfo;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+@InterfaceAudience.Private
+public class SecondaryKeyOptimizerMR extends MROpPlanVisitor implements SecondaryKeyOptimizer {
+    private static Log log = LogFactory.getLog(SecondaryKeyOptimizerMR.class);
+    private SecondaryKeyOptimizerInfo info;
+
+    /**
+     * @param plan
+     *            The MROperPlan to visit to discover keyType
+     */
+    public SecondaryKeyOptimizerMR(MROperPlan plan) {
+        super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
+    }
+
+
+    @Override
+    public void visitMROp(MapReduceOper mr) throws VisitorException {
+        // Only optimize for Cogroup case
+        if (mr.isGlobalSort())
+            return;
+
+        // Don't optimize when we already have a custom partitioner
+        if (mr.getCustomPartitioner()!=null)
+            return;
+
+        info = SecondaryKeyOptimizerUtil.applySecondaryKeySort(mr.mapPlan, mr.reducePlan);
+        if (info != null && info.isUseSecondaryKey()) {
+            mr.setUseSecondaryKey(true);
+            mr.setSecondarySortOrder(info.getSecondarySortOrder());
+            log.info("Using Secondary Key Optimization for MapReduce node " + mr.getOperatorKey());
+        }
+    }
+
+
+    @Override
+    public int getNumSortRemoved() {
+        return (info == null) ? 0 : info.getNumSortRemoved();
+    }
+
+    @Override
+    public int getNumDistinctChanged() {
+        return (info == null) ? 0 : info.getNumDistinctChanged();
+    }
+
+    @Override
+    public int getNumUseSecondaryKey() {
+        return (info == null) ? 0 : info.getNumUseSecondaryKey();
+    }
+
+}
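
A minimal usage sketch for the new visitor, assuming it runs right after the
MR compiler produces the plan (the wrapper method is hypothetical):

    void optimizeSecondaryKeys(MROperPlan mrPlan) throws VisitorException {
        SecondaryKeyOptimizerMR optimizer = new SecondaryKeyOptimizerMR(mrPlan);
        optimizer.visit();  // depth-first over every MapReduceOper in the plan
        System.out.println(optimizer.getNumSortRemoved() + " sorts removed, "
                + optimizer.getNumDistinctChanged() + " distincts rewritten, "
                + optimizer.getNumUseSecondaryKey() + " ops using a secondary key");
    }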

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java Fri May 30 19:07:23 2014
@@ -18,7 +18,6 @@
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners;
 
 
-import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configurable;
@@ -36,31 +35,41 @@ import org.apache.pig.impl.io.NullableTu
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.util.Pair;
 
+import com.google.common.collect.Maps;
+
 
 /**
   * This class is used by skewed join. For the partitioned table, the skewedpartitioner reads the key
   * distribution data from the sampler file and returns the reducer index in a round robin fashion.
-  * For ex: if the key distribution file contains (k1, 5, 3) as an entry, reducers from 5 to 3 are returned 
+  * For ex: if the key distribution file contains (k1, 5, 3) as an entry, reducers from 5 to 3 are returned
   * in a round robin manner.
-  */ 
+  */
 public class SkewedPartitioner extends Partitioner<PigNullableWritable, Writable> implements Configurable {
-    Map<Tuple, Pair<Integer, Integer> > reducerMap = new HashMap<Tuple, Pair<Integer, Integer> >();
-    static Map<Tuple, Integer> currentIndexMap = new HashMap<Tuple, Integer> ();
-    Integer totalReducers;
-    Configuration conf;
+    protected static final TupleFactory tf = TupleFactory.getInstance();
+
+    protected Map<Tuple, Pair<Integer, Integer>> reducerMap = Maps.newHashMap();
+    protected Integer totalReducers = -1;
+    protected boolean inited = false;
+
+    private Map<Tuple, Integer> currentIndexMap = Maps.newHashMap();
+    private Configuration conf;
 
     @Override
-    public int getPartition(PigNullableWritable wrappedKey, Writable value,
-            int numPartitions) {
-		// for streaming tables, return the partition index blindly
-		if (wrappedKey instanceof NullablePartitionWritable && (((NullablePartitionWritable)wrappedKey).getPartition()) != -1) {
-			return ((NullablePartitionWritable)wrappedKey).getPartition();
-		}
+    public int getPartition(PigNullableWritable wrappedKey, Writable value, int numPartitions) {
+        if (!inited) {
+            init();
+        }
+
+        // for streaming tables, return the partition index blindly
+        if (wrappedKey instanceof NullablePartitionWritable &&
+                (((NullablePartitionWritable)wrappedKey).getPartition()) != -1) {
+            return ((NullablePartitionWritable)wrappedKey).getPartition();
+        }
 
         // for partition table, compute the index based on the sampler output
         Pair <Integer, Integer> indexes;
         Integer curIndex = -1;
-        Tuple keyTuple = TupleFactory.getInstance().newTuple(1);
+        Tuple keyTuple = tf.newTuple(1);
 
         // extract the key from nullablepartitionwritable
         PigNullableWritable key = ((NullablePartitionWritable) wrappedKey).getKey();
@@ -78,7 +87,7 @@ public class SkewedPartitioner extends P
 
         // if the partition file is empty, use numPartitions
         totalReducers = (totalReducers > 0) ? totalReducers : numPartitions;
-        
+
         indexes = reducerMap.get(keyTuple);
         // if the reducerMap does not contain the key, do the default hash based partitioning
         if (indexes == null) {
@@ -105,24 +114,29 @@ public class SkewedPartitioner extends P
         conf = job;
         PigMapReduce.sJobConfInternal.set(conf);
         PigMapReduce.sJobConf = conf;
-        String keyDistFile = job.get("pig.keyDistFile", "");
-        if (keyDistFile.length() == 0)
-            throw new RuntimeException(this.getClass().getSimpleName() + " used but no key distribution found");
+    }
+
+    @Override
+    public Configuration getConf() {
+        return conf;
+    }
+
+    protected void init() {
+        String keyDistFile = conf.get("pig.keyDistFile", "");
+        if (keyDistFile.length() == 0) {
+            throw new RuntimeException(this.getClass().getSimpleName() +
+                    " used but no key distribution found");
+        }
 
         try {
             Integer [] redCnt = new Integer[1]; 
             reducerMap = MapRedUtil.loadPartitionFileFromLocalCache(
-                    keyDistFile, redCnt, DataType.TUPLE, job);
+                    keyDistFile, redCnt, DataType.TUPLE, conf);
             // check if the partition file is empty
             totalReducers = (redCnt[0] == null) ? -1 : redCnt[0];
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
+        inited = true;
     }
-
-    @Override
-    public Configuration getConf() {
-        return conf;
-    }
-
 }
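
Besides moving the key-distribution load out of setConf and behind a lazy
init() guard, this change widens the fields to protected, which makes the
partitioner subclassable. A hypothetical subclass that already holds the
distribution in memory (class name and constructor are illustrative, not
part of this commit):

    import java.util.Map;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.impl.util.Pair;

    public class PrecomputedSkewedPartitioner extends SkewedPartitioner {
        private final Map<Tuple, Pair<Integer, Integer>> precomputed;

        public PrecomputedSkewedPartitioner(Map<Tuple, Pair<Integer, Integer>> map,
                int reducers) {
            this.precomputed = map;
            this.totalReducers = reducers;
        }

        @Override
        protected void init() {
            reducerMap = precomputed;  // bypass the pig.keyDistFile lookup
            inited = true;
        }
    }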

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Fri May 30 19:07:23 2014
@@ -24,8 +24,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.RawComparator;
@@ -58,20 +56,23 @@ import org.apache.pig.impl.util.Utils;
 
 public class WeightedRangePartitioner extends Partitioner<PigNullableWritable, Writable>
                                       implements Configurable {
-    PigNullableWritable[] quantiles;
-    RawComparator<PigNullableWritable> comparator;
-    PigContext pigContext;
-    final public static Map<PigNullableWritable,DiscreteProbabilitySampleGenerator> weightedParts
-        = new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>();
 
-    private static final Log log = LogFactory.getLog(WeightedRangePartitioner.class);
+    protected Map<PigNullableWritable, DiscreteProbabilitySampleGenerator> weightedParts =
+            new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>();
+    protected PigNullableWritable[] quantiles;
+    private RawComparator<PigNullableWritable> comparator;
+    private PigContext pigContext;
+    private Configuration job;
 
-    Configuration job;
+    protected boolean inited = false;
 
     @SuppressWarnings("unchecked")
     @Override
     public int getPartition(PigNullableWritable key, Writable value,
             int numPartitions){
+        if (!inited) {
+            init();
+        }
         if (comparator == null) {
             comparator = (RawComparator<PigNullableWritable>)PigMapReduce.sJobContext.getSortComparator();
         }
@@ -89,19 +90,15 @@ public class WeightedRangePartitioner ex
     }
 
     @SuppressWarnings("unchecked")
-    @Override
-    public void setConf(Configuration configuration) {
-        job = configuration;
-
+    public void init() {
+        weightedParts = new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>();
         try {
             pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
-        } catch (IOException e1) {
-            // should not happen
-            e1.printStackTrace();
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to deserialize pig context: ", e);
         }
 
-        String quantilesFile = configuration.get("pig.quantilesFile", "");
-
+        String quantilesFile = job.get("pig.quantilesFile", "");
         if (quantilesFile.length() == 0) {
             throw new RuntimeException(this.getClass().getSimpleName()
                     + " used but no quantiles found");
@@ -109,36 +106,38 @@ public class WeightedRangePartitioner ex
 
         try{
             // use local file system to get the quantilesFile
+            Map<String, Object> quantileMap = null;
             Configuration conf;
             if (!pigContext.getExecType().isLocal()) {
                 conf = new Configuration(true);
             } else {
                 conf = new Configuration(false);
             }
-            if (configuration.get("fs.file.impl") != null) {
-                conf.set("fs.file.impl", configuration.get("fs.file.impl"));
+            if (job.get("fs.file.impl") != null) {
+                conf.set("fs.file.impl", job.get("fs.file.impl"));
             }
-            if (configuration.get("fs.hdfs.impl") != null) {
-                conf.set("fs.hdfs.impl", configuration.get("fs.hdfs.impl"));
+            if (job.get("fs.hdfs.impl") != null) {
+                conf.set("fs.hdfs.impl", job.get("fs.hdfs.impl"));
             }
 
-            MapRedUtil.copyTmpFileConfigurationValues(configuration, conf);
-
+            MapRedUtil.copyTmpFileConfigurationValues(job, conf);
             conf.set(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
 
             ReadToEndLoader loader = new ReadToEndLoader(Utils.getTmpFileStorageObject(conf),
                     conf, quantilesFile, 0);
-            DataBag quantilesList;
             Tuple t = loader.getNext();
             if (t != null) {
                 // the Quantiles file has a tuple as under:
                 // (numQuantiles, bag of samples)
                 // numQuantiles here is the reduce parallelism
-                Map<String, Object> quantileMap = (Map<String, Object>) t.get(0);
-                quantilesList = (DataBag) quantileMap.get(FindQuantiles.QUANTILES_LIST);
+                quantileMap = (Map<String, Object>) t.get(0);
+            }
+
+            if (quantileMap!=null) {
+                DataBag quantilesList = (DataBag) quantileMap.get(FindQuantiles.QUANTILES_LIST);
                 InternalMap weightedPartsData = (InternalMap) quantileMap.get(FindQuantiles.WEIGHTED_PARTS);
                 convertToArray(quantilesList);
-                for(Entry<Object, Object> ent : weightedPartsData.entrySet()){
+                for (Entry<Object, Object> ent : weightedPartsData.entrySet()) {
                     Tuple key = (Tuple)ent.getKey(); // sample item which repeats
                     float[] probVec = getProbVec((Tuple)ent.getValue());
                     weightedParts.put(getPigNullableWritable(key),
@@ -151,9 +150,15 @@ public class WeightedRangePartitioner ex
             // called. If the quantiles file is empty due to either a bug or
             // a transient failure situation on the dfs, then weightedParts will
             // not be populated and the job will fail in getPartition()
-        }catch (Exception e){
+        } catch (Exception e) {
             throw new RuntimeException(e);
         }
+        inited = true;
+    }
+
+    @Override
+    public void setConf(Configuration configuration) {
+        job = configuration;
     }
 
     /**
@@ -161,7 +166,7 @@ public class WeightedRangePartitioner ex
      * @return
      * @throws ExecException
      */
-    private float[] getProbVec(Tuple values) throws ExecException {
+    protected float[] getProbVec(Tuple values) throws ExecException {
         float[] probVec = new float[values.size()];
         for(int i = 0; i < values.size(); i++) {
             probVec[i] = (Float)values.get(i);
@@ -169,7 +174,7 @@ public class WeightedRangePartitioner ex
         return probVec;
     }
 
-    private PigNullableWritable getPigNullableWritable(Tuple t) {
+    protected PigNullableWritable getPigNullableWritable(Tuple t) {
         try {
             // user comparators work with tuples - so if user comparator
             // is being used OR if there are more than 1 sort cols, use
@@ -191,9 +196,9 @@ public class WeightedRangePartitioner ex
         }
     }
 
-    private void convertToArray(
-            DataBag quantilesListAsBag) {
+    protected void convertToArray(DataBag quantilesListAsBag) {
         ArrayList<PigNullableWritable> quantilesList = getList(quantilesListAsBag);
+
         if ("true".equals(job.get("pig.usercomparator")) ||
                 quantilesList.get(0).getClass().equals(NullableTuple.class)) {
             quantiles = quantilesList.toArray(new NullableTuple[0]);
@@ -227,7 +232,6 @@ public class WeightedRangePartitioner ex
      * @return
      */
     private ArrayList<PigNullableWritable> getList(DataBag quantilesListAsBag) {
-
         ArrayList<PigNullableWritable> list = new ArrayList<PigNullableWritable>();
         for (Tuple tuple : quantilesListAsBag) {
             list.add(getPigNullableWritable(tuple));
@@ -240,5 +244,4 @@ public class WeightedRangePartitioner ex
         return job;
     }
 
-
 }
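
For context, once init() has loaded the quantiles, the partition decision
(left intact by this diff apart from the lazy guard) amounts to a binary
search plus a weighted tie-break for boundary keys. An approximate sketch;
the offset arithmetic in the real method may differ slightly:

    int index = Arrays.binarySearch(quantiles, key, comparator);
    if (index < 0) {
        // key falls strictly between two quantiles: the insertion point
        // identifies its range, hence its reducer
        index = -index - 1;
    } else if (weightedParts.containsKey(key)) {
        // a sampled boundary key spans several reducers; draw one from the
        // sampled probability vector so the skewed key is spread out
        index = weightedParts.get(key).getNext();
    }
    return Math.min(index, numPartitions - 1);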

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java Fri May 30 19:07:23 2014
@@ -24,6 +24,8 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
 import org.apache.pig.impl.plan.DepthFirstWalker;
@@ -32,9 +34,9 @@ import org.apache.pig.impl.plan.VisitorE
 /**
  * This visitor visits the MRPlan and does the following
  * for each MROper: If the map plan or the reduce plan of the MROper has
- *  an end of all input flag present in it, this marks in the MROper whether the map 
+ *  an end of all input flag present in it, this marks in the MROper whether the map
  * has an end of all input flag set or if the reduce has an end of all input flag set.
- *  
+ *
  */
 public class EndOfAllInputSetter extends MROpPlanVisitor {
 
@@ -47,43 +49,40 @@ public class EndOfAllInputSetter extends
 
     @Override
     public void visitMROp(MapReduceOper mr) throws VisitorException {
-        
+
         EndOfAllInputChecker checker = new EndOfAllInputChecker(mr.mapPlan);
         checker.visit();
         if(checker.isEndOfAllInputPresent()) {
-            mr.setEndOfAllInputInMap(true);            
+            mr.setEndOfAllInputInMap(true);
         }
-        
+
         checker = new EndOfAllInputChecker(mr.reducePlan);
         checker.visit();
         if(checker.isEndOfAllInputPresent()) {
-            mr.setEndOfAllInputInReduce(true);            
-        }      
-        
+            mr.setEndOfAllInputInReduce(true);
+        }
+
     }
 
-    static class EndOfAllInputChecker extends PhyPlanVisitor {
-        
+    public static class EndOfAllInputChecker extends PhyPlanVisitor {
+
         private boolean endOfAllInputFlag = false;
         public EndOfAllInputChecker(PhysicalPlan plan) {
             super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
         }
-        
-        /* (non-Javadoc)
-         * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitStream(org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStream)
-         */
+
         @Override
         public void visitStream(POStream stream) throws VisitorException {
             // stream present
             endOfAllInputFlag = true;
         }
-        
+
         @Override
         public void visitMergeJoin(POMergeJoin join) throws VisitorException {
             // merge join present
             endOfAllInputFlag = true;
         }
-       
+
         @Override
         public void visitCollectedGroup(POCollectedGroup mg) throws VisitorException {
             // map side group present
@@ -97,7 +96,17 @@ public class EndOfAllInputSetter extends
         }
 
         @Override
-        public void visitPartialAgg(POPartialAgg partAgg){
+        public void visitPartialAgg(POPartialAgg partAgg) throws VisitorException {
+            endOfAllInputFlag = true;
+        }
+
+        @Override
+        public void visitReservoirSample(POReservoirSample reservoirSample) throws VisitorException {
+            endOfAllInputFlag = true;
+        }
+        
+        @Override
+        public void visitPoissonSample(POPoissonSample poissonSample) throws VisitorException {
             endOfAllInputFlag = true;
         }
 
@@ -109,4 +118,3 @@ public class EndOfAllInputSetter extends
         }
     }
 }
-
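
EndOfAllInputChecker becomes public here and learns about the two new
sampling operators. A minimal reuse sketch (the wrapper method is
hypothetical):

    boolean needsEndOfAllInput(PhysicalPlan plan) throws VisitorException {
        EndOfAllInputChecker checker = new EndOfAllInputChecker(plan);
        checker.visit();
        // true if the plan holds a STREAM, merge join, collected group,
        // partial agg, or one of the new reservoir/Poisson samplers, all of
        // which must flush buffered state once input is exhausted
        return checker.isEndOfAllInputPresent();
    }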

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java Fri May 30 19:07:23 2014
@@ -56,7 +56,7 @@ public class POPackageAnnotator extends 
 
     @Override
     public void visitMROp(MapReduceOper mr) throws VisitorException {
-        
+
         // POPackage OR POJoinPackage could be present in the combine plan
         // OR in the reduce plan. POPostCombinerPackage could
         // be present only in the reduce plan. Search in these two
@@ -67,9 +67,9 @@ public class POPackageAnnotator extends 
             POPackage pkg = pkgDiscoverer.getPkg();
             if(pkg != null) {
                 handlePackage(mr, pkg);
-            }   
+            }
         }
-        
+
         if(!mr.reducePlan.isEmpty()) {
             PackageDiscoverer pkgDiscoverer = new PackageDiscoverer(mr.reducePlan);
             pkgDiscoverer.visit();
@@ -89,14 +89,14 @@ public class POPackageAnnotator extends 
                 }
             }
         }
-        
+
     }
-    
+
     private void handlePackage(MapReduceOper mr, POPackage pkg) throws VisitorException {
         // the LocalRearrange(s) could either be in the map of this MapReduceOper
         // OR in the reduce of predecessor MapReduceOpers
         int lrFound = 0;
-        
+
         lrFound = patchPackage(mr.mapPlan, pkg);
         if(lrFound != pkg.getNumInps()) {
             // we did not find the LocalRearrange(s) in the map plan
@@ -109,7 +109,7 @@ public class POPackageAnnotator extends 
                 lrFound += patchPackage(mrOper.reducePlan, pkg);
                 if(lrFound == pkg.getNumInps()) {
                     break;
-                }     
+                }
             }
         }
         if(lrFound != pkg.getNumInps()) {
@@ -126,7 +126,7 @@ public class POPackageAnnotator extends 
         // the package
         return lrDiscoverer.getLoRearrangeFound();
     }
-    
+
     /**
      * Simple visitor of the "Reduce" physical plan
      * which will get a reference to the POPacakge
@@ -135,14 +135,11 @@ public class POPackageAnnotator extends 
     static class PackageDiscoverer extends PhyPlanVisitor {
 
         private POPackage pkg;
-        
+
         public PackageDiscoverer(PhysicalPlan plan) {
             super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
         }
-        
-        /* (non-Javadoc)
-         * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitStream(org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStream)
-         */
+
         @Override
         public void visitPackage(POPackage pkg) throws VisitorException {
             this.pkg = pkg;
@@ -154,9 +151,9 @@ public class POPackageAnnotator extends 
         public POPackage getPkg() {
             return pkg;
         }
-        
+
     }
-    
+
     /**
      * Physical Plan visitor which tries to get the
      * LocalRearrange(s) present in the plan (if any) and
@@ -165,18 +162,15 @@ public class POPackageAnnotator extends 
      * present in the "key")
      */
     static class LoRearrangeDiscoverer extends PhyPlanVisitor {
-        
+
         private int loRearrangeFound = 0;
         private POPackage pkg;
-        
+
         public LoRearrangeDiscoverer(PhysicalPlan plan, POPackage pkg) {
             super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
             this.pkg = pkg;
         }
-        
-        /* (non-Javadoc)
-         * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitStream(org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStream)
-         */
+
         @Override
         public void visitLocalRearrange(POLocalRearrange lrearrange) throws VisitorException {
             loRearrangeFound++;
@@ -194,18 +188,18 @@ public class POPackageAnnotator extends 
             keyInfo = pkg.getPkgr().getKeyInfo();
             if(keyInfo == null)
                 keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
-            
+
             if(keyInfo.get(Integer.valueOf(lrearrange.getIndex())) != null) {
-                // something is wrong - we should not be getting key info 
+                // something is wrong - we should not be getting key info
                 // for the same index from two different Local Rearranges
                 int errCode = 2087;
                 String msg = "Unexpected problem during optimization." +
-                " Found index:" + lrearrange.getIndex() + 
+                " Found index:" + lrearrange.getIndex() +
                 " in multiple LocalRearrange operators.";
                 throw new OptimizerException(msg, errCode, PigException.BUG);
-                
+
             }
-            keyInfo.put(Integer.valueOf(lrearrange.getIndex()), 
+            keyInfo.put(Integer.valueOf(lrearrange.getIndex()),
                 new Pair<Boolean, Map<Integer, Integer>>(
                         lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
             pkg.getPkgr().setKeyInfo(keyInfo);

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/optimizer/SecondaryKeyOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/optimizer/SecondaryKeyOptimizer.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/optimizer/SecondaryKeyOptimizer.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/optimizer/SecondaryKeyOptimizer.java Fri May 30 19:07:23 2014
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.optimizer;
+
+import org.apache.pig.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+/**
+ * Remove POSort and change PODistinct to use POSortedDistinct in nested foreach plan by sorting on a secondary key
+ */
+public interface SecondaryKeyOptimizer {
+
+    int getNumSortRemoved();
+
+    int getNumDistinctChanged();
+
+    int getNumUseSecondaryKey();
+
+}

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Fri May 30 19:07:23 2014
@@ -65,6 +65,8 @@ public abstract class PhysicalOperator e
     private static final Log log = LogFactory.getLog(PhysicalOperator.class);
 
     protected static final long serialVersionUID = 1L;
+    protected static final Result RESULT_EMPTY = new Result(POStatus.STATUS_NULL, null);
+    protected static final Result RESULT_EOP = new Result(POStatus.STATUS_EOP, null);
 
     // The degree of parallelism requested
     protected int requestedParallelism;
@@ -98,7 +100,7 @@ public abstract class PhysicalOperator e
     // Will be used by operators to report status or transmit heartbeat
     // Should be set by the backends to appropriate implementations that
     // wrap their own version of a reporter.
-    private static ThreadLocal<PigProgressable> reporter = new ThreadLocal<PigProgressable>();
+    public static ThreadLocal<PigProgressable> reporter = new ThreadLocal<PigProgressable>();
 
     // Will be used by operators to aggregate warning messages
     // Should be set by the backends to appropriate implementations that
@@ -137,6 +139,21 @@ public abstract class PhysicalOperator e
         res = new Result();
     }
 
+    public PhysicalOperator(PhysicalOperator copy) {
+        super (copy.getOperatorKey());
+        this.res = new Result();
+        this.requestedParallelism = copy.requestedParallelism;
+        this.inputs = copy.inputs;
+        this.outputs = copy.outputs;
+        this.resultType = copy.resultType;
+        this.parentPlan = copy.parentPlan;
+        this.inputAttached = copy.inputAttached;
+        this.alias = copy.alias;
+        this.lineageTracer = copy.lineageTracer;
+        this.accum = copy.accum;
+        this.originalLocations = copy.originalLocations;
+    }
+
     @Override
     public void setIllustrator(Illustrator illustrator) {
 	      this.illustrator = illustrator;
@@ -166,6 +183,10 @@ public abstract class PhysicalOperator e
         return (alias == null) ? "" : (alias + ": ");
     }
 
+    public void setAlias(String alias) {
+        this.alias = alias;
+    }
+
     public void addOriginalLocation(String alias, SourceLocation sourceLocation) {
         this.alias = alias;
         this.originalLocations.add(new OriginalLocation(alias, sourceLocation.line(), sourceLocation.offset()));
@@ -265,11 +286,9 @@ public abstract class PhysicalOperator e
      */
     public Result processInput() throws ExecException {
         try {
-            Result res = new Result();
             if (input == null && (inputs == null || inputs.size() == 0)) {
                 // log.warn("No inputs found. Signaling End of Processing.");
-                res.returnStatus = POStatus.STATUS_EOP;
-                return res;
+                return new Result(POStatus.STATUS_EOP, null);
             }
 
             // Should be removed once the model is clear
@@ -280,6 +299,7 @@ public abstract class PhysicalOperator e
             if (!isInputAttached()) {
                 return inputs.get(0).getNextTuple();
             } else {
+                Result res = new Result();
                 res.result = input;
                 res.returnStatus = POStatus.STATUS_OK;
                 detachInput();
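
The two new shared Result constants let subclasses report the common
"nothing to emit" statuses without allocating. A hypothetical operator body
using them; since the instances are shared, callers must treat them as
read-only:

    @Override
    public Result getNextTuple() throws ExecException {
        Result input = processInput();
        if (input.returnStatus == POStatus.STATUS_EOP) {
            return RESULT_EOP;    // shared STATUS_EOP instance, no allocation
        }
        if (input.result == null) {
            return RESULT_EMPTY;  // shared STATUS_NULL instance
        }
        return input;
    }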

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java Fri May 30 19:07:23 2014
@@ -21,7 +21,7 @@ import org.apache.pig.classification.Int
 import org.apache.pig.classification.InterfaceStability;
 
 /**
- * 
+ *
  * An interface to allow aggregation of messages
  */
 @InterfaceAudience.Private

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Fri May 30 19:07:23 2014
@@ -550,6 +550,11 @@ public class POUserFunc extends Expressi
     public FuncSpec getFuncSpec() {
         return funcSpec;
     }
+    
+    public void setFuncSpec(FuncSpec funcSpec) {
+        this.funcSpec = funcSpec;
+        instantiateFunc(funcSpec);
+    }
 
     public String[] getCacheFiles() {
         return cacheFiles;

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java.orig
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java.orig?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java.orig (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java.orig Fri May 30 19:07:23 2014
@@ -0,0 +1,632 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators;
+
+import static org.apache.pig.PigConfiguration.TIME_UDFS;
+import static org.apache.pig.PigConfiguration.TIME_UDFS_FREQUENCY;
+import static org.apache.pig.PigConstants.TIME_UDFS_INVOCATION_COUNTER;
+import static org.apache.pig.PigConstants.TIME_UDFS_ELAPSED_TIME_COUNTER;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.lang.reflect.Type;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.Accumulator;
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.PigException;
+import org.apache.pig.PigWarning;
+import org.apache.pig.TerminatingAccumulator;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.MonitoredUDFExecutor;
+import org.apache.pig.builtin.MonitoredUDF;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.SchemaTupleClassGenerator.GenContext;
+import org.apache.pig.data.SchemaTupleFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.data.TupleMaker;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
+
+public class POUserFunc extends ExpressionOperator {
+    private static final long serialVersionUID = 1L;
+    private static final Log LOG = LogFactory.getLog(POUserFunc.class);
+    private static final TupleFactory tf = TupleFactory.getInstance();
+
+    private transient String counterGroup;
+    private transient EvalFunc func;
+    private transient String[] cacheFiles = null;
+
+    FuncSpec funcSpec;
+    FuncSpec origFSpec;
+    public static final byte INITIAL = 0;
+    public static final byte INTERMEDIATE = 1;
+    public static final byte FINAL = 2;
+    private boolean initialized = false;
+    private MonitoredUDFExecutor executor = null;
+
+    private PhysicalOperator referencedOperator = null;
+    private boolean isAccumulationDone;
+    private String signature;
+    private boolean haveCheckedIfTerminatingAccumulator;
+
+    private long numInvocations = 0L;
+    private long timingFrequency = 100L;
+    private boolean doTiming = false;
+
+    public PhysicalOperator getReferencedOperator() {
+        return referencedOperator;
+    }
+
+    public void setReferencedOperator(PhysicalOperator referencedOperator) {
+        this.referencedOperator = referencedOperator;
+    }
+
+    public POUserFunc(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        this(k, rp, inp, null);
+    }
+
+    public POUserFunc(
+            OperatorKey k,
+            int rp,
+            List<PhysicalOperator> inp,
+            FuncSpec funcSpec) {
+        this(k, rp, inp, funcSpec, null);
+    }
+
+    public POUserFunc(
+            OperatorKey k,
+            int rp,
+            List<PhysicalOperator> inp,
+            FuncSpec funcSpec,
+            EvalFunc func) {
+        super(k, rp);
+        super.setInputs(inp);
+        this.funcSpec = funcSpec;
+        this.origFSpec = funcSpec;
+        this.func = func;
+        instantiateFunc(funcSpec);
+    }
+
+    private void instantiateFunc(FuncSpec fSpec) {
+        this.func = (EvalFunc) PigContext.instantiateFuncFromSpec(fSpec);
+        this.setSignature(signature);
+        this.setFuncInputSchema(signature);
+        if (func.getClass().isAnnotationPresent(MonitoredUDF.class)) {
+            executor = new MonitoredUDFExecutor(func);
+        }
+        //the next couple of initializations do not work as intended for the following reasons
+        //the reporter and pigLogger are member variables of PhysicalOperator
+        //when instanitateFunc is invoked at deserialization time, both
+        //reporter and pigLogger are null. They are set during map and reduce calls,
+        //making the initializations here basically useless. Look at the processInput
+        //method where these variables are re-initialized. At that point, the PhysicalOperator
+        //is set up correctly with the reporter and pigLogger references
+        this.func.setReporter(getReporter());
+        this.func.setPigLogger(pigLogger);
+    }
+
+    private transient TupleMaker inputTupleMaker;
+    private boolean usingSchemaTupleFactory;
+
+    @Override
+    public Result processInput() throws ExecException {
+
+        // Make sure the reporter is set, because it isn't getting carried
+        // across in the serialization (don't know why).  I suspect it's as
+        // cheap to call the setReporter call everytime as to check whether I
+        // have (hopefully java will inline it).
+        if(!initialized) {
+            func.setReporter(getReporter());
+            func.setPigLogger(pigLogger);
+            Configuration jobConf = UDFContext.getUDFContext().getJobConf();
+            if (jobConf != null) {
+                doTiming = jobConf.getBoolean(TIME_UDFS, false);
+                if (doTiming) {
+                    counterGroup = funcSpec.toString();
+                    timingFrequency = jobConf.getLong(TIME_UDFS_FREQUENCY, 100L);
+                }
+            }
+            // We initialize here instead of instantiateFunc because this is called
+            // when actual processing has begun, whereas a function can be instantiated
+            // on the frontend potentially (mainly for optimization)
+            Schema tmpS = func.getInputSchema();
+            if (tmpS != null) {
+                //Currently, getInstanceForSchema returns null if no class was found. This works fine...
+                //if it is null, the default will be used. We pass the context because if it happens that
+                //the same Schema was generated elsewhere, we do not want to override user expectations
+                inputTupleMaker = SchemaTupleFactory.getInstance(tmpS, false, GenContext.UDF);
+                if (inputTupleMaker == null) {
+                    LOG.debug("No SchemaTupleFactory found for Schema ["+tmpS+"], using default TupleFactory");
+                    usingSchemaTupleFactory = false;
+                } else {
+                    LOG.debug("Using SchemaTupleFactory for Schema: " + tmpS);
+                    usingSchemaTupleFactory = true;
+                }
+
+                //In the future, we could optionally use SchemaTuples for output as well
+            }
+
+            if (inputTupleMaker == null) {
+                inputTupleMaker = TupleFactory.getInstance();
+            }
+
+            initialized = true;
+        }
+
+        Result res = new Result();
+        if (input == null && (inputs == null || inputs.size()==0)) {
+            res.returnStatus = POStatus.STATUS_EOP;
+            return res;
+        }
+
+        //Should be removed once the model is clear
+        if(getReporter()!=null) {
+            getReporter().progress();
+        }
+
+
+        if(isInputAttached()) {
+            res.result = input;
+            res.returnStatus = POStatus.STATUS_OK;
+            detachInput();
+            return res;
+        } else {
+            //we decouple this because there may be cases where the size is known and it isn't a schema
+            // tuple factory
+            boolean knownSize = usingSchemaTupleFactory;
+            int knownIndex = 0;
+            res.result = inputTupleMaker.newTuple();
+
+            Result temp = null;
+
+            for(PhysicalOperator op : inputs) {
+                temp = op.getNext(op.getResultType());
+                if(temp.returnStatus!=POStatus.STATUS_OK) {
+                    return temp;
+                }
+
+                if(op instanceof POProject &&
+                        op.getResultType() == DataType.TUPLE){
+                    POProject projOp = (POProject)op;
+                    if(projOp.isProjectToEnd()){
+                        Tuple trslt = (Tuple) temp.result;
+                        Tuple rslt = (Tuple) res.result;
+                        for(int i=0;i<trslt.size();i++) {
+                            if (knownSize) {
+                                rslt.set(knownIndex++, trslt.get(i));
+                            } else {
+                                rslt.append(trslt.get(i));
+                            }
+                        }
+                        continue;
+                    }
+                }
+                if (knownSize) {
+                    ((Tuple)res.result).set(knownIndex++, temp.result);
+                } else {
+                    ((Tuple)res.result).append(temp.result);
+                }
+            }
+            res.returnStatus = temp.returnStatus;
+
+            return res;
+        }
+    }
+
+    private boolean isEarlyTerminating = false;
+
+    private void setIsEarlyTerminating() {
+        isEarlyTerminating = true;
+    }
+
+    private boolean isEarlyTerminating() {
+        return isEarlyTerminating;
+    }
+
+    private boolean isTerminated = false;
+
+    private boolean hasBeenTerminated() {
+        return isTerminated;
+    }
+
+    private void earlyTerminate() {
+        isTerminated = true;
+    }
+
+    private Result getNext() throws ExecException {
+        Result result = processInput();
+        long startNanos = 0;
+        boolean timeThis = doTiming && (numInvocations++ % timingFrequency == 0);
+        if (timeThis) {
+            startNanos = System.nanoTime();
+            PigStatusReporter.getInstance().incrCounter(counterGroup, TIME_UDFS_INVOCATION_COUNTER, timingFrequency);
+        }
+        try {
+            if(result.returnStatus == POStatus.STATUS_OK) {
+                Tuple t = (Tuple) result.result;
+
+                // For backward compatibility, we short-circuit tuples whose
+                // size is 1 and field is null. (See PIG-3679)
+                if (t.size() == 1 && t.isNull(0)) {
+                    pigLogger.warn(this, "All the input values are null, skipping the invocation of UDF",
+                            PigWarning.SKIP_UDF_CALL_FOR_NULL);
+                    Schema outputSchema = func.outputSchema(func.getInputSchema());
+                    // If the output schema is tuple (i.e. multiple fields are
+                    // to be returned), we return a tuple where every field is
+                    // null.
+                    if (outputSchema != null && outputSchema.getField(0).type == DataType.TUPLE) {
+                        result.result = tf.newTuple(outputSchema.getField(0).schema.size());
+                    // Otherwise, we simply return null since it can be cast to
+                    // any data type.
+                    } else {
+                        result.result = null;
+                    }
+                    return result;
+                }
+
+                if (isAccumulative()) {
+                    if (isAccumStarted()) {
+                        if (!haveCheckedIfTerminatingAccumulator) {
+                            haveCheckedIfTerminatingAccumulator = true;
+                            if (func instanceof TerminatingAccumulator<?>)
+                                setIsEarlyTerminating();
+                        }
+
+                        if (!hasBeenTerminated() && isEarlyTerminating() && ((TerminatingAccumulator<?>)func).isFinished()) {
+                            earlyTerminate();
+                        }
+
+                        if (hasBeenTerminated()) {
+                            result.returnStatus = POStatus.STATUS_EARLY_TERMINATION;
+                            result.result = null;
+                            isAccumulationDone = false;
+                        } else {
+                            ((Accumulator)func).accumulate((Tuple)result.result);
+                            result.returnStatus = POStatus.STATUS_BATCH_OK;
+                            result.result = null;
+                            isAccumulationDone = false;
+                        }
+                    } else {
+                        if (isAccumulationDone) {
+                            // PORelationToExprProject does not return STATUS_EOP,
+                            // so the udf gets called both when isAccumStarted is
+                            // first true and when it is later set to false, even
+                            // when the input relation is empty. The STATUS_EOP
+                            // therefore has to be sent from POUserFunc, after
+                            // the results have been sent.
+                            result.result = null;
+                            result.returnStatus = POStatus.STATUS_EOP;
+                        } else {
+                            result.result = ((Accumulator)func).getValue();
+                            result.returnStatus = POStatus.STATUS_OK;
+                            ((Accumulator)func).cleanup();
+                            isAccumulationDone = true;
+                        }
+                    }
+                } else {
+                    if (executor != null) {
+                        result.result = executor.monitorExec((Tuple) result.result);
+                    } else {
+                        result.result = func.exec((Tuple) result.result);
+                    }
+                }
+            }
+            if (timeThis) {
+                PigStatusReporter.getInstance().incrCounter(counterGroup, TIME_UDFS_ELAPSED_TIME_COUNTER,
+                        Math.round((System.nanoTime() - startNanos) / 1000) * timingFrequency);
+            }
+            return result;
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (IOException ioe) {
+            int errCode = 2078;
+            String msg = "Caught error from UDF: " + funcSpec.getClassName();
+            String footer = " [" + ioe.getMessage() + "]";
+
+            if(ioe instanceof PigException) {
+                int udfErrorCode = ((PigException)ioe).getErrorCode();
+                if(udfErrorCode != 0) {
+                    errCode = udfErrorCode;
+                    msg = ((PigException)ioe).getMessage();
+                } else {
+                    msg += " [" + ((PigException)ioe).getMessage() + " ]";
+                }
+            } else {
+                msg += footer;
+            }
+
+            throw new ExecException(msg, errCode, PigException.BUG, ioe);
+        } catch (IndexOutOfBoundsException ie) {
+            int errCode = 2078;
+            String msg = "Caught error from UDF: " + funcSpec.getClassName() +
+                    ", Out of bounds access [" + ie.getMessage() + "]";
+            throw new ExecException(msg, errCode, PigException.BUG, ie);
+        }
+    }
+
+    @Override
+    public Result getNextTuple() throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNextDataBag() throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNextInteger() throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNextBoolean() throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNextDataByteArray() throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNextDouble() throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNextBigInteger() throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNextBigDecimal() throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNextFloat() throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNextLong() throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNextDateTime() throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNextMap() throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNextString() throws ExecException {
+        return getNext();
+    }
+
+    public void setAlgebraicFunction(byte function) throws ExecException {
+        // This will only be used by the optimizer for putting the correct
+        // functions in the mapper, combiner and reducer. This helps in
+        // maintaining the physical plan as-is, without the optimizer having
+        // to replace any operators. You cannot make two calls to this method
+        // on the same algebraic EvalFunc, as func is being changed.
+        switch (function) {
+        case INITIAL:
+            funcSpec = new FuncSpec(getInitial());
+            break;
+        case INTERMEDIATE:
+            funcSpec = new FuncSpec(getIntermed());
+            break;
+        case FINAL:
+            funcSpec = new FuncSpec(getFinal());
+            break;
+        }
+        funcSpec.setCtorArgs(origFSpec.getCtorArgs());
+        instantiateFunc(funcSpec);
+        setResultType(DataType.findType(((EvalFunc<?>) func).getReturnType()));
+    }
+
+    public String getInitial() throws ExecException {
+        instantiateFunc(origFSpec);
+        if (func instanceof Algebraic) {
+            return ((Algebraic) func).getInitial();
+        } else {
+            int errCode = 2072;
+            String msg = "Attempt to run a non-algebraic function"
+                + " as an algebraic function";
+            throw new ExecException(msg, errCode, PigException.BUG);
+        }
+    }
+
+    public String getIntermed() throws ExecException {
+        instantiateFunc(origFSpec);
+        if (func instanceof Algebraic) {
+            return ((Algebraic) func).getIntermed();
+        } else {
+            int errCode = 2072;
+            String msg = "Attempt to run a non-algebraic function"
+                + " as an algebraic function";
+            throw new ExecException(msg, errCode, PigException.BUG);
+        }
+    }
+
+    public String getFinal() throws ExecException {
+        instantiateFunc(origFSpec);
+        if (func instanceof Algebraic) {
+            return ((Algebraic) func).getFinal();
+        } else {
+            int errCode = 2072;
+            String msg = "Attempt to run a non-algebraic function"
+                + " as an algebraic function";
+            throw new ExecException(msg, errCode, PigException.BUG);
+        }
+    }
+
+    public Type getReturnType() {
+        return func.getReturnType();
+    }
+
+    public void finish() {
+        func.finish();
+        if (executor != null) {
+            executor.terminate();
+        }
+    }
+
+    public Schema outputSchema(Schema input) {
+        return func.outputSchema(input);
+    }
+
+    public Boolean isAsynchronous() {
+        return func.isAsynchronous();
+    }
+
+    @Override
+    public String name() {
+        return "POUserFunc" + "(" + func.getClass().getName() + ")" + "[" + DataType.findTypeName(resultType) + "]" + " - " + mKey.toString();
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visitUserFunc(this);
+    }
+
+    public FuncSpec getFuncSpec() {
+        return funcSpec;
+    }
+
+    public String[] getCacheFiles() {
+        return cacheFiles;
+    }
+
+    public void setCacheFiles(String[] cf) {
+        cacheFiles = cf;
+    }
+
+    public boolean combinable() {
+        return (func instanceof Algebraic);
+    }
+
+    @Override
+    public POUserFunc clone() throws CloneNotSupportedException {
+        // Inputs will be patched up later by PhysicalPlan.clone()
+        POUserFunc clone = new POUserFunc(new OperatorKey(mKey.scope,
+            NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)),
+            requestedParallelism, null, funcSpec.clone());
+        clone.setResultType(resultType);
+        clone.signature = signature;
+        return clone;
+    }
+
+    private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException{
+        is.defaultReadObject();
+        instantiateFunc(funcSpec);
+    }
+
+    /**
+     * Get the child expressions of this expression.
+     */
+    @Override
+    public List<ExpressionOperator> getChildExpressions() {
+        return null;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void setAccumStart() {
+        if (isAccumulative() && !isAccumStarted()) {
+            super.setAccumStart();
+            ((Accumulator)func).cleanup();
+        }
+    }
+
+    @Override
+    public void setResultType(byte resultType) {
+        this.resultType = resultType;
+    }
+
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        return (Tuple) out;
+    }
+
+    public EvalFunc getFunc() {
+        return func;
+    }
+
+    public void setSignature(String signature) {
+        this.signature = signature;
+        if (this.func!=null) {
+            this.func.setUDFContextSignature(signature);
+        }
+    }
+
+    /**
+     * Sets the EvalFunc's input schema based on the signature.
+     * @param signature the UDF context signature under which the schema was stored
+     */
+    public void setFuncInputSchema(String signature) {
+        Properties props = UDFContext.getUDFContext().getUDFProperties(func.getClass());
+        Schema tmpS = (Schema) props.get("pig.evalfunc.inputschema." + signature);
+        if (tmpS != null) {
+            this.func.setInputSchema(tmpS);
+        }
+    }
+
+}
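
The accumulative branch of getNext() above drives any UDF that implements org.apache.pig.Accumulator: accumulate() is invoked once per STATUS_BATCH_OK batch, getValue() once the group is exhausted, and cleanup() after the value has been read. A minimal sketch of such a UDF, for context only (AccumulativeCount is a hypothetical name, not part of this commit):

    import java.io.IOException;

    import org.apache.pig.Accumulator;
    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;

    // Hypothetical COUNT-style UDF driven through the
    // accumulate()/getValue()/cleanup() cycle in getNext() above.
    public class AccumulativeCount extends EvalFunc<Long> implements Accumulator<Long> {
        private long count = 0;

        @Override
        public void accumulate(Tuple input) throws IOException {
            // Called once per STATUS_BATCH_OK batch while isAccumStarted() is true.
            for (Tuple t : (DataBag) input.get(0)) {
                count++;
            }
        }

        @Override
        public Long getValue() {
            // Called once the accumulation phase ends (the STATUS_OK branch above).
            return count;
        }

        @Override
        public void cleanup() {
            // Reset state; POUserFunc calls this from setAccumStart() and again
            // right after getValue().
            count = 0;
        }

        @Override
        public Long exec(Tuple input) throws IOException {
            // Fallback for the non-accumulative path.
            long c = 0;
            for (Tuple t : (DataBag) input.get(0)) {
                c++;
            }
            return c;
        }
    }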
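
Similarly, setAlgebraicFunction() above swaps funcSpec for one of the three class names returned by getInitial()/getIntermed()/getFinal(), so an Algebraic UDF ships a separate EvalFunc stage for the map, combine and reduce phases. A sketch of that contract, modeled loosely on the builtin COUNT pattern (AlgebraicCount is hypothetical):

    import java.io.IOException;

    import org.apache.pig.Algebraic;
    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    // Hypothetical Algebraic UDF, not part of this commit.
    public class AlgebraicCount extends EvalFunc<Long> implements Algebraic {
        private static final TupleFactory TF = TupleFactory.getInstance();

        @Override
        public Long exec(Tuple input) throws IOException {
            return count(input);
        }

        // Class names handed back through getInitial()/getIntermed()/getFinal()
        // and turned into FuncSpecs by setAlgebraicFunction().
        public String getInitial()  { return Initial.class.getName(); }
        public String getIntermed() { return Intermed.class.getName(); }
        public String getFinal()    { return Final.class.getName(); }

        public static class Initial extends EvalFunc<Tuple> {
            @Override
            public Tuple exec(Tuple input) throws IOException {
                // Map side: count this key's bag and emit a partial count.
                return TF.newTuple(count(input));
            }
        }

        public static class Intermed extends EvalFunc<Tuple> {
            @Override
            public Tuple exec(Tuple input) throws IOException {
                // Combiner: sum the partial counts.
                return TF.newTuple(sum(input));
            }
        }

        public static class Final extends EvalFunc<Long> {
            @Override
            public Long exec(Tuple input) throws IOException {
                // Reduce side: sum the partials into the final count.
                return sum(input);
            }
        }

        private static Long count(Tuple input) throws IOException {
            long count = 0;
            for (Tuple t : (DataBag) input.get(0)) {
                count++;
            }
            return count;
        }

        private static Long sum(Tuple input) throws IOException {
            long sum = 0;
            for (Tuple t : (DataBag) input.get(0)) {
                sum += (Long) t.get(0);
            }
            return sum;
        }
    }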

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Fri May 30 19:07:23 2014
@@ -63,8 +63,10 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartitionRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
@@ -77,10 +79,10 @@ import org.apache.pig.impl.plan.VisitorE
 
 /**
  * The visitor class for the Physical Plan. To use this,
- * create the visitor with the plan to be visited. Call 
+ * create the visitor with the plan to be visited. Call
  * the visit() method to traverse the plan in a depth first
  * fashion.
- * 
+ *
  * This class also visits the nested plans inside the operators.
  * One has to extend this class to modify the nature of each visit
  * and to maintain any relevant state information between the visits
@@ -93,25 +95,28 @@ public class PhyPlanVisitor extends Plan
         super(plan, walker);
     }
 
+    public void visit(PhysicalOperator op) {
+        // do nothing
+    }
+
     public void visitLoad(POLoad ld) throws VisitorException{
         //do nothing
     }
-    
+
     public void visitStore(POStore st) throws VisitorException{
         //do nothing
     }
-    
+
     public void visitNative(PONative nat) throws VisitorException{
         //do nothing
     }
-    
+
     public void visitFilter(POFilter fl) throws VisitorException{
         pushWalker(mCurrentWalker.spawnChildWalker(fl.getPlan()));
         visit();
         popWalker();
     }
 
-
     public void visitCollectedGroup(POCollectedGroup mg) throws VisitorException{
         List<PhysicalPlan> inpPlans = mg.getPlans();
         for (PhysicalPlan plan : inpPlans) {
@@ -120,7 +125,7 @@ public class PhyPlanVisitor extends Plan
             popWalker();
         }
     }
-    
+
     public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException{
         List<PhysicalPlan> inpPlans = lr.getPlans();
         for (PhysicalPlan plan : inpPlans) {
@@ -133,11 +138,11 @@ public class PhyPlanVisitor extends Plan
     public void visitGlobalRearrange(POGlobalRearrange gr) throws VisitorException{
         //do nothing
     }
-    
+
     public void visitPackage(POPackage pkg) throws VisitorException{
         //do nothing
     }
-    
+
     public void visitPOForEach(POForEach nfe) throws VisitorException {
         List<PhysicalPlan> inpPlans = nfe.getInputPlans();
         for (PhysicalPlan plan : inpPlans) {
@@ -146,11 +151,11 @@ public class PhyPlanVisitor extends Plan
             popWalker();
         }
     }
-    
+
     public void visitUnion(POUnion un) throws VisitorException{
         //do nothing
     }
-    
+
     public void visitSplit(POSplit spl) throws VisitorException{
         List<PhysicalPlan> plans = spl.getPlans();
         for (PhysicalPlan plan : plans) {
@@ -177,78 +182,78 @@ public class PhyPlanVisitor extends Plan
         //do nothing
     }
 
-	public void visitDistinct(PODistinct distinct) throws VisitorException {
+    public void visitDistinct(PODistinct distinct) throws VisitorException {
         //do nothing
-	}
+    }
 
-	public void visitSort(POSort sort) throws VisitorException {
+    public void visitSort(POSort sort) throws VisitorException {
         List<PhysicalPlan> inpPlans = sort.getSortPlans();
         for (PhysicalPlan plan : inpPlans) {
             pushWalker(mCurrentWalker.spawnChildWalker(plan));
             visit();
             popWalker();
         }
-	}
-    
+    }
+
     public void visitConstant(ConstantExpression cnst) throws VisitorException{
         //do nothing
     }
-    
+
     public void visitProject(POProject proj) throws VisitorException{
         //do nothing
     }
-    
+
     public void visitGreaterThan(GreaterThanExpr grt) throws VisitorException{
         //do nothing
     }
-    
+
     public void visitLessThan(LessThanExpr lt) throws VisitorException{
         //do nothing
     }
-    
+
     public void visitGTOrEqual(GTOrEqualToExpr gte) throws VisitorException{
         //do nothing
     }
-    
+
     public void visitLTOrEqual(LTOrEqualToExpr lte) throws VisitorException{
         //do nothing
     }
-    
+
     public void visitEqualTo(EqualToExpr eq) throws VisitorException{
         //do nothing
     }
-    
+
     public void visitNotEqualTo(NotEqualToExpr eq) throws VisitorException{
         //do nothing
     }
-    
+
     public void visitRegexp(PORegexp re) throws VisitorException{
         //do nothing
     }
 
     public void visitIsNull(POIsNull isNull) throws VisitorException {
     }
-    
+
     public void visitAdd(Add add) throws VisitorException{
         //do nothing
     }
-    
+
     public void visitSubtract(Subtract sub) throws VisitorException {
         //do nothing
     }
-    
+
     public void visitMultiply(Multiply mul) throws VisitorException {
         //do nothing
     }
-    
+
     public void visitDivide(Divide dv) throws VisitorException {
         //do nothing
     }
-    
+
     public void visitMod(Mod mod) throws VisitorException {
         //do nothing
     }
-    
+
     public void visitAnd(POAnd and) throws VisitorException {
         //do nothing
     }
@@ -263,91 +268,94 @@ public class PhyPlanVisitor extends Plan
 
     public void visitBinCond(POBinCond binCond) {
         // do nothing
-        
     }
 
     public void visitNegative(PONegative negative) {
         //do nothing
-        
     }
-    
+
     public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
         //do nothing
     }
-    
+
     public void visitComparisonFunc(POUserComparisonFunc compFunc) throws VisitorException {
         //do nothing
     }
 
     public void visitMapLookUp(POMapLookUp mapLookUp) {
         // TODO Auto-generated method stub
-        
     }
 
     public void visitCast(POCast cast) {
         // TODO Auto-generated method stub
-        
     }
-    
+
     public void visitLimit(POLimit lim) throws VisitorException{
-        //do nothing
+        PhysicalPlan inpPlan = lim.getLimitPlan();
+        if (inpPlan!=null) {
+            pushWalker(mCurrentWalker.spawnChildWalker(inpPlan));
+            visit();
+            popWalker();
+        }
     }
-    
+
     public void visitCross(POCross cross) throws VisitorException{
         //do nothing
     }
-    
+
     public void visitFRJoin(POFRJoin join) throws VisitorException {
         //do nothing
     }
-    
+
     public void visitMergeJoin(POMergeJoin join) throws VisitorException {
         //do nothing
     }
-    
+
     public void visitMergeCoGroup(POMergeCogroup mergeCoGrp) throws VisitorException{
-        
+
     }
     /**
      * @param stream
-     * @throws VisitorException 
+     * @throws VisitorException
      */
     public void visitStream(POStream stream) throws VisitorException {
         // TODO Auto-generated method stub
-        
     }
 
-	public void visitSkewedJoin(POSkewedJoin sk) throws VisitorException {
+    public void visitSkewedJoin(POSkewedJoin sk) throws VisitorException {
 
-	}
+    }
 
-	public void visitPartitionRearrange(POPartitionRearrange pr) throws VisitorException {
+    public void visitPartitionRearrange(POPartitionRearrange pr) throws VisitorException {
         List<PhysicalPlan> inpPlans = pr.getPlans();
         for (PhysicalPlan plan : inpPlans) {
             pushWalker(mCurrentWalker.spawnChildWalker(plan));
             visit();
             popWalker();
         }
-	}
+    }
 
     /**
      * @param optimizedForEach
      */
     public void visitPOOptimizedForEach(POOptimizedForEach optimizedForEach) throws VisitorException {
         // TODO Auto-generated method stub
-        
     }
 
     /**
      * @param preCombinerLocalRearrange
      */
     public void visitPreCombinerLocalRearrange(
-            POPreCombinerLocalRearrange preCombinerLocalRearrange) {
+            POPreCombinerLocalRearrange preCombinerLocalRearrange) throws VisitorException {
         // TODO Auto-generated method stub
     }
 
-    public void visitPartialAgg(POPartialAgg poPartialAgg) {
+    public void visitPartialAgg(POPartialAgg poPartialAgg) throws VisitorException {
     }
 
+    public void visitReservoirSample(POReservoirSample reservoirSample) throws VisitorException {
+    }
 
+    public void visitPoissonSample(POPoissonSample poissonSample) throws VisitorException {
+    }
 }
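
The two new hooks above (visitReservoirSample/visitPoissonSample) follow the class's usual pattern: they default to no-ops, and subclasses override only the visits they care about. A minimal sketch of such a subclass, assuming only the classes imported above (SampleOpCounter is hypothetical):

    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
    import org.apache.pig.impl.plan.DepthFirstWalker;
    import org.apache.pig.impl.plan.VisitorException;

    // Hypothetical subclass, not part of this commit: counts the two new
    // sample operators by overriding only the visits it cares about.
    public class SampleOpCounter extends PhyPlanVisitor {
        private int samples = 0;

        public SampleOpCounter(PhysicalPlan plan) {
            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
        }

        @Override
        public void visitReservoirSample(POReservoirSample op) throws VisitorException {
            samples++;
        }

        @Override
        public void visitPoissonSample(POPoissonSample op) throws VisitorException {
            samples++;
        }

        public int getSampleCount() {
            return samples;
        }
    }

Calling visit() on an instance walks the plan depth-first and fires these hooks for every matching operator.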

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java Fri May 30 19:07:23 2014
@@ -29,9 +29,10 @@ import java.util.Map;
 
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.BinaryExpressionOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ComparisonOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POBinCond;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.UnaryComparisonOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.UnaryExpressionOperator;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.plan.OperatorPlan;
 import org.apache.pig.impl.plan.PlanException;
@@ -39,18 +40,18 @@ import org.apache.pig.impl.plan.VisitorE
 import org.apache.pig.impl.util.MultiMap;
 
 /**
- * 
- * The base class for all types of physical plans. 
+ *
+ * The base class for all types of physical plans.
  * This extends the Operator Plan.
  *
  */
 public class PhysicalPlan extends OperatorPlan<PhysicalOperator> implements Cloneable {
 
     /**
-     * 
+     *
      */
     private static final long serialVersionUID = 1L;
-    
+
     // marker to indicate whether all input for this plan
     // has been sent - this is currently used in POStream
     // to know if all map() calls and reduce() calls are finished
@@ -58,18 +59,18 @@ public class PhysicalPlan extends Operat
     public boolean endOfAllInput = false;
 
     private MultiMap<PhysicalOperator, PhysicalOperator> opmap = null;
-    
+
     public PhysicalPlan() {
         super();
     }
-    
+
     public void attachInput(Tuple t){
         List<PhysicalOperator> roots = getRoots();
         for (PhysicalOperator operator : roots) {
             operator.attachInput(t);
 		}
     }
-    
+
     public void detachInput(){
         for(PhysicalOperator op : getRoots())
             op.detachInput();
@@ -117,7 +118,7 @@ public class PhysicalPlan extends Operat
             ps.println("<physicalPlan>XML Not Supported</physicalPlan>");
             return;
         }
-        
+
         ps.println("#-----------------------------------------------");
         ps.println("# Physical Plan:");
         ps.println("#-----------------------------------------------");
@@ -132,22 +133,22 @@ public class PhysicalPlan extends Operat
         }
         ps.println("");
   }
-    
+
     @Override
     public void connect(PhysicalOperator from, PhysicalOperator to)
             throws PlanException {
-        
+
         super.connect(from, to);
         to.setInputs(getPredecessors(to));
     }
-    
+
     /*public void connect(List<PhysicalOperator> from, PhysicalOperator to) throws IOException{
         if(!to.supportsMultipleInputs()){
             throw new IOException("Invalid Operation on " + to.name() + ". It doesn't support multiple inputs.");
         }
-        
+
     }*/
-    
+
     @Override
     public void remove(PhysicalOperator op) {
         op.setInputs(null);
@@ -169,7 +170,7 @@ public class PhysicalPlan extends Operat
         }
         super.remove(op);
     }
-    
+
     /* (non-Javadoc)
      * @see org.apache.pig.impl.plan.OperatorPlan#replace(org.apache.pig.impl.plan.Operator, org.apache.pig.impl.plan.Operator)
      */
@@ -187,10 +188,10 @@ public class PhysicalPlan extends Operat
                     if(inputs.get(i) == oldNode) {
                         inputs.set(i, newNode);
                     }
-                }    
+                }
             }
         }
-        
+
     }
 
     /* (non-Javadoc)
@@ -226,7 +227,7 @@ public class PhysicalPlan extends Operat
         // clones, create a map between clone and original.  Then walk the
         // connections in this plan and create equivalent connections in the
         // clone.
-        Map<PhysicalOperator, PhysicalOperator> matches = 
+        Map<PhysicalOperator, PhysicalOperator> matches =
             new HashMap<PhysicalOperator, PhysicalOperator>(mOps.size());
         for (PhysicalOperator op : mOps.keySet()) {
             PhysicalOperator c = op.clone();
@@ -264,7 +265,7 @@ public class PhysicalPlan extends Operat
         for (PhysicalOperator op : mOps.keySet()) {
             List<PhysicalOperator> inputs = op.getInputs();
             if (inputs == null || inputs.size() == 0) continue;
-            List<PhysicalOperator> newInputs = 
+            List<PhysicalOperator> newInputs =
                 new ArrayList<PhysicalOperator>(inputs.size());
             PhysicalOperator cloneOp = matches.get(op);
             if (cloneOp == null) {
@@ -281,13 +282,17 @@ public class PhysicalPlan extends Operat
             }
             cloneOp.setInputs(newInputs);
         }
-        
+
         for (PhysicalOperator op : mOps.keySet()) {
-            if (op instanceof UnaryComparisonOperator) {
-                UnaryComparisonOperator orig = (UnaryComparisonOperator)op;
-                UnaryComparisonOperator cloneOp = (UnaryComparisonOperator)matches.get(op);
-                cloneOp.setExpr((ExpressionOperator)matches.get(orig.getExpr()));
+            if (op instanceof ComparisonOperator) {
+                ComparisonOperator orig = (ComparisonOperator)op;
+                ComparisonOperator cloneOp = (ComparisonOperator)matches.get(op);
                 cloneOp.setOperandType(orig.getOperandType());
+            }
+            if (op instanceof UnaryExpressionOperator) {
+                UnaryExpressionOperator orig = (UnaryExpressionOperator)op;
+                UnaryExpressionOperator cloneOp = (UnaryExpressionOperator)matches.get(op);
+                cloneOp.setExpr((ExpressionOperator)matches.get(orig.getExpr()));
             } else if (op instanceof BinaryExpressionOperator) {
                 BinaryExpressionOperator orig = (BinaryExpressionOperator)op;
                 BinaryExpressionOperator cloneOp = (BinaryExpressionOperator)matches.get(op);
@@ -304,11 +309,11 @@ public class PhysicalPlan extends Operat
 
         return clone;
     }
-    
+
     public void setOpMap(MultiMap<PhysicalOperator, PhysicalOperator> opmap) {
         this.opmap = opmap;
     }
-    
+
     public void resetOpMap()
     {
         opmap = null;
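
For context, connect() above both adds the plan edge and refreshes the downstream operator's input list, while clone() re-creates operators, edges and input lists through the matches map. A small usage sketch (hypothetical; assumes the single-OperatorKey constructors these operators provide):

    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
    import org.apache.pig.impl.plan.OperatorKey;

    public class CloneSketch {
        public static void main(String[] args) throws Exception {
            PhysicalPlan plan = new PhysicalPlan();
            PhysicalOperator filter = new POFilter(new OperatorKey("scope", 1));
            PhysicalOperator limit = new POLimit(new OperatorKey("scope", 2));
            plan.add(filter);
            plan.add(limit);
            // connect() adds the edge and also sets limit's inputs from
            // getPredecessors(limit), per the override above.
            plan.connect(filter, limit);

            // clone() copies every operator, re-creates the edges, and rebuilds
            // each clone's input list through the matches map.
            PhysicalPlan copy = plan.clone();
            System.out.println(copy.size() == plan.size());
        }
    }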

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java Fri May 30 19:07:23 2014
@@ -64,7 +64,7 @@ public class PlanPrinter<O extends Opera
     String TABMore = "|   ";
 
     String LSep = "|\n|---";
-    
+
     String USep = "|   |\n|   ";
 
     int levelCntr = -1;
@@ -76,7 +76,7 @@ public class PlanPrinter<O extends Opera
     public PlanPrinter(P plan) {
         super(plan, new DepthFirstWalker<O, P>(plan));
     }
-    
+
     public PlanPrinter(P plan, PrintStream stream) {
         super(plan, new DepthFirstWalker<O, P>(plan));
         this.stream = stream;
@@ -142,7 +142,7 @@ public class PlanPrinter<O extends Opera
         sb.delete(sb.length() - "\n".length(), sb.length());
         return sb.toString();
     }
-    
+
     private String planString(PhysicalPlan pp){
         StringBuilder sb = new StringBuilder();
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -154,7 +154,7 @@ public class PlanPrinter<O extends Opera
         sb.append(shiftStringByTabs(baos.toString(), 2));
         return sb.toString();
     }
-    
+
     private String planString(List<PhysicalPlan> lep){
         StringBuilder sb = new StringBuilder();
         if(lep!=null)
@@ -214,13 +214,13 @@ public class PlanPrinter<O extends Opera
               POSkewedJoin skewed = (POSkewedJoin)node;
               MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = skewed.getJoinPlans();
               if(joinPlans!=null) {
-            	  List<PhysicalPlan> inner_plans = new ArrayList<PhysicalPlan>();
-            	  inner_plans.addAll(joinPlans.values());                
-                  sb.append(planString(inner_plans));                
+                  List<PhysicalPlan> inner_plans = new ArrayList<PhysicalPlan>();
+                  inner_plans.addAll(joinPlans.values());
+                  sb.append(planString(inner_plans));
               }
             }
         }
-        
+
         if (node instanceof POSplit) {
             sb.append(planString(((POSplit)node).getPlans()));
         }
@@ -231,13 +231,13 @@ public class PlanPrinter<O extends Opera
             plans.addAll(pl);
             sb.append(planString(plans));
         }
-        
+
         List<O> originalPredecessors = mPlan.getPredecessors(node);
         if (originalPredecessors == null)
             return sb.toString();
-        
+
         List<O> predecessors =  new ArrayList<O>(originalPredecessors);
-        
+
         Collections.sort(predecessors);
         int i = 0;
         for (O pred : predecessors) {
@@ -301,5 +301,5 @@ public class PlanPrinter<O extends Opera
     public void visitStartMap(POUnion op) {
         stream.print(op.name() + "   ");
     }
-    
+
 }
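
For reference, the printer is driven like any other visitor; a minimal sketch, with PrintPlan as a hypothetical helper and the plan assumed to come from compilation elsewhere:

    import java.io.PrintStream;

    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter;

    // Hypothetical helper: dump a physical plan using the |---/|   tree layout above.
    public final class PrintPlan {
        public static void print(PhysicalPlan plan, PrintStream out) throws Exception {
            PlanPrinter<PhysicalOperator, PhysicalPlan> printer =
                    new PlanPrinter<PhysicalOperator, PhysicalPlan>(plan, out);
            printer.visit();
        }
    }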

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/XMLPhysicalPlanPrinter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/XMLPhysicalPlanPrinter.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/XMLPhysicalPlanPrinter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/XMLPhysicalPlanPrinter.java Fri May 30 19:07:23 2014
@@ -57,7 +57,7 @@ import org.w3c.dom.Node;
 
 
 public class XMLPhysicalPlanPrinter<P extends OperatorPlan<PhysicalOperator>> extends
-PhyPlanVisitor {
+        PhyPlanVisitor {
 
     private Document doc = null;
     private Element parent = null;
@@ -236,4 +236,4 @@ PhyPlanVisitor {
         }
         return pkgrNode;
     }
-}
\ No newline at end of file
+}