You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2014/05/30 21:07:29 UTC
svn commit: r1598702 [5/23] - in /pig/trunk: ./ ivy/
shims/src/hadoop23/org/apache/pig/backend/hadoop23/
shims/test/hadoop20/org/apache/pig/test/
shims/test/hadoop23/org/apache/pig/test/ src/META-INF/services/
src/org/apache/pig/ src/org/apache/pig/bac...
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java Fri May 30 19:07:23 2014
@@ -18,15 +18,10 @@
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
-import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapOnly.Map;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
import org.apache.pig.data.Tuple;
@@ -47,16 +42,18 @@ public class PigMapReduceCounter {
/**
* Here is set up the task id, in order to be attached to each tuple
**/
+ @Override
public void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
- taskID = String.valueOf(context.getTaskAttemptID().getTaskID().getId());
+ int taskIDInt = context.getTaskAttemptID().getTaskID().getId();
+ taskID = String.valueOf(taskIDInt);
pOperator = mp.getLeaves().get(0);
while(true) {
if(pOperator instanceof POCounter){
- ((POCounter) pOperator).setTaskId(taskID);
+ ((POCounter) pOperator).setTaskId(taskIDInt);
((POCounter) pOperator).resetLocalCounter();
break;
} else {
@@ -103,13 +100,14 @@ public class PigMapReduceCounter {
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
- taskID = String.valueOf(context.getTaskAttemptID().getTaskID().getId());
+ int taskIDInt = context.getTaskAttemptID().getTaskID().getId();
+ taskID = String.valueOf(taskIDInt);
leaf = rp.getLeaves().get(0);
while(true) {
if(leaf instanceof POCounter){
- ((POCounter) leaf).setTaskId(taskID);
+ ((POCounter) leaf).setTaskId(taskIDInt);
((POCounter) leaf).resetLocalCounter();
break;
} else {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSecondaryKeyComparator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSecondaryKeyComparator.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSecondaryKeyComparator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSecondaryKeyComparator.java Fri May 30 19:07:23 2014
@@ -34,11 +34,6 @@ public class PigSecondaryKeyComparator e
@Override
public void setConf(Configuration conf) {
- if (!(conf instanceof JobConf)) {
- mLog.warn("Expected jobconf in setConf, got " + conf.getClass().getName());
- return;
- }
- JobConf jconf = (JobConf) conf;
try {
Class<? extends TupleRawComparator> mComparatorClass = TupleFactory.getInstance().tupleRawComparatorClass();
mComparator = mComparatorClass.newInstance();
@@ -47,7 +42,7 @@ public class PigSecondaryKeyComparator e
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
- mComparator.setConf(jconf);
+ mComparator.setConf(conf);
}
protected PigSecondaryKeyComparator() {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java Fri May 30 19:07:23 2014
@@ -44,14 +44,8 @@ public class PigTextRawComparator extend
public void setConf(Configuration conf) {
- if (!(conf instanceof JobConf)) {
- mLog.warn("Expected jobconf in setConf, got " +
- conf.getClass().getName());
- return;
- }
- JobConf jconf = (JobConf)conf;
try {
- mAsc = (boolean[])ObjectSerializer.deserialize(jconf.get(
+ mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
"pig.sortOrder"));
} catch (IOException ioe) {
String msg = "Unable to deserialize pig.sortOrder";
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java Fri May 30 19:07:23 2014
@@ -47,13 +47,8 @@ public class PigTupleDefaultRawComparato
}
public void setConf(Configuration conf) {
- if (!(conf instanceof JobConf)) {
- mLog.warn("Expected jobconf in setConf, got " + conf.getClass().getName());
- return;
- }
- JobConf jconf = (JobConf) conf;
try {
- mAsc = (boolean[]) ObjectSerializer.deserialize(jconf.get("pig.sortOrder"));
+ mAsc = (boolean[]) ObjectSerializer.deserialize(conf.get("pig.sortOrder"));
} catch (IOException ioe) {
mLog.error("Unable to deserialize pig.sortOrder " + ioe.getMessage());
throw new RuntimeException(ioe);
@@ -75,7 +70,7 @@ public class PigTupleDefaultRawComparato
public boolean hasComparedTupleNull() {
return mHasNullField;
}
-
+
private static final BinInterSedes bis = new BinInterSedes();
/**
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleSortComparator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleSortComparator.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleSortComparator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleSortComparator.java Fri May 30 19:07:23 2014
@@ -47,13 +47,8 @@ public class PigTupleSortComparator exte
@Override
public void setConf(Configuration conf) {
- if (!(conf instanceof JobConf)) {
- mLog.warn("Expected jobconf in setConf, got " + conf.getClass().getName());
- return;
- }
- JobConf jconf = (JobConf) conf;
try {
- mAsc = (boolean[]) ObjectSerializer.deserialize(jconf.get("pig.sortOrder"));
+ mAsc = (boolean[]) ObjectSerializer.deserialize(conf.get("pig.sortOrder"));
} catch (IOException ioe) {
mLog.error("Unable to deserialize pig.sortOrder " + ioe.getMessage());
throw new RuntimeException(ioe);
@@ -86,7 +81,7 @@ public class PigTupleSortComparator exte
throw new RuntimeException(e);
}
}
- ((Configurable)mComparator).setConf(jconf);
+ ((Configurable)mComparator).setConf(conf);
}
@Override
@@ -165,4 +160,4 @@ public class PigTupleSortComparator exte
return 0;
}
}
-}
\ No newline at end of file
+}
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizerMR.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizerMR.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizerMR.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizerMR.java Fri May 30 19:07:23 2014
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.backend.hadoop.executionengine.optimizer.SecondaryKeyOptimizer;
+import org.apache.pig.backend.hadoop.executionengine.util.SecondaryKeyOptimizerUtil;
+import org.apache.pig.backend.hadoop.executionengine.util.SecondaryKeyOptimizerUtil.SecondaryKeyOptimizerInfo;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * MapReduce implementation of {@link SecondaryKeyOptimizer}.
+ * <p>
+ * Visits each {@link MapReduceOper} of an {@link MROperPlan} depth-first and, when
+ * {@link SecondaryKeyOptimizerUtil#applySecondaryKeySort} reports that a secondary key
+ * can be used, marks the operator accordingly. Global-sort operators and operators with
+ * a custom partitioner are skipped. The statistics exposed through
+ * {@link SecondaryKeyOptimizer} are totals over all visited operators.
+ */
+@InterfaceAudience.Private
+public class SecondaryKeyOptimizerMR extends MROpPlanVisitor implements SecondaryKeyOptimizer {
+    private static final Log log = LogFactory.getLog(SecondaryKeyOptimizerMR.class);
+
+    // Running totals over every MapReduceOper visited by this optimizer.
+    private int numSortRemoved = 0;
+    private int numDistinctChanged = 0;
+    private int numUseSecondaryKey = 0;
+
+    /**
+     * @param plan
+     *            The MROperPlan to visit to discover keyType
+     */
+    public SecondaryKeyOptimizerMR(MROperPlan plan) {
+        super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
+    }
+
+    /**
+     * Applies the secondary key optimization to one MapReduce operator and adds the
+     * resulting statistics to this optimizer's running totals.
+     *
+     * @param mr the MapReduce operator being visited
+     * @throws VisitorException if raised while applying the optimization
+     */
+    @Override
+    public void visitMROp(MapReduceOper mr) throws VisitorException {
+        // Only optimize for Cogroup case: global-sort operators are left alone
+        if (mr.isGlobalSort()) {
+            return;
+        }
+
+        // Don't optimize when we already have a custom partitioner
+        if (mr.getCustomPartitioner() != null) {
+            return;
+        }
+
+        SecondaryKeyOptimizerInfo info =
+                SecondaryKeyOptimizerUtil.applySecondaryKeySort(mr.mapPlan, mr.reducePlan);
+        if (info == null) {
+            // No statistics to record for this operator.
+            return;
+        }
+
+        // Accumulate rather than overwrite, so the getters report plan-wide totals
+        // instead of only the figures of the last operator visited.
+        numSortRemoved += info.getNumSortRemoved();
+        numDistinctChanged += info.getNumDistinctChanged();
+        numUseSecondaryKey += info.getNumUseSecondaryKey();
+
+        if (info.isUseSecondaryKey()) {
+            mr.setUseSecondaryKey(true);
+            mr.setSecondarySortOrder(info.getSecondarySortOrder());
+            log.info("Using Secondary Key Optimization for MapReduce node " + mr.getOperatorKey());
+        }
+    }
+
+    @Override
+    public int getNumSortRemoved() {
+        return numSortRemoved;
+    }
+
+    @Override
+    public int getNumDistinctChanged() {
+        return numDistinctChanged;
+    }
+
+    @Override
+    public int getNumUseSecondaryKey() {
+        return numUseSecondaryKey;
+    }
+}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java Fri May 30 19:07:23 2014
@@ -18,7 +18,6 @@
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners;
-import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configurable;
@@ -36,31 +35,41 @@ import org.apache.pig.impl.io.NullableTu
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.util.Pair;
+import com.google.common.collect.Maps;
+
/**
* This class is used by skewed join. For the partitioned table, the skewedpartitioner reads the key
* distribution data from the sampler file and returns the reducer index in a round robin fashion.
- * For ex: if the key distribution file contains (k1, 5, 3) as an entry, reducers from 5 to 3 are returned
+ * For ex: if the key distribution file contains (k1, 5, 3) as an entry, reducers from 5 to 3 are returned
* in a round robin manner.
- */
+ */
public class SkewedPartitioner extends Partitioner<PigNullableWritable, Writable> implements Configurable {
- Map<Tuple, Pair<Integer, Integer> > reducerMap = new HashMap<Tuple, Pair<Integer, Integer> >();
- static Map<Tuple, Integer> currentIndexMap = new HashMap<Tuple, Integer> ();
- Integer totalReducers;
- Configuration conf;
+ protected static final TupleFactory tf = TupleFactory.getInstance();
+
+ protected Map<Tuple, Pair<Integer, Integer>> reducerMap = Maps.newHashMap();
+ protected Integer totalReducers = -1;
+ protected boolean inited = false;
+
+ private Map<Tuple, Integer> currentIndexMap = Maps.newHashMap();
+ private Configuration conf;
@Override
- public int getPartition(PigNullableWritable wrappedKey, Writable value,
- int numPartitions) {
- // for streaming tables, return the partition index blindly
- if (wrappedKey instanceof NullablePartitionWritable && (((NullablePartitionWritable)wrappedKey).getPartition()) != -1) {
- return ((NullablePartitionWritable)wrappedKey).getPartition();
- }
+ public int getPartition(PigNullableWritable wrappedKey, Writable value, int numPartitions) {
+ if (!inited) {
+ init();
+ }
+
+ // for streaming tables, return the partition index blindly
+ if (wrappedKey instanceof NullablePartitionWritable &&
+ (((NullablePartitionWritable)wrappedKey).getPartition()) != -1) {
+ return ((NullablePartitionWritable)wrappedKey).getPartition();
+ }
// for partition table, compute the index based on the sampler output
Pair <Integer, Integer> indexes;
Integer curIndex = -1;
- Tuple keyTuple = TupleFactory.getInstance().newTuple(1);
+ Tuple keyTuple = tf.newTuple(1);
// extract the key from nullablepartitionwritable
PigNullableWritable key = ((NullablePartitionWritable) wrappedKey).getKey();
@@ -78,7 +87,7 @@ public class SkewedPartitioner extends P
// if the partition file is empty, use numPartitions
totalReducers = (totalReducers > 0) ? totalReducers : numPartitions;
-
+
indexes = reducerMap.get(keyTuple);
// if the reducerMap does not contain the key, do the default hash based partitioning
if (indexes == null) {
@@ -105,24 +114,29 @@ public class SkewedPartitioner extends P
conf = job;
PigMapReduce.sJobConfInternal.set(conf);
PigMapReduce.sJobConf = conf;
- String keyDistFile = job.get("pig.keyDistFile", "");
- if (keyDistFile.length() == 0)
- throw new RuntimeException(this.getClass().getSimpleName() + " used but no key distribution found");
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ protected void init() {
+ String keyDistFile = conf.get("pig.keyDistFile", "");
+ if (keyDistFile.length() == 0) {
+ throw new RuntimeException(this.getClass().getSimpleName() +
+ " used but no key distribution found");
+ }
try {
Integer [] redCnt = new Integer[1];
reducerMap = MapRedUtil.loadPartitionFileFromLocalCache(
- keyDistFile, redCnt, DataType.TUPLE, job);
+ keyDistFile, redCnt, DataType.TUPLE, conf);
// check if the partition file is empty
totalReducers = (redCnt[0] == null) ? -1 : redCnt[0];
} catch (Exception e) {
throw new RuntimeException(e);
}
+ inited = true;
}
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Fri May 30 19:07:23 2014
@@ -24,8 +24,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.RawComparator;
@@ -58,20 +56,23 @@ import org.apache.pig.impl.util.Utils;
public class WeightedRangePartitioner extends Partitioner<PigNullableWritable, Writable>
implements Configurable {
- PigNullableWritable[] quantiles;
- RawComparator<PigNullableWritable> comparator;
- PigContext pigContext;
- final public static Map<PigNullableWritable,DiscreteProbabilitySampleGenerator> weightedParts
- = new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>();
- private static final Log log = LogFactory.getLog(WeightedRangePartitioner.class);
+ protected Map<PigNullableWritable, DiscreteProbabilitySampleGenerator> weightedParts =
+ new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>();
+ protected PigNullableWritable[] quantiles;
+ private RawComparator<PigNullableWritable> comparator;
+ private PigContext pigContext;
+ private Configuration job;
- Configuration job;
+ protected boolean inited = false;
@SuppressWarnings("unchecked")
@Override
public int getPartition(PigNullableWritable key, Writable value,
int numPartitions){
+ if (!inited) {
+ init();
+ }
if (comparator == null) {
comparator = (RawComparator<PigNullableWritable>)PigMapReduce.sJobContext.getSortComparator();
}
@@ -89,19 +90,15 @@ public class WeightedRangePartitioner ex
}
@SuppressWarnings("unchecked")
- @Override
- public void setConf(Configuration configuration) {
- job = configuration;
-
+ public void init() {
+ weightedParts = new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>();
try {
pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
- } catch (IOException e1) {
- // should not happen
- e1.printStackTrace();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to deserialize pig context: ", e);
}
- String quantilesFile = configuration.get("pig.quantilesFile", "");
-
+ String quantilesFile = job.get("pig.quantilesFile", "");
if (quantilesFile.length() == 0) {
throw new RuntimeException(this.getClass().getSimpleName()
+ " used but no quantiles found");
@@ -109,36 +106,38 @@ public class WeightedRangePartitioner ex
try{
// use local file system to get the quantilesFile
+ Map<String, Object> quantileMap = null;
Configuration conf;
if (!pigContext.getExecType().isLocal()) {
conf = new Configuration(true);
} else {
conf = new Configuration(false);
}
- if (configuration.get("fs.file.impl") != null) {
- conf.set("fs.file.impl", configuration.get("fs.file.impl"));
+ if (job.get("fs.file.impl") != null) {
+ conf.set("fs.file.impl", job.get("fs.file.impl"));
}
- if (configuration.get("fs.hdfs.impl") != null) {
- conf.set("fs.hdfs.impl", configuration.get("fs.hdfs.impl"));
+ if (job.get("fs.hdfs.impl") != null) {
+ conf.set("fs.hdfs.impl", job.get("fs.hdfs.impl"));
}
- MapRedUtil.copyTmpFileConfigurationValues(configuration, conf);
-
+ MapRedUtil.copyTmpFileConfigurationValues(job, conf);
conf.set(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
ReadToEndLoader loader = new ReadToEndLoader(Utils.getTmpFileStorageObject(conf),
conf, quantilesFile, 0);
- DataBag quantilesList;
Tuple t = loader.getNext();
if (t != null) {
// the Quantiles file has a tuple as under:
// (numQuantiles, bag of samples)
// numQuantiles here is the reduce parallelism
- Map<String, Object> quantileMap = (Map<String, Object>) t.get(0);
- quantilesList = (DataBag) quantileMap.get(FindQuantiles.QUANTILES_LIST);
+ quantileMap = (Map<String, Object>) t.get(0);
+ }
+
+ if (quantileMap!=null) {
+ DataBag quantilesList = (DataBag) quantileMap.get(FindQuantiles.QUANTILES_LIST);
InternalMap weightedPartsData = (InternalMap) quantileMap.get(FindQuantiles.WEIGHTED_PARTS);
convertToArray(quantilesList);
- for(Entry<Object, Object> ent : weightedPartsData.entrySet()){
+ for (Entry<Object, Object> ent : weightedPartsData.entrySet()) {
Tuple key = (Tuple)ent.getKey(); // sample item which repeats
float[] probVec = getProbVec((Tuple)ent.getValue());
weightedParts.put(getPigNullableWritable(key),
@@ -151,9 +150,15 @@ public class WeightedRangePartitioner ex
// called. If the quantiles file is empty due to either a bug or
// a transient failure situation on the dfs, then weightedParts will
// not be populated and the job will fail in getPartition()
- }catch (Exception e){
+ } catch (Exception e) {
throw new RuntimeException(e);
}
+ inited = true;
+ }
+
+ @Override
+ public void setConf(Configuration configuration) {
+ job = configuration;
}
/**
@@ -161,7 +166,7 @@ public class WeightedRangePartitioner ex
* @return
* @throws ExecException
*/
- private float[] getProbVec(Tuple values) throws ExecException {
+ protected float[] getProbVec(Tuple values) throws ExecException {
float[] probVec = new float[values.size()];
for(int i = 0; i < values.size(); i++) {
probVec[i] = (Float)values.get(i);
@@ -169,7 +174,7 @@ public class WeightedRangePartitioner ex
return probVec;
}
- private PigNullableWritable getPigNullableWritable(Tuple t) {
+ protected PigNullableWritable getPigNullableWritable(Tuple t) {
try {
// user comparators work with tuples - so if user comparator
// is being used OR if there are more than 1 sort cols, use
@@ -191,9 +196,9 @@ public class WeightedRangePartitioner ex
}
}
- private void convertToArray(
- DataBag quantilesListAsBag) {
+ protected void convertToArray(DataBag quantilesListAsBag) {
ArrayList<PigNullableWritable> quantilesList = getList(quantilesListAsBag);
+
if ("true".equals(job.get("pig.usercomparator")) ||
quantilesList.get(0).getClass().equals(NullableTuple.class)) {
quantiles = quantilesList.toArray(new NullableTuple[0]);
@@ -227,7 +232,6 @@ public class WeightedRangePartitioner ex
* @return
*/
private ArrayList<PigNullableWritable> getList(DataBag quantilesListAsBag) {
-
ArrayList<PigNullableWritable> list = new ArrayList<PigNullableWritable>();
for (Tuple tuple : quantilesListAsBag) {
list.add(getPigNullableWritable(tuple));
@@ -240,5 +244,4 @@ public class WeightedRangePartitioner ex
return job;
}
-
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java Fri May 30 19:07:23 2014
@@ -24,6 +24,8 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
import org.apache.pig.impl.plan.DepthFirstWalker;
@@ -32,9 +34,9 @@ import org.apache.pig.impl.plan.VisitorE
/**
* This visitor visits the MRPlan and does the following
* for each MROper: If the map plan or the reduce plan of the MROper has
- * an end of all input flag present in it, this marks in the MROper whether the map
+ * an end of all input flag present in it, this marks in the MROper whether the map
* has an end of all input flag set or if the reduce has an end of all input flag set.
- *
+ *
*/
public class EndOfAllInputSetter extends MROpPlanVisitor {
@@ -47,43 +49,40 @@ public class EndOfAllInputSetter extends
@Override
public void visitMROp(MapReduceOper mr) throws VisitorException {
-
+
EndOfAllInputChecker checker = new EndOfAllInputChecker(mr.mapPlan);
checker.visit();
if(checker.isEndOfAllInputPresent()) {
- mr.setEndOfAllInputInMap(true);
+ mr.setEndOfAllInputInMap(true);
}
-
+
checker = new EndOfAllInputChecker(mr.reducePlan);
checker.visit();
if(checker.isEndOfAllInputPresent()) {
- mr.setEndOfAllInputInReduce(true);
- }
-
+ mr.setEndOfAllInputInReduce(true);
+ }
+
}
- static class EndOfAllInputChecker extends PhyPlanVisitor {
-
+ public static class EndOfAllInputChecker extends PhyPlanVisitor {
+
private boolean endOfAllInputFlag = false;
public EndOfAllInputChecker(PhysicalPlan plan) {
super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
}
-
- /* (non-Javadoc)
- * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitStream(org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStream)
- */
+
@Override
public void visitStream(POStream stream) throws VisitorException {
// stream present
endOfAllInputFlag = true;
}
-
+
@Override
public void visitMergeJoin(POMergeJoin join) throws VisitorException {
// merge join present
endOfAllInputFlag = true;
}
-
+
@Override
public void visitCollectedGroup(POCollectedGroup mg) throws VisitorException {
// map side group present
@@ -97,7 +96,17 @@ public class EndOfAllInputSetter extends
}
@Override
- public void visitPartialAgg(POPartialAgg partAgg){
+ public void visitPartialAgg(POPartialAgg partAgg) throws VisitorException {
+ endOfAllInputFlag = true;
+ }
+
+ @Override
+ public void visitReservoirSample(POReservoirSample reservoirSample) throws VisitorException {
+ endOfAllInputFlag = true;
+ }
+
+ @Override
+ public void visitPoissonSample(POPoissonSample poissonSample) throws VisitorException {
endOfAllInputFlag = true;
}
@@ -109,4 +118,3 @@ public class EndOfAllInputSetter extends
}
}
}
-
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java Fri May 30 19:07:23 2014
@@ -56,7 +56,7 @@ public class POPackageAnnotator extends
@Override
public void visitMROp(MapReduceOper mr) throws VisitorException {
-
+
// POPackage OR POJoinPackage could be present in the combine plan
// OR in the reduce plan. POPostCombinerPackage could
// be present only in the reduce plan. Search in these two
@@ -67,9 +67,9 @@ public class POPackageAnnotator extends
POPackage pkg = pkgDiscoverer.getPkg();
if(pkg != null) {
handlePackage(mr, pkg);
- }
+ }
}
-
+
if(!mr.reducePlan.isEmpty()) {
PackageDiscoverer pkgDiscoverer = new PackageDiscoverer(mr.reducePlan);
pkgDiscoverer.visit();
@@ -89,14 +89,14 @@ public class POPackageAnnotator extends
}
}
}
-
+
}
-
+
private void handlePackage(MapReduceOper mr, POPackage pkg) throws VisitorException {
// the LocalRearrange(s) could either be in the map of this MapReduceOper
// OR in the reduce of predecessor MapReduceOpers
int lrFound = 0;
-
+
lrFound = patchPackage(mr.mapPlan, pkg);
if(lrFound != pkg.getNumInps()) {
// we did not find the LocalRearrange(s) in the map plan
@@ -109,7 +109,7 @@ public class POPackageAnnotator extends
lrFound += patchPackage(mrOper.reducePlan, pkg);
if(lrFound == pkg.getNumInps()) {
break;
- }
+ }
}
}
if(lrFound != pkg.getNumInps()) {
@@ -126,7 +126,7 @@ public class POPackageAnnotator extends
// the package
return lrDiscoverer.getLoRearrangeFound();
}
-
+
/**
* Simple visitor of the "Reduce" physical plan
* which will get a reference to the POPacakge
@@ -135,14 +135,11 @@ public class POPackageAnnotator extends
static class PackageDiscoverer extends PhyPlanVisitor {
private POPackage pkg;
-
+
public PackageDiscoverer(PhysicalPlan plan) {
super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
}
-
- /* (non-Javadoc)
- * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitStream(org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStream)
- */
+
@Override
public void visitPackage(POPackage pkg) throws VisitorException {
this.pkg = pkg;
@@ -154,9 +151,9 @@ public class POPackageAnnotator extends
public POPackage getPkg() {
return pkg;
}
-
+
}
-
+
/**
* Physical Plan visitor which tries to get the
* LocalRearrange(s) present in the plan (if any) and
@@ -165,18 +162,15 @@ public class POPackageAnnotator extends
* present in the "key")
*/
static class LoRearrangeDiscoverer extends PhyPlanVisitor {
-
+
private int loRearrangeFound = 0;
private POPackage pkg;
-
+
public LoRearrangeDiscoverer(PhysicalPlan plan, POPackage pkg) {
super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
this.pkg = pkg;
}
-
- /* (non-Javadoc)
- * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitStream(org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStream)
- */
+
@Override
public void visitLocalRearrange(POLocalRearrange lrearrange) throws VisitorException {
loRearrangeFound++;
@@ -194,18 +188,18 @@ public class POPackageAnnotator extends
keyInfo = pkg.getPkgr().getKeyInfo();
if(keyInfo == null)
keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
-
+
if(keyInfo.get(Integer.valueOf(lrearrange.getIndex())) != null) {
- // something is wrong - we should not be getting key info
+ // something is wrong - we should not be getting key info
// for the same index from two different Local Rearranges
int errCode = 2087;
String msg = "Unexpected problem during optimization." +
- " Found index:" + lrearrange.getIndex() +
+ " Found index:" + lrearrange.getIndex() +
" in multiple LocalRearrange operators.";
throw new OptimizerException(msg, errCode, PigException.BUG);
-
+
}
- keyInfo.put(Integer.valueOf(lrearrange.getIndex()),
+ keyInfo.put(Integer.valueOf(lrearrange.getIndex()),
new Pair<Boolean, Map<Integer, Integer>>(
lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
pkg.getPkgr().setKeyInfo(keyInfo);
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/optimizer/SecondaryKeyOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/optimizer/SecondaryKeyOptimizer.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/optimizer/SecondaryKeyOptimizer.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/optimizer/SecondaryKeyOptimizer.java Fri May 30 19:07:23 2014
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.optimizer;
+
+import org.apache.pig.classification.InterfaceAudience;
+
+/**
+ * Removes POSort and changes PODistinct to use POSortedDistinct in a nested
+ * foreach plan by sorting on a secondary key.
+ */
+@InterfaceAudience.Private
+public interface SecondaryKeyOptimizer {
+
+    /**
+     * @return the number of nested sort operators removed by this optimizer
+     *         (exact counting is defined by the implementation)
+     */
+    int getNumSortRemoved();
+
+    /**
+     * @return the number of distinct operators changed to a sorted distinct
+     *         (exact counting is defined by the implementation)
+     */
+    int getNumDistinctChanged();
+
+    /**
+     * @return presumably the number of places where a secondary key was used;
+     *         NOTE(review): confirm against the implementations
+     */
+    int getNumUseSecondaryKey();
+
+}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Fri May 30 19:07:23 2014
@@ -65,6 +65,8 @@ public abstract class PhysicalOperator e
private static final Log log = LogFactory.getLog(PhysicalOperator.class);
protected static final long serialVersionUID = 1L;
+ protected static final Result RESULT_EMPTY = new Result(POStatus.STATUS_NULL, null);
+ protected static final Result RESULT_EOP = new Result(POStatus.STATUS_EOP, null);
// The degree of parallelism requested
protected int requestedParallelism;
@@ -98,7 +100,7 @@ public abstract class PhysicalOperator e
// Will be used by operators to report status or transmit heartbeat
// Should be set by the backends to appropriate implementations that
// wrap their own version of a reporter.
- private static ThreadLocal<PigProgressable> reporter = new ThreadLocal<PigProgressable>();
+ public static ThreadLocal<PigProgressable> reporter = new ThreadLocal<PigProgressable>();
// Will be used by operators to aggregate warning messages
// Should be set by the backends to appropriate implementations that
@@ -137,6 +139,21 @@ public abstract class PhysicalOperator e
res = new Result();
}
+    /**
+     * Copy constructor: creates an operator with the same operator key and the
+     * same configuration as {@code copy}.
+     * <p>
+     * A fresh {@code Result} is allocated rather than reusing {@code copy}'s.
+     * NOTE(review): {@code inputs}, {@code outputs}, {@code originalLocations},
+     * {@code parentPlan} and {@code lineageTracer} are copied by reference, so
+     * both operators share those objects; confirm this aliasing is intended.
+     *
+     * @param copy the operator whose state is taken over
+     */
+    public PhysicalOperator(PhysicalOperator copy) {
+        super(copy.getOperatorKey());
+        res = new Result();
+        alias = copy.alias;
+        originalLocations = copy.originalLocations;
+        parentPlan = copy.parentPlan;
+        inputs = copy.inputs;
+        outputs = copy.outputs;
+        inputAttached = copy.inputAttached;
+        resultType = copy.resultType;
+        requestedParallelism = copy.requestedParallelism;
+        lineageTracer = copy.lineageTracer;
+        accum = copy.accum;
+    }
+
@Override
public void setIllustrator(Illustrator illustrator) {
this.illustrator = illustrator;
@@ -166,6 +183,10 @@ public abstract class PhysicalOperator e
return (alias == null) ? "" : (alias + ": ");
}
+    /**
+     * Sets this operator's alias. The value is stored as-is without validation.
+     *
+     * @param alias the new alias
+     */
+    public void setAlias(String alias) { this.alias = alias; }
+
public void addOriginalLocation(String alias, SourceLocation sourceLocation) {
this.alias = alias;
this.originalLocations.add(new OriginalLocation(alias, sourceLocation.line(), sourceLocation.offset()));
@@ -265,11 +286,9 @@ public abstract class PhysicalOperator e
*/
public Result processInput() throws ExecException {
try {
- Result res = new Result();
if (input == null && (inputs == null || inputs.size() == 0)) {
// log.warn("No inputs found. Signaling End of Processing.");
- res.returnStatus = POStatus.STATUS_EOP;
- return res;
+ return new Result(POStatus.STATUS_EOP, null);
}
// Should be removed once the model is clear
@@ -280,6 +299,7 @@ public abstract class PhysicalOperator e
if (!isInputAttached()) {
return inputs.get(0).getNextTuple();
} else {
+ Result res = new Result();
res.result = input;
res.returnStatus = POStatus.STATUS_OK;
detachInput();
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java Fri May 30 19:07:23 2014
@@ -21,7 +21,7 @@ import org.apache.pig.classification.Int
import org.apache.pig.classification.InterfaceStability;
/**
- *
+ *
* An interface to allow aggregation of messages
*/
@InterfaceAudience.Private
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Fri May 30 19:07:23 2014
@@ -550,6 +550,11 @@ public class POUserFunc extends Expressi
public FuncSpec getFuncSpec() {
return funcSpec;
}
+
+    /**
+     * Replaces the function spec and re-instantiates the wrapped function from it.
+     * <p>
+     * NOTE(review): only {@code funcSpec} and the wrapped function are replaced
+     * here; other state derived from the previous function (e.g. any one-time
+     * initialization done when processing starts) is not reset. Confirm this
+     * is only called before processing begins.
+     *
+     * @param funcSpec the new function spec
+     */
+    public void setFuncSpec(FuncSpec funcSpec) {
+        this.funcSpec = funcSpec;
+        instantiateFunc(this.funcSpec);
+    }
public String[] getCacheFiles() {
return cacheFiles;
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java.orig
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java.orig?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java.orig (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java.orig Fri May 30 19:07:23 2014
@@ -0,0 +1,632 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators;
+
+import static org.apache.pig.PigConfiguration.TIME_UDFS;
+import static org.apache.pig.PigConfiguration.TIME_UDFS_FREQUENCY;
+import static org.apache.pig.PigConstants.TIME_UDFS_INVOCATION_COUNTER;
+import static org.apache.pig.PigConstants.TIME_UDFS_ELAPSED_TIME_COUNTER;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.lang.reflect.Type;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.Accumulator;
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.PigException;
+import org.apache.pig.PigWarning;
+import org.apache.pig.TerminatingAccumulator;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.MonitoredUDFExecutor;
+import org.apache.pig.builtin.MonitoredUDF;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.SchemaTupleClassGenerator.GenContext;
+import org.apache.pig.data.SchemaTupleFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.data.TupleMaker;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
+
+/**
+ * Physical expression operator that evaluates a user-defined function
+ * ({@link EvalFunc}) over the argument tuple assembled from its inputs.
+ * <p>
+ * The wrapped function is created from a {@link FuncSpec}. For algebraic
+ * functions the spec can be switched to the initial, intermediate or final
+ * stage with {@link #setAlgebraicFunction(byte)}. In accumulative mode the
+ * function is driven through the {@link Accumulator} interface, with early
+ * termination for {@link TerminatingAccumulator}s.
+ * <p>
+ * When {@code TIME_UDFS} is enabled in the job configuration, every
+ * {@code timingFrequency}-th invocation is timed. Counters are reported
+ * through {@link PigStatusReporter}, scaled by the sampling frequency.
+ * <p>
+ * NOTE(review): this copy was committed as {@code POUserFunc.java.orig}, which
+ * looks like a patch backup file; it is probably unintended and should be
+ * removed from the repository.
+ */
+public class POUserFunc extends ExpressionOperator {
+    private static final long serialVersionUID = 1L;
+    private static final Log LOG = LogFactory.getLog(POUserFunc.class);
+    private static final TupleFactory tf = TupleFactory.getInstance();
+
+    // Counter group used for UDF timing; set lazily in processInput().
+    private transient String counterGroup;
+    // The wrapped UDF. It is transient, so readObject() re-creates it.
+    private transient EvalFunc func;
+    private transient String[] cacheFiles = null;
+
+    // Current spec (may be switched to an algebraic stage) and the spec this
+    // operator was constructed with.
+    FuncSpec funcSpec;
+    FuncSpec origFSpec;
+    // Algebraic stage selectors accepted by setAlgebraicFunction(byte).
+    public static final byte INITIAL = 0;
+    public static final byte INTERMEDIATE = 1;
+    public static final byte FINAL = 2;
+    // Set once processInput() has performed its one-time setup.
+    private boolean initialized = false;
+    // Non-null only when the UDF class is annotated with @MonitoredUDF.
+    private MonitoredUDFExecutor executor = null;
+
+    private PhysicalOperator referencedOperator = null;
+    // True once the accumulated value has been emitted; the next call returns EOP.
+    private boolean isAccumulationDone;
+    private String signature;
+    private boolean haveCheckedIfTerminatingAccumulator;
+
+    // UDF timing state, configured in processInput() and used in getNext().
+    private long numInvocations = 0L;
+    private long timingFrequency = 100L;
+    private boolean doTiming = false;
+
+    public PhysicalOperator getReferencedOperator() {
+        return referencedOperator;
+    }
+
+    public void setReferencedOperator(PhysicalOperator referencedOperator) {
+        this.referencedOperator = referencedOperator;
+    }
+
+    public POUserFunc(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        this(k, rp, inp, null);
+    }
+
+    public POUserFunc(
+            OperatorKey k,
+            int rp,
+            List<PhysicalOperator> inp,
+            FuncSpec funcSpec) {
+        this(k, rp, inp, funcSpec, null);
+    }
+
+    /**
+     * @param k        operator key
+     * @param rp       requested parallelism
+     * @param inp      input operators
+     * @param funcSpec spec of the UDF to instantiate
+     * @param func     initial function instance; it is overwritten by the
+     *                 instance created from {@code funcSpec}
+     */
+    public POUserFunc(
+            OperatorKey k,
+            int rp,
+            List<PhysicalOperator> inp,
+            FuncSpec funcSpec,
+            EvalFunc func) {
+        super(k, rp);
+        super.setInputs(inp);
+        this.funcSpec = funcSpec;
+        this.origFSpec = funcSpec;
+        this.func = func;
+        instantiateFunc(funcSpec);
+    }
+
+    /**
+     * Creates the wrapped function from {@code fSpec} and applies the stored
+     * signature and input schema to it.
+     */
+    private void instantiateFunc(FuncSpec fSpec) {
+        this.func = (EvalFunc) PigContext.instantiateFuncFromSpec(fSpec);
+        this.setSignature(signature);
+        this.setFuncInputSchema(signature);
+        if (func.getClass().isAnnotationPresent(MonitoredUDF.class)) {
+            executor = new MonitoredUDFExecutor(func);
+        }
+        // The next two initializations do not work as intended. The reporter
+        // and pigLogger are member variables of PhysicalOperator. When
+        // instantiateFunc is invoked at deserialization time, both are null;
+        // they are set during map and reduce calls, which makes these calls
+        // basically useless here. processInput() sets them again once the
+        // PhysicalOperator has been set up with the real references.
+        this.func.setReporter(getReporter());
+        this.func.setPigLogger(pigLogger);
+    }
+
+    private transient TupleMaker inputTupleMaker;
+    private boolean usingSchemaTupleFactory;
+
+    /**
+     * Builds the argument tuple for the UDF.
+     * <p>
+     * On the first call this method performs one-time setup:
+     * <ul>
+     * <li>reporter and logger wiring;</li>
+     * <li>timing configuration;</li>
+     * <li>choice of input tuple maker.</li>
+     * </ul>
+     *
+     * @return one of the following:
+     *         <ul>
+     *         <li>EOP if there is no input;</li>
+     *         <li>the attached input if one is attached;</li>
+     *         <li>the first non-OK result produced by an input operator;</li>
+     *         <li>otherwise an OK result holding the assembled tuple.</li>
+     *         </ul>
+     */
+    @Override
+    public Result processInput() throws ExecException {
+
+        // Make sure the reporter is set, because it isn't getting carried
+        // across in the serialization. This setup only runs once, guarded by
+        // the initialized flag.
+        if (!initialized) {
+            func.setReporter(getReporter());
+            func.setPigLogger(pigLogger);
+            Configuration jobConf = UDFContext.getUDFContext().getJobConf();
+            if (jobConf != null) {
+                doTiming = jobConf.getBoolean(TIME_UDFS, false);
+                if (doTiming) {
+                    counterGroup = funcSpec.toString();
+                    // Clamp to >= 1: getNext() computes numInvocations % timingFrequency,
+                    // so a configured value of 0 would throw ArithmeticException.
+                    timingFrequency = Math.max(1L, jobConf.getLong(TIME_UDFS_FREQUENCY, 100L));
+                }
+            }
+            // We initialize here instead of in instantiateFunc because this is
+            // called when actual processing has begun, whereas a function can be
+            // instantiated on the frontend (mainly for optimization).
+            Schema tmpS = func.getInputSchema();
+            if (tmpS != null) {
+                // getInstance returns null if no class was found; in that case the
+                // default is used. The context is passed so that a Schema generated
+                // elsewhere does not override user expectations.
+                inputTupleMaker = SchemaTupleFactory.getInstance(tmpS, false, GenContext.UDF);
+                if (inputTupleMaker == null) {
+                    LOG.debug("No SchemaTupleFactory found for Schema ["+tmpS+"], using default TupleFactory");
+                    usingSchemaTupleFactory = false;
+                } else {
+                    LOG.debug("Using SchemaTupleFactory for Schema: " + tmpS);
+                    usingSchemaTupleFactory = true;
+                }
+
+                // In the future, we could optionally use SchemaTuples for output as well.
+            }
+
+            if (inputTupleMaker == null) {
+                inputTupleMaker = TupleFactory.getInstance();
+            }
+
+            initialized = true;
+        }
+
+        Result res = new Result();
+        if (input == null && (inputs == null || inputs.size()==0)) {
+            res.returnStatus = POStatus.STATUS_EOP;
+            return res;
+        }
+
+        // Should be removed once the model is clear
+        if (getReporter() != null) {
+            getReporter().progress();
+        }
+
+        if (isInputAttached()) {
+            res.result = input;
+            res.returnStatus = POStatus.STATUS_OK;
+            detachInput();
+            return res;
+        } else {
+            // Tracked separately from the factory choice because there may be
+            // cases where the size is known without a schema tuple factory.
+            boolean knownSize = usingSchemaTupleFactory;
+            int knownIndex = 0;
+            res.result = inputTupleMaker.newTuple();
+
+            Result temp = null;
+
+            for (PhysicalOperator op : inputs) {
+                temp = op.getNext(op.getResultType());
+                if (temp.returnStatus != POStatus.STATUS_OK) {
+                    return temp;
+                }
+
+                // A project-to-end producing a tuple is flattened into the
+                // argument tuple field by field.
+                if (op instanceof POProject &&
+                        op.getResultType() == DataType.TUPLE) {
+                    POProject projOp = (POProject) op;
+                    if (projOp.isProjectToEnd()) {
+                        Tuple trslt = (Tuple) temp.result;
+                        Tuple rslt = (Tuple) res.result;
+                        for (int i = 0; i < trslt.size(); i++) {
+                            if (knownSize) {
+                                rslt.set(knownIndex++, trslt.get(i));
+                            } else {
+                                rslt.append(trslt.get(i));
+                            }
+                        }
+                        continue;
+                    }
+                }
+                if (knownSize) {
+                    ((Tuple) res.result).set(knownIndex++, temp.result);
+                } else {
+                    ((Tuple) res.result).append(temp.result);
+                }
+            }
+            res.returnStatus = temp.returnStatus;
+
+            return res;
+        }
+    }
+
+    private boolean isEarlyTerminating = false;
+
+    private void setIsEarlyTerminating() {
+        isEarlyTerminating = true;
+    }
+
+    private boolean isEarlyTerminating() {
+        return isEarlyTerminating;
+    }
+
+    private boolean isTerminated = false;
+
+    private boolean hasBeenTerminated() {
+        return isTerminated;
+    }
+
+    private void earlyTerminate() {
+        isTerminated = true;
+    }
+
+    /**
+     * Evaluates the UDF on the next argument tuple from processInput().
+     * <p>
+     * In accumulative mode the behavior depends on the accumulation state:
+     * <ul>
+     * <li>While accumulation is running, each call feeds the tuple to the
+     * accumulator and returns BATCH_OK, or EARLY_TERMINATION once a
+     * terminating accumulator has finished.</li>
+     * <li>When accumulation stops, the first call returns the accumulated
+     * value and the next call returns EOP.</li>
+     * </ul>
+     * If timing is enabled, sampled calls add {@code timingFrequency} to the
+     * invocation counter and the scaled elapsed microseconds to the elapsed
+     * counter.
+     *
+     * @throws ExecException wrapping any IOException or
+     *                       IndexOutOfBoundsException raised by the UDF
+     */
+    private Result getNext() throws ExecException {
+        Result result = processInput();
+        long startNanos = 0;
+        boolean timeThis = doTiming && (numInvocations++ % timingFrequency == 0);
+        if (timeThis) {
+            startNanos = System.nanoTime();
+            // Each sampled call stands for timingFrequency invocations.
+            PigStatusReporter.getInstance().incrCounter(counterGroup, TIME_UDFS_INVOCATION_COUNTER, timingFrequency);
+        }
+        try {
+            if (result.returnStatus == POStatus.STATUS_OK) {
+                Tuple t = (Tuple) result.result;
+
+                // For backward compatibility, we short-circuit tuples whose
+                // size is 1 and field is null. (See PIG-3679)
+                if (t.size() == 1 && t.isNull(0)) {
+                    pigLogger.warn(this, "All the input values are null, skipping the invocation of UDF",
+                            PigWarning.SKIP_UDF_CALL_FOR_NULL);
+                    Schema outputSchema = func.outputSchema(func.getInputSchema());
+                    // If the output schema is a tuple (i.e. multiple fields are
+                    // to be returned), return a tuple where every field is null.
+                    if (outputSchema != null && outputSchema.getField(0).type == DataType.TUPLE) {
+                        result.result = tf.newTuple(outputSchema.getField(0).schema.size());
+                    // Otherwise simply return null, since it can be cast to any data type.
+                    } else {
+                        result.result = null;
+                    }
+                    // No early return: fall through so a sampled call still
+                    // records elapsed time, matching the invocation counter.
+                } else if (isAccumulative()) {
+                    if (isAccumStarted()) {
+                        if (!haveCheckedIfTerminatingAccumulator) {
+                            haveCheckedIfTerminatingAccumulator = true;
+                            if (func instanceof TerminatingAccumulator<?>)
+                                setIsEarlyTerminating();
+                        }
+
+                        if (!hasBeenTerminated() && isEarlyTerminating() && ((TerminatingAccumulator<?>)func).isFinished()) {
+                            earlyTerminate();
+                        }
+
+                        if (hasBeenTerminated()) {
+                            result.returnStatus = POStatus.STATUS_EARLY_TERMINATION;
+                            result.result = null;
+                            isAccumulationDone = false;
+                        } else {
+                            ((Accumulator)func).accumulate((Tuple)result.result);
+                            result.returnStatus = POStatus.STATUS_BATCH_OK;
+                            result.result = null;
+                            isAccumulationDone = false;
+                        }
+                    } else {
+                        if (isAccumulationDone) {
+                            // PORelationToExprProject does not return STATUS_EOP,
+                            // so the UDF gets called both when isAccumStarted is
+                            // first true and then set to false, even when the input
+                            // relation is empty. STATUS_EOP therefore has to be
+                            // sent from POUserFunc after the results have been sent.
+                            result.result = null;
+                            result.returnStatus = POStatus.STATUS_EOP;
+                        } else {
+                            result.result = ((Accumulator)func).getValue();
+                            result.returnStatus = POStatus.STATUS_OK;
+                            ((Accumulator)func).cleanup();
+                            isAccumulationDone = true;
+                        }
+                    }
+                } else {
+                    if (executor != null) {
+                        result.result = executor.monitorExec((Tuple) result.result);
+                    } else {
+                        result.result = func.exec((Tuple) result.result);
+                    }
+                }
+            }
+            if (timeThis) {
+                // Exact long arithmetic. The previous Math.round(long) call
+                // resolved to Math.round(float) -> int, which lost precision and
+                // saturated for long-running calls.
+                long elapsedMicros = (System.nanoTime() - startNanos) / 1000L;
+                PigStatusReporter.getInstance().incrCounter(counterGroup, TIME_UDFS_ELAPSED_TIME_COUNTER,
+                        elapsedMicros * timingFrequency);
+            }
+            return result;
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (IOException ioe) {
+            int errCode = 2078;
+            String msg = "Caught error from UDF: " + funcSpec.getClassName();
+            String footer = " [" + ioe.getMessage() + "]";
+
+            if (ioe instanceof PigException) {
+                int udfErrorCode = ((PigException)ioe).getErrorCode();
+                if (udfErrorCode != 0) {
+                    errCode = udfErrorCode;
+                    msg = ((PigException)ioe).getMessage();
+                } else {
+                    msg += " [" + ((PigException)ioe).getMessage() + " ]";
+                }
+            } else {
+                msg += footer;
+            }
+
+            throw new ExecException(msg, errCode, PigException.BUG, ioe);
+        } catch (IndexOutOfBoundsException ie) {
+            int errCode = 2078;
+            String msg = "Caught error from UDF: " + funcSpec.getClassName() +
+                    ", Out of bounds access [" + ie.getMessage() + "]";
+            throw new ExecException(msg, errCode, PigException.BUG, ie);
+        }
+    }
+
+    // All typed getNext* entry points evaluate the UDF the same way; the
+    // UDF decides the runtime type of the value it returns.
+
+    @Override
+    public Result getNextTuple() throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNextDataBag() throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNextInteger() throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNextBoolean() throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNextDataByteArray() throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNextDouble() throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNextBigInteger() throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNextBigDecimal() throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNextFloat() throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNextLong() throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNextDateTime() throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNextMap() throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNextString() throws ExecException {
+        return getNext();
+    }
+
+    /**
+     * Switches the wrapped function to one stage of the original algebraic
+     * function. The optimizer uses this to place the correct functions in the
+     * map, combine and reduce phases without replacing operators in the plan.
+     * <p>
+     * The new spec inherits the constructor arguments of the original spec,
+     * and the result type is updated to the new function's return type. An
+     * unrecognized selector leaves the current spec in place.
+     *
+     * @param Function one of {@link #INITIAL}, {@link #INTERMEDIATE}, {@link #FINAL}
+     * @throws ExecException (code 2072) if the original function is not algebraic
+     */
+    public void setAlgebraicFunction(byte Function) throws ExecException {
+        switch (Function) {
+        case INITIAL:
+            funcSpec = new FuncSpec(getInitial());
+            break;
+        case INTERMEDIATE:
+            funcSpec = new FuncSpec(getIntermed());
+            break;
+        case FINAL:
+            funcSpec = new FuncSpec(getFinal());
+            break;
+        }
+        funcSpec.setCtorArgs(origFSpec.getCtorArgs());
+        instantiateFunc(funcSpec);
+        setResultType(DataType.findType(((EvalFunc<?>) func).getReturnType()));
+    }
+
+    /**
+     * Re-instantiates the original spec and returns it as an Algebraic.
+     *
+     * @throws ExecException (code 2072) if the original function is not algebraic
+     */
+    private Algebraic instantiateOrigAlgebraic() throws ExecException {
+        instantiateFunc(origFSpec);
+        if (func instanceof Algebraic) {
+            return (Algebraic) func;
+        }
+        int errCode = 2072;
+        String msg = "Attempt to run a non-algebraic function"
+                + " as an algebraic function";
+        throw new ExecException(msg, errCode, PigException.BUG);
+    }
+
+    /** @return the class name of the original function's initial stage */
+    public String getInitial() throws ExecException {
+        return instantiateOrigAlgebraic().getInitial();
+    }
+
+    /** @return the class name of the original function's intermediate stage */
+    public String getIntermed() throws ExecException {
+        return instantiateOrigAlgebraic().getIntermed();
+    }
+
+    /** @return the class name of the original function's final stage */
+    public String getFinal() throws ExecException {
+        return instantiateOrigAlgebraic().getFinal();
+    }
+
+    public Type getReturnType() {
+        return func.getReturnType();
+    }
+
+    /** Calls the UDF's finish hook and stops the monitoring executor, if any. */
+    public void finish() {
+        func.finish();
+        if (executor != null) {
+            executor.terminate();
+        }
+    }
+
+    public Schema outputSchema(Schema input) {
+        return func.outputSchema(input);
+    }
+
+    public Boolean isAsynchronous() {
+        return func.isAsynchronous();
+    }
+
+    @Override
+    public String name() {
+        return "POUserFunc" + "(" + func.getClass().getName() + ")" + "[" + DataType.findTypeName(resultType) + "]" + " - " + mKey.toString();
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visitUserFunc(this);
+    }
+
+    public FuncSpec getFuncSpec() {
+        return funcSpec;
+    }
+
+    public String[] getCacheFiles() {
+        return cacheFiles;
+    }
+
+    public void setCacheFiles(String[] cf) {
+        cacheFiles = cf;
+    }
+
+    /** @return true if the wrapped function implements {@link Algebraic} */
+    public boolean combinable() {
+        return (func instanceof Algebraic);
+    }
+
+    /**
+     * Clones this operator with a new operator key, a cloned spec, the same
+     * result type and the same signature. Inputs are left null; they are
+     * patched up later by PhysicalPlan.clone().
+     */
+    @Override
+    public POUserFunc clone() throws CloneNotSupportedException {
+        POUserFunc clone = new POUserFunc(new OperatorKey(mKey.scope,
+                NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)),
+                requestedParallelism, null, funcSpec.clone());
+        clone.setResultType(resultType);
+        clone.signature = signature;
+        return clone;
+    }
+
+    // func is transient, so re-create it from funcSpec after deserialization.
+    private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException {
+        is.defaultReadObject();
+        instantiateFunc(funcSpec);
+    }
+
+    /**
+     * Get child expression of this expression.
+     */
+    @Override
+    public List<ExpressionOperator> getChildExpressions() {
+        return null;
+    }
+
+    /** Starts accumulation (once) and resets the accumulator's state. */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void setAccumStart() {
+        if (isAccumulative() && !isAccumStarted()) {
+            super.setAccumStart();
+            ((Accumulator)func).cleanup();
+        }
+    }
+
+    @Override
+    public void setResultType(byte resultType) {
+        this.resultType = resultType;
+    }
+
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        return (Tuple) out;
+    }
+
+    public EvalFunc getFunc() {
+        return func;
+    }
+
+    /** Stores the signature and forwards it to the wrapped function, if present. */
+    public void setSignature(String signature) {
+        this.signature = signature;
+        if (this.func != null) {
+            this.func.setUDFContextSignature(signature);
+        }
+    }
+
+    /**
+     * Sets the EvalFunc's input schema from the UDF properties stored under
+     * {@code "pig.evalfunc.inputschema." + signature}, if such an entry exists.
+     *
+     * @param signature the UDF context signature
+     */
+    public void setFuncInputSchema(String signature) {
+        Properties props = UDFContext.getUDFContext().getUDFProperties(func.getClass());
+        Schema tmpS = (Schema) props.get("pig.evalfunc.inputschema." + signature);
+        if (tmpS != null) {
+            this.func.setInputSchema(tmpS);
+        }
+    }
+
+}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Fri May 30 19:07:23 2014
@@ -63,8 +63,10 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartitionRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
@@ -77,10 +79,10 @@ import org.apache.pig.impl.plan.VisitorE
/**
* The visitor class for the Physical Plan. To use this,
- * create the visitor with the plan to be visited. Call
+ * create the visitor with the plan to be visited. Call
* the visit() method to traverse the plan in a depth first
* fashion.
- *
+ *
* This class also visits the nested plans inside the operators.
* One has to extend this class to modify the nature of each visit
* and to maintain any relevant state information between the visits
@@ -93,25 +95,28 @@ public class PhyPlanVisitor extends Plan
super(plan, walker);
}
+    /**
+     * Generic visit hook for an arbitrary physical operator.
+     * The default implementation does nothing.
+     *
+     * @param op the operator being visited
+     */
+    public void visit(PhysicalOperator op) {
+        // intentionally empty
+    }
+
public void visitLoad(POLoad ld) throws VisitorException{
//do nothing
}
-
+
public void visitStore(POStore st) throws VisitorException{
//do nothing
}
-
+
public void visitNative(PONative nat) throws VisitorException{
//do nothing
}
-
+
public void visitFilter(POFilter fl) throws VisitorException{
pushWalker(mCurrentWalker.spawnChildWalker(fl.getPlan()));
visit();
popWalker();
}
-
public void visitCollectedGroup(POCollectedGroup mg) throws VisitorException{
List<PhysicalPlan> inpPlans = mg.getPlans();
for (PhysicalPlan plan : inpPlans) {
@@ -120,7 +125,7 @@ public class PhyPlanVisitor extends Plan
popWalker();
}
}
-
+
public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException{
List<PhysicalPlan> inpPlans = lr.getPlans();
for (PhysicalPlan plan : inpPlans) {
@@ -133,11 +138,11 @@ public class PhyPlanVisitor extends Plan
public void visitGlobalRearrange(POGlobalRearrange gr) throws VisitorException{
//do nothing
}
-
+
public void visitPackage(POPackage pkg) throws VisitorException{
//do nothing
}
-
+
public void visitPOForEach(POForEach nfe) throws VisitorException {
List<PhysicalPlan> inpPlans = nfe.getInputPlans();
for (PhysicalPlan plan : inpPlans) {
@@ -146,11 +151,11 @@ public class PhyPlanVisitor extends Plan
popWalker();
}
}
-
+
public void visitUnion(POUnion un) throws VisitorException{
//do nothing
}
-
+
public void visitSplit(POSplit spl) throws VisitorException{
List<PhysicalPlan> plans = spl.getPlans();
for (PhysicalPlan plan : plans) {
@@ -177,78 +182,78 @@ public class PhyPlanVisitor extends Plan
//do nothing
}
- public void visitDistinct(PODistinct distinct) throws VisitorException {
+ public void visitDistinct(PODistinct distinct) throws VisitorException {
//do nothing
- }
+ }
- public void visitSort(POSort sort) throws VisitorException {
+ public void visitSort(POSort sort) throws VisitorException {
List<PhysicalPlan> inpPlans = sort.getSortPlans();
for (PhysicalPlan plan : inpPlans) {
pushWalker(mCurrentWalker.spawnChildWalker(plan));
visit();
popWalker();
}
- }
-
+ }
+
public void visitConstant(ConstantExpression cnst) throws VisitorException{
//do nothing
}
-
+
public void visitProject(POProject proj) throws VisitorException{
//do nothing
}
-
+
public void visitGreaterThan(GreaterThanExpr grt) throws VisitorException{
//do nothing
}
-
+
public void visitLessThan(LessThanExpr lt) throws VisitorException{
//do nothing
}
-
+
public void visitGTOrEqual(GTOrEqualToExpr gte) throws VisitorException{
//do nothing
}
-
+
public void visitLTOrEqual(LTOrEqualToExpr lte) throws VisitorException{
//do nothing
}
-
+
public void visitEqualTo(EqualToExpr eq) throws VisitorException{
//do nothing
}
-
+
public void visitNotEqualTo(NotEqualToExpr eq) throws VisitorException{
//do nothing
}
-
+
public void visitRegexp(PORegexp re) throws VisitorException{
//do nothing
}
public void visitIsNull(POIsNull isNull) throws VisitorException {
}
-
+
public void visitAdd(Add add) throws VisitorException{
//do nothing
}
-
+
public void visitSubtract(Subtract sub) throws VisitorException {
//do nothing
}
-
+
public void visitMultiply(Multiply mul) throws VisitorException {
//do nothing
}
-
+
public void visitDivide(Divide dv) throws VisitorException {
//do nothing
}
-
+
public void visitMod(Mod mod) throws VisitorException {
//do nothing
}
-
+
public void visitAnd(POAnd and) throws VisitorException {
//do nothing
}
@@ -263,91 +268,94 @@ public class PhyPlanVisitor extends Plan
public void visitBinCond(POBinCond binCond) {
// do nothing
-
}
public void visitNegative(PONegative negative) {
//do nothing
-
}
-
+
public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
//do nothing
}
-
+
public void visitComparisonFunc(POUserComparisonFunc compFunc) throws VisitorException {
//do nothing
}
public void visitMapLookUp(POMapLookUp mapLookUp) {
// TODO Auto-generated method stub
-
}
public void visitCast(POCast cast) {
// TODO Auto-generated method stub
-
}
-
+
public void visitLimit(POLimit lim) throws VisitorException{
- //do nothing
+ PhysicalPlan inpPlan = lim.getLimitPlan();
+ if (inpPlan!=null) {
+ pushWalker(mCurrentWalker.spawnChildWalker(inpPlan));
+ visit();
+ popWalker();
+ }
}
-
+
public void visitCross(POCross cross) throws VisitorException{
//do nothing
}
-
+
public void visitFRJoin(POFRJoin join) throws VisitorException {
//do nothing
}
-
+
public void visitMergeJoin(POMergeJoin join) throws VisitorException {
//do nothing
}
-
+
public void visitMergeCoGroup(POMergeCogroup mergeCoGrp) throws VisitorException{
-
+
}
/**
* @param stream
- * @throws VisitorException
+ * @throws VisitorException
*/
public void visitStream(POStream stream) throws VisitorException {
// TODO Auto-generated method stub
-
}
- public void visitSkewedJoin(POSkewedJoin sk) throws VisitorException {
+ public void visitSkewedJoin(POSkewedJoin sk) throws VisitorException {
- }
+ }
- public void visitPartitionRearrange(POPartitionRearrange pr) throws VisitorException {
+ public void visitPartitionRearrange(POPartitionRearrange pr) throws VisitorException {
List<PhysicalPlan> inpPlans = pr.getPlans();
for (PhysicalPlan plan : inpPlans) {
pushWalker(mCurrentWalker.spawnChildWalker(plan));
visit();
popWalker();
}
- }
+ }
/**
* @param optimizedForEach
*/
public void visitPOOptimizedForEach(POOptimizedForEach optimizedForEach) throws VisitorException {
// TODO Auto-generated method stub
-
}
/**
* @param preCombinerLocalRearrange
*/
public void visitPreCombinerLocalRearrange(
- POPreCombinerLocalRearrange preCombinerLocalRearrange) {
+ POPreCombinerLocalRearrange preCombinerLocalRearrange) throws VisitorException {
// TODO Auto-generated method stub
}
- public void visitPartialAgg(POPartialAgg poPartialAgg) {
+ public void visitPartialAgg(POPartialAgg poPartialAgg) throws VisitorException {
}
+ public void visitReservoirSample(POReservoirSample reservoirSample) throws VisitorException {
+ }
+ public void visitPoissonSample(POPoissonSample poissonSample) throws VisitorException {
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java Fri May 30 19:07:23 2014
@@ -29,9 +29,10 @@ import java.util.Map;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.BinaryExpressionOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ComparisonOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POBinCond;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.UnaryComparisonOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.UnaryExpressionOperator;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.impl.plan.PlanException;
@@ -39,18 +40,18 @@ import org.apache.pig.impl.plan.VisitorE
import org.apache.pig.impl.util.MultiMap;
/**
- *
- * The base class for all types of physical plans.
+ *
+ * The base class for all types of physical plans.
* This extends the Operator Plan.
*
*/
public class PhysicalPlan extends OperatorPlan<PhysicalOperator> implements Cloneable {
/**
- *
+ *
*/
private static final long serialVersionUID = 1L;
-
+
// marker to indicate whether all input for this plan
// has been sent - this is currently used in POStream
// to know if all map() calls and reduce() calls are finished
@@ -58,18 +59,18 @@ public class PhysicalPlan extends Operat
public boolean endOfAllInput = false;
private MultiMap<PhysicalOperator, PhysicalOperator> opmap = null;
-
+
public PhysicalPlan() {
super();
}
-
+
public void attachInput(Tuple t){
List<PhysicalOperator> roots = getRoots();
for (PhysicalOperator operator : roots) {
operator.attachInput(t);
}
}
-
+
public void detachInput(){
for(PhysicalOperator op : getRoots())
op.detachInput();
@@ -117,7 +118,7 @@ public class PhysicalPlan extends Operat
ps.println("<physicalPlan>XML Not Supported</physicalPlan>");
return;
}
-
+
ps.println("#-----------------------------------------------");
ps.println("# Physical Plan:");
ps.println("#-----------------------------------------------");
@@ -132,22 +133,22 @@ public class PhysicalPlan extends Operat
}
ps.println("");
}
-
+
@Override
public void connect(PhysicalOperator from, PhysicalOperator to)
throws PlanException {
-
+
super.connect(from, to);
to.setInputs(getPredecessors(to));
}
-
+
/*public void connect(List<PhysicalOperator> from, PhysicalOperator to) throws IOException{
if(!to.supportsMultipleInputs()){
throw new IOException("Invalid Operation on " + to.name() + ". It doesn't support multiple inputs.");
}
-
+
}*/
-
+
@Override
public void remove(PhysicalOperator op) {
op.setInputs(null);
@@ -169,7 +170,7 @@ public class PhysicalPlan extends Operat
}
super.remove(op);
}
-
+
/* (non-Javadoc)
* @see org.apache.pig.impl.plan.OperatorPlan#replace(org.apache.pig.impl.plan.Operator, org.apache.pig.impl.plan.Operator)
*/
@@ -187,10 +188,10 @@ public class PhysicalPlan extends Operat
if(inputs.get(i) == oldNode) {
inputs.set(i, newNode);
}
- }
+ }
}
}
-
+
}
/* (non-Javadoc)
@@ -226,7 +227,7 @@ public class PhysicalPlan extends Operat
// clones, create a map between clone and original. Then walk the
// connections in this plan and create equivalent connections in the
// clone.
- Map<PhysicalOperator, PhysicalOperator> matches =
+ Map<PhysicalOperator, PhysicalOperator> matches =
new HashMap<PhysicalOperator, PhysicalOperator>(mOps.size());
for (PhysicalOperator op : mOps.keySet()) {
PhysicalOperator c = op.clone();
@@ -264,7 +265,7 @@ public class PhysicalPlan extends Operat
for (PhysicalOperator op : mOps.keySet()) {
List<PhysicalOperator> inputs = op.getInputs();
if (inputs == null || inputs.size() == 0) continue;
- List<PhysicalOperator> newInputs =
+ List<PhysicalOperator> newInputs =
new ArrayList<PhysicalOperator>(inputs.size());
PhysicalOperator cloneOp = matches.get(op);
if (cloneOp == null) {
@@ -281,13 +282,17 @@ public class PhysicalPlan extends Operat
}
cloneOp.setInputs(newInputs);
}
-
+
for (PhysicalOperator op : mOps.keySet()) {
- if (op instanceof UnaryComparisonOperator) {
- UnaryComparisonOperator orig = (UnaryComparisonOperator)op;
- UnaryComparisonOperator cloneOp = (UnaryComparisonOperator)matches.get(op);
- cloneOp.setExpr((ExpressionOperator)matches.get(orig.getExpr()));
+ if (op instanceof ComparisonOperator) {
+ ComparisonOperator orig = (ComparisonOperator)op;
+ ComparisonOperator cloneOp = (ComparisonOperator)matches.get(op);
cloneOp.setOperandType(orig.getOperandType());
+ }
+ if (op instanceof UnaryExpressionOperator) {
+ UnaryExpressionOperator orig = (UnaryExpressionOperator)op;
+ UnaryExpressionOperator cloneOp = (UnaryExpressionOperator)matches.get(op);
+ cloneOp.setExpr((ExpressionOperator)matches.get(orig.getExpr()));
} else if (op instanceof BinaryExpressionOperator) {
BinaryExpressionOperator orig = (BinaryExpressionOperator)op;
BinaryExpressionOperator cloneOp = (BinaryExpressionOperator)matches.get(op);
@@ -304,11 +309,11 @@ public class PhysicalPlan extends Operat
return clone;
}
-
+
public void setOpMap(MultiMap<PhysicalOperator, PhysicalOperator> opmap) {
this.opmap = opmap;
}
-
+
public void resetOpMap()
{
opmap = null;
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java Fri May 30 19:07:23 2014
@@ -64,7 +64,7 @@ public class PlanPrinter<O extends Opera
String TABMore = "| ";
String LSep = "|\n|---";
-
+
String USep = "| |\n| ";
int levelCntr = -1;
@@ -76,7 +76,7 @@ public class PlanPrinter<O extends Opera
public PlanPrinter(P plan) {
super(plan, new DepthFirstWalker<O, P>(plan));
}
-
+
public PlanPrinter(P plan, PrintStream stream) {
super(plan, new DepthFirstWalker<O, P>(plan));
this.stream = stream;
@@ -142,7 +142,7 @@ public class PlanPrinter<O extends Opera
sb.delete(sb.length() - "\n".length(), sb.length());
return sb.toString();
}
-
+
private String planString(PhysicalPlan pp){
StringBuilder sb = new StringBuilder();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -154,7 +154,7 @@ public class PlanPrinter<O extends Opera
sb.append(shiftStringByTabs(baos.toString(), 2));
return sb.toString();
}
-
+
private String planString(List<PhysicalPlan> lep){
StringBuilder sb = new StringBuilder();
if(lep!=null)
@@ -214,13 +214,13 @@ public class PlanPrinter<O extends Opera
POSkewedJoin skewed = (POSkewedJoin)node;
MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = skewed.getJoinPlans();
if(joinPlans!=null) {
- List<PhysicalPlan> inner_plans = new ArrayList<PhysicalPlan>();
- inner_plans.addAll(joinPlans.values());
- sb.append(planString(inner_plans));
+ List<PhysicalPlan> inner_plans = new ArrayList<PhysicalPlan>();
+ inner_plans.addAll(joinPlans.values());
+ sb.append(planString(inner_plans));
}
}
}
-
+
if (node instanceof POSplit) {
sb.append(planString(((POSplit)node).getPlans()));
}
@@ -231,13 +231,13 @@ public class PlanPrinter<O extends Opera
plans.addAll(pl);
sb.append(planString(plans));
}
-
+
List<O> originalPredecessors = mPlan.getPredecessors(node);
if (originalPredecessors == null)
return sb.toString();
-
+
List<O> predecessors = new ArrayList<O>(originalPredecessors);
-
+
Collections.sort(predecessors);
int i = 0;
for (O pred : predecessors) {
@@ -301,5 +301,5 @@ public class PlanPrinter<O extends Opera
public void visitStartMap(POUnion op) {
stream.print(op.name() + " ");
}
-
+
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/XMLPhysicalPlanPrinter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/XMLPhysicalPlanPrinter.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/XMLPhysicalPlanPrinter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/XMLPhysicalPlanPrinter.java Fri May 30 19:07:23 2014
@@ -57,7 +57,7 @@ import org.w3c.dom.Node;
public class XMLPhysicalPlanPrinter<P extends OperatorPlan<PhysicalOperator>> extends
-PhyPlanVisitor {
+ PhyPlanVisitor {
private Document doc = null;
private Element parent = null;
@@ -236,4 +236,4 @@ PhyPlanVisitor {
}
return pkgrNode;
}
-}
\ No newline at end of file
+}