You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2013/11/28 09:56:34 UTC
svn commit: r1546314 [5/6] - in /pig/branches/tez:
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer...
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java?rev=1546314&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java Thu Nov 28 08:56:33 2013
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * This packager is a specialization of {@link Packager}
+ * used for the specific case of the order by query:
+ * only one input is expected and its bag is passed
+ * through as-is. See JIRA 802 for more details.
+ */
+public class LitePackager extends Packager {
+
+    private static final long serialVersionUID = 1L;
+
+    /** @return always {@code null}; the inner flags are not used by this packager */
+    public boolean[] getInner() {
+        return null;
+    }
+
+    /** Does nothing: the supplied inner flags are discarded. */
+    public void setInner(boolean[] inner) {
+    }
+
+    /**
+     * Copy this packager. The clone gets its own keyInfo map, but the
+     * entries in it are shared with this instance (a shallow copy).
+     * @throws CloneNotSupportedException
+     */
+    @Override
+    public LitePackager clone() throws CloneNotSupportedException {
+        LitePackager clone = (LitePackager) super.clone();
+        clone.inner = null;
+        if (keyInfo != null) {
+            clone.keyInfo =
+                    new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>(keyInfo);
+        }
+        return clone;
+    }
+
+    /**
+     * @return always {@code false}
+     */
+    @Override
+    public boolean isDistinct() {
+        return false;
+    }
+
+    /**
+     * @param distinct ignored; {@link #isDistinct()} always returns false
+     */
+    @Override
+    public void setDistinct(boolean distinct) {
+    }
+
+    /**
+     * @return the isKeyTuple marker (whether the key is a tuple)
+     */
+    public boolean getKeyTuple() {
+        return isKeyTuple;
+    }
+
+    /**
+     * @return the current key cast to a Tuple, or null when isKeyTuple is false
+     */
+    public Tuple getKeyAsTuple() {
+        return isKeyTuple ? (Tuple) key : null;
+    }
+
+    /**
+     * @return the key currently being worked on
+     */
+    public Object getKey() {
+        return key;
+    }
+
+    /**
+     * Similar to POPackage.getNext except that
+     * only one input is expected with index 0
+     * and ReadOnceBag is used instead of
+     * DefaultDataBag.
+     *
+     * @return a STATUS_OK result holding the tuple (key, bags[0])
+     */
+    @Override
+    public Result getNext() throws ExecException {
+        // Build the output tuple: the key followed by the single input bag.
+        Tuple res = mTupleFactory.newTuple(numInputs + 1);
+        res.set(0, key);
+        res.set(1, bags[0]);
+        // Detach only after the key and bag have been stored in res.
+        detachInput();
+        Result r = new Result();
+        r.returnStatus = POStatus.STATUS_OK;
+        r.result = res;
+        return r;
+    }
+
+    /**
+     * Makes use of the superclass method, but this requires
+     * an additional parameter key passed by ReadOnceBag.
+     * key of this instance will be set to null in detachInput
+     * call, but an instance of ReadOnceBag may have the original
+     * key that it uses. Therefore this extra argument is taken
+     * to temporarily set it before the call to the superclass method
+     * and then restore it, even if the superclass method throws.
+     */
+    @Override
+    public Tuple getValueTuple(Object key, NullableTuple ntup, int index)
+            throws ExecException {
+        Object origKey = this.key;
+        this.key = key;
+        try {
+            return super.getValueTuple(key, ntup, index);
+        } finally {
+            this.key = origKey;
+        }
+    }
+}
+
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java?rev=1546314&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java Thu Nov 28 08:56:33 2013
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableUnknownWritable;
+import org.apache.pig.impl.io.PigNullableWritable;
+
+/**
+ * The package operator that packages the globally rearranged tuples
+ * into output format as required by multi-query de-multiplexer.
+ * <p>
+ * This operator is used when merging multiple Map-Reduce splittees
+ * into a Map-only splitter during multi-query optimization.
+ * The package operators of the reduce plans of the splittees form an
+ * indexed package list inside this operator. When this operator
+ * receives an input, it extracts the index from the key and calls the
+ * corresponding package to get the output data.
+ * <p>
+ * Due to the recursive nature of multi-query optimization, this operator
+ * may be contained in another multi-query packager.
+ * <p>
+ * The successor of this operator must be a PODemux operator which
+ * knows how to consume the output of this operator.
+ */
+public class MultiQueryPackager extends Packager {
+
+    private static final long serialVersionUID = 1L;
+
+    // Mask applied to a key's index byte to get the position in the packager list.
+    private static final int IDX_PART = 0x7F;
+
+    private List<Packager> packagers = new ArrayList<Packager>();
+
+    /**
+     * If the POLocalRearranges corresponding to the reduce plans in
+     * myPlans (the list of inner plans of the demux) have different key types
+     * then the MultiQueryOptimizer converts all the keys to be of type tuple
+     * by wrapping any non-tuple keys into Tuples (keys which are already tuples
+     * are left alone).
+     * The list below is a list of booleans indicating whether extra tuple wrapping
+     * was done for the key in the corresponding POLocalRearranges and if we need
+     * to "unwrap" the tuple to get to the key
+     */
+    private ArrayList<Boolean> isKeyWrapped = new ArrayList<Boolean>();
+
+    /*
+     * Indicating if all the inner plans have the same
+     * map key type. If not, the keys passed in are
+     * wrapped inside tuples and need to be extracted
+     * out during the reduce phase
+     */
+    private boolean sameMapKeyType = true;
+
+    /*
+     * Indicating if this operator is in a combiner.
+     * If not, this operator is in a reducer and the key
+     * values must first be extracted from the tuple-wrap
+     * before writing out to the disk
+     */
+    private boolean inCombiner = false;
+
+    // The key whose index selects the packager in getNext().
+    // NOTE(review): nothing in this class assigns myKey, so getNext() would
+    // dereference null as written -- confirm where the key gets attached.
+    transient private PigNullableWritable myKey;
+
+    /**
+     * Appends the specified packager to the end of the packager list.
+     * No key-wrapping flag is recorded for it.
+     *
+     * @param pkgr packager to be appended to the list
+     */
+    public void addPackager(Packager pkgr) {
+        packagers.add(pkgr);
+    }
+
+    /**
+     * Appends the specified packager to the end of the packager list
+     * and records whether its key was wrapped in an extra tuple.
+     *
+     * @param pkgr packager to be appended to the list
+     * @param mapKeyType the map key type associated with the packager
+     */
+    public void addPackager(Packager pkgr, byte mapKeyType) {
+        packagers.add(pkgr);
+        // if mapKeyType is already a tuple, we will NOT
+        // be wrapping it in an extra tuple. If it is not
+        // a tuple, we will wrap it in a tuple
+        isKeyWrapped.add(mapKeyType != DataType.TUPLE);
+    }
+
+    /**
+     * Returns the list of packagers.
+     *
+     * @return the internal list of packagers itself (not a copy)
+     */
+    public List<Packager> getPackagers() {
+        return packagers;
+    }
+
+    /**
+     * Constructs the output tuple from the inputs.
+     * <p>
+     * The output is consumed by the demultiplexer operator
+     * (PODemux) in the format (key, {bag of tuples}) where key
+     * is an indexed WritableComparable, not the wrapped value as a pig type.
+     *
+     * @throws ExecException if the key's index does not map to a packager
+     */
+    @Override
+    public Result getNext() throws ExecException {
+        byte origIndex = myKey.getIndex();
+        // the mask keeps only the low 7 bits, so index is never negative
+        int index = origIndex & IDX_PART;
+        if (index >= packagers.size()) {
+            int errCode = 2140;
+            String msg = "Invalid package index " + index
+                    + " should be in the range between 0 and "
+                    + packagers.size();
+            throw new ExecException(msg, errCode, PigException.BUG);
+        }
+
+        Packager pkgr = packagers.get(index);
+
+        // check to see if we need to unwrap the key. The keys may be
+        // wrapped inside a tuple by LocalRearrange operator when jobs
+        // with different map key types are merged
+        PigNullableWritable curKey = myKey;
+        if (!sameMapKeyType && !inCombiner && isKeyWrapped.get(index)) {
+            Tuple tup = (Tuple) myKey.getValueAsPigType();
+            curKey = HDataType.getWritableComparableTypes(tup.get(0),
+                    pkgr.getKeyType());
+            curKey.setIndex(origIndex);
+        }
+
+        pkgr.attachInput(curKey, bags, readOnce);
+
+        Result res = pkgr.getNext();
+        pkgr.detachInput();
+
+        Tuple tuple = (Tuple) res.result;
+
+        // The first field of the tuple above is the real data without
+        // index information, because the packager above extracts the real
+        // data out of the PigNullableWritable object. This result tuple
+        // goes to a PODemux operator, which needs a PigNullableWritable
+        // first field so it can figure out the index. Therefore we need
+        // to add the index to the first field of the tuple.
+        Object obj = tuple.get(0);
+        if (obj instanceof PigNullableWritable) {
+            ((PigNullableWritable) obj).setIndex(origIndex);
+        } else {
+            PigNullableWritable myObj;
+            if (obj == null) {
+                myObj = new NullableUnknownWritable();
+                myObj.setNull(true);
+            } else {
+                myObj = HDataType.getWritableComparableTypes(obj,
+                        HDataType.findTypeFromNullableWritable(curKey));
+            }
+            myObj.setIndex(origIndex);
+            tuple.set(0, myObj);
+        }
+        // illustrator markup has been handled by "pack"
+        return res;
+    }
+
+    /**
+     * Returns the list of booleans that indicates if the
+     * key needs to be unwrapped for the corresponding plan.
+     *
+     * @return an unmodifiable view of the isKeyWrapped boolean values
+     */
+    public List<Boolean> getIsKeyWrappedList() {
+        return Collections.unmodifiableList(isKeyWrapped);
+    }
+
+    /**
+     * Appends a list of isKeyWrapped boolean values to the existing ones.
+     *
+     * @param lst the list of boolean values to add
+     */
+    public void addIsKeyWrappedList(List<Boolean> lst) {
+        isKeyWrapped.addAll(lst);
+    }
+
+    /** @param inCombiner whether this operator is in a combiner */
+    public void setInCombiner(boolean inCombiner) {
+        this.inCombiner = inCombiner;
+    }
+
+    /** @return whether this operator is in a combiner */
+    public boolean isInCombiner() {
+        return inCombiner;
+    }
+
+    /** @param sameMapKeyType whether all the inner plans have the same map key type */
+    public void setSameMapKeyType(boolean sameMapKeyType) {
+        this.sameMapKeyType = sameMapKeyType;
+    }
+
+    /** @return whether all the inner plans have the same map key type */
+    public boolean isSameMapKeyType() {
+        return sameMapKeyType;
+    }
+
+}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=1546314&r1=1546313&r2=1546314&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Thu Nov 28 08:56:33 2013
@@ -26,22 +26,21 @@ import java.util.List;
import java.util.Map;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.data.AccumulativeBag;
import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.InternalCachedBag;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
+import org.apache.pig.data.InternalCachedBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
-import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.NodeIdGenerator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.IdentityHashSet;
import org.apache.pig.impl.util.Pair;
@@ -62,13 +61,8 @@ import org.apache.pig.pen.util.LineageTr
* bags based on the index.
*/
public class POPackage extends PhysicalOperator {
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- public static enum PackageType { GROUP, JOIN };
+ private static final long serialVersionUID = 1L;
//The iterator of indexed Tuples
//that is typically provided by
@@ -78,32 +72,11 @@ public class POPackage extends PhysicalO
//The key being worked on
Object key;
- // marker to indicate if key is a tuple
- protected boolean isKeyTuple = false;
- // marker to indicate if the tuple key is compound in nature
- protected boolean isKeyCompound = false;
- // key as a Tuple object (if the key is a tuple)
- protected Tuple keyAsTuple;
-
- //key's type
- byte keyType;
-
//The number of inputs to this
//co-group. 0 indicates a distinct, which means there will only be a
//key, no value.
int numInputs;
- // If the attaching map-reduce plan use secondary sort key
- boolean useSecondaryKey = false;
-
- //Denotes if inner is specified
- //on a particular input
- boolean[] inner;
-
- // flag to denote whether there is a distinct
- // leading to this package
- protected boolean distinct = false;
-
// A mapping of input index to key information got from LORearrange
// for that index. The Key information is a pair of boolean, Map.
// The boolean indicates whether there is a lone project(*) in the
@@ -119,7 +92,9 @@ public class POPackage extends PhysicalO
private boolean useDefaultBag = false;
- private PackageType pkgType;
+ protected Packager pkgr;
+
+ private boolean[] readOnce;
public POPackage(OperatorKey k) {
this(k, -1, null);
@@ -134,16 +109,22 @@ public class POPackage extends PhysicalO
}
public POPackage(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+ this(k, rp, inp, new Packager());
+ }
+
+ public POPackage(OperatorKey k, int rp, List<PhysicalOperator> inp,
+ Packager pkgr) {
super(k, rp, inp);
numInputs = -1;
keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
+ this.pkgr = pkgr;
}
@Override
public String name() {
- return getAliasString() + "Package" + "["
+ return getAliasString() + "Package" + "(" + pkgr.name() + ")" + "["
+ DataType.findTypeName(resultType) + "]" + "{"
- + DataType.findTypeName(keyType) + "}" + " - "
+ + DataType.findTypeName(pkgr.getKeyType()) + "}" + " - "
+ mKey.toString();
}
@@ -171,16 +152,8 @@ public class POPackage extends PhysicalO
public void attachInput(PigNullableWritable k, Iterator<NullableTuple> inp) {
try {
tupIter = inp;
- key = k.getValueAsPigType();
- if (useSecondaryKey) {
- key = ((Tuple)key).get(0);
-
- }
- if(isKeyTuple) {
- // key is a tuple, cache the key as a
- // tuple for use in the getNext()
- keyAsTuple = (Tuple)key;
- }
+ key = pkgr.getKey(k.getValueAsPigType());
+ inputAttached = true;
} catch (Exception e) {
throw new RuntimeException(
"Error attaching input for key " + k +
@@ -194,6 +167,7 @@ public class POPackage extends PhysicalO
public void detachInput() {
tupIter = null;
key = null;
+ inputAttached = false;
}
public int getNumInps() {
@@ -202,14 +176,10 @@ public class POPackage extends PhysicalO
public void setNumInps(int numInps) {
this.numInputs = numInps;
- }
-
- public boolean[] getInner() {
- return inner;
- }
-
- public void setInner(boolean[] inner) {
- this.inner = inner;
+ pkgr.setNumInputs(numInps);
+ readOnce = new boolean[numInputs];
+ for (int i = 0; i < numInputs; i++)
+ readOnce[i] = false;
}
/**
@@ -219,25 +189,18 @@ public class POPackage extends PhysicalO
*/
@Override
public Result getNextTuple() throws ExecException {
- Tuple res;
-
if(firstTime){
firstTime = false;
if (PigMapReduce.sJobConfInternal.get() != null) {
- String bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type");
+ String bagType = PigMapReduce.sJobConfInternal.get().get(
+ "pig.cachedbag.type");
if (bagType != null && bagType.equalsIgnoreCase("default")) {
useDefaultBag = true;
}
}
}
-
- if(distinct) {
- // only set the key which has the whole
- // tuple
- res = mTupleFactory.newTuple(1);
- res.set(0, key);
- } else {
- //Create numInputs bags
+ if (isInputAttached()) {
+ // Create numInputs bags
DataBag[] dbs = null;
dbs = new DataBag[numInputs];
@@ -253,20 +216,24 @@ public class POPackage extends PhysicalO
} else {
// create bag to pull all tuples out of iterator
for (int i = 0; i < numInputs; i++) {
- dbs[i] = useDefaultBag ? BagFactory.getInstance().newDefaultBag()
+ dbs[i] = useDefaultBag ? BagFactory.getInstance()
+ .newDefaultBag()
// In a very rare case if there is a POStream after this
- // POPackage in the pipeline and is also blocking the pipeline;
- // constructor argument should be 2 * numInputs. But for one obscure
+ // POPackage in the pipeline and is also blocking the
+ // pipeline;
+ // constructor argument should be 2 * numInputs. But for one
+ // obscure
// case we don't want to pay the penalty all the time.
: new InternalCachedBag(numInputs);
}
- //For each indexed tup in the inp, sort them
- //into their corresponding bags based
- //on the index
+ // For each indexed tup in the inp, sort them
+ // into their corresponding bags based
+ // on the index
while (tupIter.hasNext()) {
NullableTuple ntup = tupIter.next();
int index = ntup.getIndex();
- Tuple copy = getValueTuple(ntup, index);
+ Tuple copy = pkgr.getValueTuple(key,
+ ntup, index);
if (numInputs == 1) {
@@ -278,109 +245,30 @@ public class POPackage extends PhysicalO
} else {
dbs[index].add(copy);
}
- if(getReporter()!=null) {
+ if (getReporter() != null) {
getReporter().progress();
}
}
}
-
- //Construct the output tuple by appending
- //the key and all the above constructed bags
- //and return it.
- res = mTupleFactory.newTuple(numInputs+1);
- res.set(0,key);
- int i=-1;
- for (DataBag bag : dbs) {
- i++;
- if(inner[i] && !isAccumulative()){
- if(bag.size()==0){
- detachInput();
- Result r = new Result();
- r.returnStatus = POStatus.STATUS_NULL;
- return r;
- }
- }
-
- res.set(i+1,bag);
- }
+ // Construct the output tuple by appending
+ // the key and all the above constructed bags
+ // and return it.
+ pkgr.attachInput(key, dbs, readOnce);
+ detachInput();
}
- Result r = new Result();
- r.returnStatus = POStatus.STATUS_OK;
- if (!isAccumulative())
- r.result = illustratorMarkup(null, res, 0);
- else
- r.result = res;
- detachInput();
- return r;
- }
- protected Tuple getValueTuple(NullableTuple ntup, int index) throws ExecException {
- // Need to make a copy of the value, as hadoop uses the same ntup
- // to represent each value.
- Tuple val = (Tuple)ntup.getValueAsPigType();
-
- Tuple copy = null;
- // The "value (val)" that we just got may not
- // be the complete "value". It may have some portions
- // in the "key" (look in POLocalRearrange for more comments)
- // If this is the case we need to stitch
- // the "value" together.
- Pair<Boolean, Map<Integer, Integer>> lrKeyInfo =
- keyInfo.get(index);
- boolean isProjectStar = lrKeyInfo.first;
- Map<Integer, Integer> keyLookup = lrKeyInfo.second;
- int keyLookupSize = keyLookup.size();
-
- if( keyLookupSize > 0) {
-
- // we have some fields of the "value" in the
- // "key".
- int finalValueSize = keyLookupSize + val.size();
- copy = mTupleFactory.newTuple(finalValueSize);
- int valIndex = 0; // an index for accessing elements from
- // the value (val) that we have currently
- for(int i = 0; i < finalValueSize; i++) {
- Integer keyIndex = keyLookup.get(i);
- if(keyIndex == null) {
- // the field for this index is not in the
- // key - so just take it from the "value"
- // we were handed
- copy.set(i, val.get(valIndex));
- valIndex++;
- } else {
- // the field for this index is in the key
- if(isKeyTuple && isKeyCompound) {
- // the key is a tuple, extract the
- // field out of the tuple
- copy.set(i, keyAsTuple.get(keyIndex));
- } else {
- copy.set(i, key);
- }
- }
- }
- copy = illustratorMarkup2(val, copy);
- } else if (isProjectStar) {
-
- // the whole "value" is present in the "key"
- copy = mTupleFactory.newTuple(keyAsTuple.getAll());
- copy = illustratorMarkup2(keyAsTuple, copy);
- } else {
-
- // there is no field of the "value" in the
- // "key" - so just make a copy of what we got
- // as the "value"
- copy = mTupleFactory.newTuple(val.getAll());
- copy = illustratorMarkup2(val, copy);
- }
- return copy;
+ Result r = pkgr.getNext();
+ Tuple packedTup = (Tuple) r.result;
+ packedTup = illustratorMarkup(null, packedTup, 0);
+ return r;
}
- public byte getKeyType() {
- return keyType;
+ public Packager getPkgr() {
+ return pkgr;
}
- public void setKeyType(byte keyType) {
- this.keyType = keyType;
+ public void setPkgr(Packager pkgr) {
+ this.pkgr = pkgr;
}
/**
@@ -393,74 +281,11 @@ public class POPackage extends PhysicalO
clone.mKey = new OperatorKey(mKey.scope, NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope));
clone.requestedParallelism = requestedParallelism;
clone.resultType = resultType;
- clone.keyType = keyType;
clone.numInputs = numInputs;
- if (inner!=null)
- {
- clone.inner = new boolean[inner.length];
- for (int i = 0; i < inner.length; i++) {
- clone.inner[i] = inner[i];
- }
- }
- else
- clone.inner = null;
+ clone.pkgr = (Packager) this.pkgr.clone();
return clone;
}
- /**
- * @param keyInfo the keyInfo to set
- */
- public void setKeyInfo(Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo) {
- this.keyInfo = keyInfo;
- }
-
- /**
- * @param keyTuple the keyTuple to set
- */
- public void setKeyTuple(boolean keyTuple) {
- this.isKeyTuple = keyTuple;
- }
-
- /**
- * @param keyCompound the keyCompound to set
- */
- public void setKeyCompound(boolean keyCompound) {
- this.isKeyCompound = keyCompound;
- }
-
- /**
- * @return the keyInfo
- */
- public Map<Integer, Pair<Boolean, Map<Integer, Integer>>> getKeyInfo() {
- return keyInfo;
- }
-
- /**
- * @return the distinct
- */
- public boolean isDistinct() {
- return distinct;
- }
-
- /**
- * @param distinct the distinct to set
- */
- public void setDistinct(boolean distinct) {
- this.distinct = distinct;
- }
-
- public void setUseSecondaryKey(boolean useSecondaryKey) {
- this.useSecondaryKey = useSecondaryKey;
- }
-
- public void setPackageType(PackageType type) {
- this.pkgType = type;
- }
-
- public PackageType getPackageType() {
- return pkgType;
- }
-
class POPackageTupleBuffer implements AccumulativeTupleBuffer {
private List<Tuple>[] bags;
private Iterator<NullableTuple> iter;
@@ -499,18 +324,17 @@ public class POPackage extends PhysicalO
key = currKey;
for(int i=0; i<batchSize; i++) {
if (iter.hasNext()) {
- NullableTuple ntup = iter.next();
- int index = ntup.getIndex();
- Tuple copy = getValueTuple(ntup, index);
- if (numInputs == 1) {
-
- // this is for multi-query merge where
- // the numInputs is always 1, but the index
- // (the position of the inner plan in the
- // enclosed operator) may not be 1.
- bags[0].add(copy);
+ NullableTuple ntup = iter.next();
+ int index = ntup.getIndex();
+ Tuple copy = pkgr.getValueTuple(key, ntup, index);
+ if (numInputs == 1) {
+ // this is for multi-query merge where
+ // the numInputs is always 1, but the index
+ // (the position of the inner plan in the
+ // enclosed operator) may not be 1.
+ bags[0].add(copy);
} else {
- bags[index].add(copy);
+ bags[index].add(copy);
}
}else{
break;
@@ -532,76 +356,76 @@ public class POPackage extends PhysicalO
public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
return POPackage.this.illustratorMarkup(in, out, eqClassIndex);
}
- };
+ };
+
+    /** Wraps {@code out} in an ExampleTuple joined to {@code in}'s lineage when an illustrator is set. */
+    public Tuple illustratorMarkup2(Object in, Object out) {
+        if (illustrator == null)
+            return (Tuple) out;
+        ExampleTuple marked = new ExampleTuple((Tuple) out);
+        illustrator.getLineage().insert(marked);
+        marked.synthetic = ((ExampleTuple) in).synthetic;
+        illustrator.getLineage().union(marked, (Tuple) in);
+        return marked;
+    }
- private Tuple illustratorMarkup2(Object in, Object out) {
- if(illustrator != null) {
- ExampleTuple tOut = new ExampleTuple((Tuple) out);
- illustrator.getLineage().insert(tOut);
- tOut.synthetic = ((ExampleTuple) in).synthetic;
- illustrator.getLineage().union(tOut, (Tuple) in);
- return tOut;
- } else
- return (Tuple) out;
- }
-
- @Override
- public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
- if(illustrator != null) {
- ExampleTuple tOut = new ExampleTuple((Tuple) out);
- LineageTracer lineageTracer = illustrator.getLineage();
- lineageTracer.insert(tOut);
- Tuple tmp;
- boolean synthetic = false;
- if (illustrator.getEquivalenceClasses() == null) {
- LinkedList<IdentityHashSet<Tuple>> equivalenceClasses = new LinkedList<IdentityHashSet<Tuple>>();
- for (int i = 0; i < numInputs; ++i) {
- IdentityHashSet<Tuple> equivalenceClass = new IdentityHashSet<Tuple>();
- equivalenceClasses.add(equivalenceClass);
- }
- illustrator.setEquivalenceClasses(equivalenceClasses, this);
- }
-
- if (distinct) {
- int count;
- for (count = 0; tupIter.hasNext(); ++count) {
- NullableTuple ntp = tupIter.next();
- tmp = (Tuple) ntp.getValueAsPigType();
- if (!tmp.equals(tOut))
- lineageTracer.union(tOut, tmp);
- }
- if (count > 1) // only non-distinct tuples are inserted into the equivalence class
- illustrator.getEquivalenceClasses().get(eqClassIndex).add(tOut);
- illustrator.addData((Tuple) tOut);
- return (Tuple) tOut;
- }
- boolean outInEqClass = true;
- try {
- for (int i = 1; i < numInputs+1; i++)
- {
- DataBag dbs = (DataBag) ((Tuple) out).get(i);
- Iterator<Tuple> iter = dbs.iterator();
- if (dbs.size() <= 1 && outInEqClass) // all inputs have >= 2 records
- outInEqClass = false;
- while (iter.hasNext()) {
- tmp = iter.next();
- // any of synthetic data in bags causes the output tuple to be synthetic
- if (!synthetic && ((ExampleTuple)tmp).synthetic)
- synthetic = true;
- lineageTracer.union(tOut, tmp);
- }
- }
- } catch (ExecException e) {
- // TODO better exception handling
- throw new RuntimeException("Illustrator exception :"+e.getMessage());
- }
- if (outInEqClass)
- illustrator.getEquivalenceClasses().get(eqClassIndex).add(tOut);
- tOut.synthetic = synthetic;
- illustrator.addData((Tuple) tOut);
- return tOut;
- } else
- return (Tuple) out;
- }
+    /**
+     * Wraps the packaged tuple in an ExampleTuple and records its lineage
+     * when an illustrator is set; otherwise returns {@code out} as-is.
+     */
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        if (illustrator == null) {
+            return (Tuple) out;
+        }
+        ExampleTuple tOut = new ExampleTuple((Tuple) out);
+        LineageTracer lineageTracer = illustrator.getLineage();
+        lineageTracer.insert(tOut);
+        boolean synthetic = false;
+        if (illustrator.getEquivalenceClasses() == null) {
+            LinkedList<IdentityHashSet<Tuple>> equivalenceClasses = new LinkedList<IdentityHashSet<Tuple>>();
+            for (int i = 0; i < numInputs; ++i) {
+                equivalenceClasses.add(new IdentityHashSet<Tuple>());
+            }
+            illustrator.setEquivalenceClasses(equivalenceClasses, this);
+        }
+        if (pkgr.isDistinct()) {
+            int count;
+            for (count = 0; tupIter.hasNext(); ++count) {
+                NullableTuple ntp = tupIter.next();
+                Tuple tmp = (Tuple) ntp.getValueAsPigType();
+                if (!tmp.equals(tOut))
+                    lineageTracer.union(tOut, tmp);
+            }
+            if (count > 1) // only non-distinct tuples are inserted into the equivalence class
+                illustrator.getEquivalenceClasses().get(eqClassIndex).add(tOut);
+            illustrator.addData((Tuple) tOut);
+            return (Tuple) tOut;
+        }
+        boolean outInEqClass = true;
+        try {
+            for (int i = 1; i < numInputs + 1; i++) {
+                DataBag dbs = (DataBag) ((Tuple) out).get(i);
+                Iterator<Tuple> iter = dbs.iterator();
+                if (dbs.size() <= 1 && outInEqClass) // all inputs have >= 2 records
+                    outInEqClass = false;
+                while (iter.hasNext()) {
+                    Tuple tmp = iter.next();
+                    // any of synthetic data in bags causes the output tuple to be synthetic
+                    if (!synthetic && ((ExampleTuple) tmp).synthetic)
+                        synthetic = true;
+                    lineageTracer.union(tOut, tmp);
+                }
+            }
+        } catch (ExecException e) {
+            // TODO better exception handling (the cause is chained to keep its stack trace)
+            throw new RuntimeException("Illustrator exception :" + e.getMessage(), e);
+        }
+        if (outInEqClass)
+            illustrator.getEquivalenceClasses().get(eqClassIndex).add(tOut);
+        tOut.synthetic = synthetic;
+        illustrator.addData((Tuple) tOut);
+        return tOut;
+    }
}
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java?rev=1546314&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java Thu Nov 28 08:56:33 2013
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * Builds the output tuples of a {@code POPackage} from an attached key and
+ * its per-input bags.
+ * <p>
+ * The flags held here (distinct, inner, secondary key, key layout, key info)
+ * control how {@link #getNext()} and
+ * {@link #getValueTuple(Object, NullableTuple, int)} assemble their results.
+ */
+public class Packager implements Serializable, Cloneable {
+
+    private static final long serialVersionUID = 1L;
+
+    // Per-input flags: when readOnce[i] is set, getNext() copies bag i into a
+    // fresh bag instead of placing the attached bag in the output tuple.
+    protected boolean[] readOnce;
+
+    // The bags attached for the key being worked on, one per input
+    protected DataBag[] bags;
+
+    public static enum PackageType {
+        GROUP, JOIN
+    }
+
+    // The key being worked on
+    Object key;
+
+    // marker to indicate if key is a tuple
+    protected boolean isKeyTuple = false;
+    // marker to indicate if the tuple key is compound in nature
+    protected boolean isKeyCompound = false;
+
+    // key's type
+    byte keyType;
+
+    // The number of inputs to this
+    // co-group. 0 indicates a distinct, which means there will only be a
+    // key, no value.
+    int numInputs;
+
+    // Whether the attached map-reduce plan uses a secondary sort key
+    boolean useSecondaryKey = false;
+
+    // Denotes if inner is specified
+    // on a particular input
+    boolean[] inner;
+
+    // flag to denote whether there is a distinct
+    // leading to this package
+    protected boolean distinct = false;
+
+    // A mapping of input index to key information got from LORearrange
+    // for that index. The Key information is a pair of boolean, Map.
+    // The boolean indicates whether there is a lone project(*) in the
+    // cogroup by. If not, the Map has a mapping of column numbers in the
+    // "value" to column numbers in the "key" which contain the fields in
+    // the "value"
+    protected Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo;
+
+    private PackageType pkgType;
+
+    protected static final BagFactory mBagFactory = BagFactory.getInstance();
+    protected static final TupleFactory mTupleFactory = TupleFactory
+            .getInstance();
+
+    /**
+     * Returns the key to use for the given raw key.
+     *
+     * @param key the raw key; must be a Tuple when a secondary key is in use
+     * @return field 0 of {@code key} when a secondary key is in use,
+     *         otherwise {@code key} itself
+     * @throws ExecException if field 0 cannot be read from the key tuple
+     */
+    Object getKey(Object key) throws ExecException {
+        if (useSecondaryKey) {
+            return ((Tuple) key).get(0);
+        }
+        return key;
+    }
+
+    /**
+     * Attaches the key and bags that the next {@link #getNext()} call packages.
+     * The arrays are stored as given, not copied.
+     *
+     * @param key the key being worked on
+     * @param bags one bag per input
+     * @param readOnce per-input flags, indexed like {@code bags}
+     * @throws ExecException not thrown by this implementation
+     */
+    void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
+            throws ExecException {
+        this.key = key;
+        this.bags = bags;
+        this.readOnce = readOnce;
+    }
+
+    /**
+     * Builds the output tuple for the attached key.
+     * <p>
+     * For a distinct, the tuple has a single field holding the key. Otherwise
+     * it has {@code numInputs + 1} fields: the key followed by one bag per
+     * input. If an input marked inner has an empty bag, the input is detached
+     * and a result with {@code STATUS_NULL} is returned instead.
+     *
+     * @return a result with {@code STATUS_OK} carrying the tuple, or a
+     *         {@code STATUS_NULL} result as described above
+     * @throws ExecException if a field of the output tuple cannot be set
+     */
+    public Result getNext() throws ExecException {
+        Tuple res;
+
+        if (isDistinct()) {
+            // only set the key which has the whole
+            // tuple
+            res = mTupleFactory.newTuple(1);
+            res.set(0, key);
+        } else {
+            // Construct the output tuple by appending
+            // the key and all the attached bags
+            // and return it.
+            res = mTupleFactory.newTuple(numInputs + 1);
+            res.set(0, key);
+            for (int i = 0; i < bags.length; i++) {
+                DataBag bag = bags[i];
+                if (inner[i] && bag.size() == 0) {
+                    // an inner input with no tuples produces no output
+                    detachInput();
+                    Result r = new Result();
+                    r.returnStatus = POStatus.STATUS_NULL;
+                    return r;
+                }
+
+                if (!readOnce[i]) {
+                    res.set(i + 1, bag);
+                } else {
+                    // copy the contents of a read-once bag into a fresh bag
+                    DataBag readBag = mBagFactory.newDefaultBag();
+                    readBag.addAll(bag);
+                    res.set(i + 1, readBag);
+                }
+            }
+        }
+        Result r = new Result();
+        r.returnStatus = POStatus.STATUS_OK;
+        // if (!isAccumulative())
+        // r.result = illustratorMarkup(null, res, 0);
+        // else
+        r.result = res;
+        return r;
+    }
+
+    /** Clears the attached key and bags. */
+    void detachInput() {
+        key = null;
+        bags = null;
+    }
+
+    /**
+     * Builds a standalone copy of a value tuple, stitching back any fields
+     * that are carried in the key rather than in the value.
+     *
+     * @param key the key the value belongs to
+     * @param ntup the value as handed over; it is copied, never returned
+     * @param index the input index used to look up the key information
+     * @return a newly created tuple holding the complete value
+     * @throws ExecException if a field cannot be read or set
+     */
+    protected Tuple getValueTuple(Object key, NullableTuple ntup, int index)
+            throws ExecException {
+        // Need to make a copy of the value, as hadoop uses the same ntup
+        // to represent each value.
+        Tuple val = (Tuple) ntup.getValueAsPigType();
+
+        Tuple copy = null;
+        // The "value (val)" that we just got may not
+        // be the complete "value". It may have some portions
+        // in the "key" (look in POLocalRearrange for more comments)
+        // If this is the case we need to stitch
+        // the "value" together.
+        Pair<Boolean, Map<Integer, Integer>> lrKeyInfo = keyInfo.get(index);
+        boolean isProjectStar = lrKeyInfo.first;
+        Map<Integer, Integer> keyLookup = lrKeyInfo.second;
+        int keyLookupSize = keyLookup.size();
+
+        if (keyLookupSize > 0) {
+
+            // we have some fields of the "value" in the
+            // "key".
+            int finalValueSize = keyLookupSize + val.size();
+            copy = mTupleFactory.newTuple(finalValueSize);
+            int valIndex = 0; // an index for accessing elements from
+                              // the value (val) that we have currently
+            for (int i = 0; i < finalValueSize; i++) {
+                Integer keyIndex = keyLookup.get(i);
+                if (keyIndex == null) {
+                    // the field for this index is not in the
+                    // key - so just take it from the "value"
+                    // we were handed
+                    copy.set(i, val.get(valIndex));
+                    valIndex++;
+                } else {
+                    // the field for this index is in the key
+                    if (isKeyTuple && isKeyCompound) {
+                        // the key is a tuple, extract the
+                        // field out of the tuple
+                        copy.set(i, ((Tuple) key).get(keyIndex));
+                    } else {
+                        copy.set(i, key);
+                    }
+                }
+            }
+            // copy = illustratorMarkup2(val, copy);
+        } else if (isProjectStar) {
+
+            // the whole "value" is present in the "key"
+            copy = mTupleFactory.newTuple(((Tuple) key).getAll());
+            // copy = illustratorMarkup2((Tuple) key, copy);
+        } else {
+
+            // there is no field of the "value" in the
+            // "key" - so just make a copy of what we got
+            // as the "value"
+            copy = mTupleFactory.newTuple(val.getAll());
+            // copy = illustratorMarkup2(val, copy);
+        }
+        return copy;
+    }
+
+    public byte getKeyType() {
+        return keyType;
+    }
+
+    public void setKeyType(byte keyType) {
+        this.keyType = keyType;
+    }
+
+    public boolean[] getInner() {
+        return inner;
+    }
+
+    public void setInner(boolean[] inner) {
+        this.inner = inner;
+    }
+
+    /**
+     * @param keyInfo the keyInfo to set
+     */
+    public void setKeyInfo(
+            Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo) {
+        this.keyInfo = keyInfo;
+    }
+
+    /**
+     * @param keyTuple the keyTuple to set
+     */
+    public void setKeyTuple(boolean keyTuple) {
+        this.isKeyTuple = keyTuple;
+    }
+
+    /**
+     * @param keyCompound the keyCompound to set
+     */
+    public void setKeyCompound(boolean keyCompound) {
+        this.isKeyCompound = keyCompound;
+    }
+
+    /**
+     * @return the keyInfo
+     */
+    public Map<Integer, Pair<Boolean, Map<Integer, Integer>>> getKeyInfo() {
+        return keyInfo;
+    }
+
+    /**
+     * @return the distinct
+     */
+    public boolean isDistinct() {
+        return distinct;
+    }
+
+    /**
+     * @param distinct the distinct to set
+     */
+    public void setDistinct(boolean distinct) {
+        this.distinct = distinct;
+    }
+
+    public void setUseSecondaryKey(boolean useSecondaryKey) {
+        this.useSecondaryKey = useSecondaryKey;
+    }
+
+    public void setPackageType(PackageType type) {
+        this.pkgType = type;
+    }
+
+    public PackageType getPackageType() {
+        return pkgType;
+    }
+
+    public int getNumInputs() {
+        return numInputs;
+    }
+
+    public void setNumInputs(int numInputs) {
+        this.numInputs = numInputs;
+    }
+
+    /**
+     * Creates a copy of this packager.
+     * <p>
+     * The {@code inner} array and the outer {@code keyInfo} map are copied so
+     * that replacing entries in one packager does not affect the other; the
+     * values stored in {@code keyInfo} are shared.
+     *
+     * @return the copy
+     * @throws CloneNotSupportedException if {@code Object.clone()} fails
+     */
+    @Override
+    public Packager clone() throws CloneNotSupportedException {
+        Packager clone = (Packager) super.clone();
+        clone.setNumInputs(numInputs);
+        clone.setPackageType(pkgType);
+        clone.setDistinct(distinct);
+        // super.clone() only copies the array reference; give the clone its
+        // own array so the two packagers do not share mutable state
+        clone.setInner(inner == null ? null : inner.clone());
+        if (keyInfo != null)
+            clone.setKeyInfo(new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>(
+                    keyInfo));
+        clone.setKeyCompound(isKeyCompound);
+        clone.setKeyTuple(isKeyTuple);
+        clone.setKeyType(keyType);
+        clone.setUseSecondaryKey(useSecondaryKey);
+        return clone;
+    }
+
+    /** Returns the simple class name of this packager. */
+    public String name() {
+        return this.getClass().getSimpleName();
+    }
+}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java?rev=1546314&r1=1546313&r2=1546314&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java Thu Nov 28 08:56:33 2013
@@ -51,7 +51,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
@@ -59,13 +58,11 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMultiQueryPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POOptimizedForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
@@ -209,20 +206,6 @@ public class PlanHelper {
}
@Override
- public void visitCombinerPackage(POCombinerPackage pkg)
- throws VisitorException {
- super.visitCombinerPackage(pkg);
- visit(pkg);
- }
-
- @Override
- public void visitMultiQueryPackage(POMultiQueryPackage pkg)
- throws VisitorException {
- super.visitMultiQueryPackage(pkg);
- visit(pkg);
- }
-
- @Override
public void visitPOForEach(POForEach nfe) throws VisitorException {
super.visitPOForEach(nfe);
visit(nfe);
@@ -400,13 +383,6 @@ public class PlanHelper {
}
@Override
- public void visitJoinPackage(POJoinPackage joinPackage)
- throws VisitorException {
- super.visitJoinPackage(joinPackage);
- visit(joinPackage);
- }
-
- @Override
public void visitCast(POCast cast) {
super.visitCast(cast);
visit(cast);
@@ -449,7 +425,6 @@ public class PlanHelper {
visit(stream);
}
- @Override
public void visitSkewedJoin(POSkewedJoin sk) throws VisitorException {
super.visitSkewedJoin(sk);
visit(sk);
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1546314&r1=1546313&r2=1546314&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Thu Nov 28 08:56:33 2013
@@ -39,6 +39,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross;
@@ -54,8 +55,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackageLite;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage.PackageType;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
@@ -63,6 +62,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager.PackageType;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
@@ -160,7 +160,7 @@ public class TezCompiler extends PhyPlan
public TezOperPlan getTezPlan() {
return tezPlan;
}
-
+
// Segment a single DAG into a DAG graph
public TezPlanContainer getPlanContainer() throws PlanException {
TezPlanContainer tezPlanContainer = new TezPlanContainer(pigContext);
@@ -345,7 +345,7 @@ public class TezCompiler extends PhyPlan
tezOp.plan.addAsLeaf(op);
curTezOp = tezOp;
}
-
+
private void connect(TezOperPlan plan, TezOperator from, TezOperator to) throws PlanException {
plan.connect(from, to);
// Add edge descriptors to old and new operators
@@ -504,7 +504,7 @@ public class TezCompiler extends PhyPlan
blocking();
POPackage pkg = getPackage();
- pkg.setDistinct(true);
+ pkg.getPkgr().setDistinct(true);
curTezOp.plan.add(pkg);
POProject project = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
@@ -659,9 +659,9 @@ public class TezCompiler extends PhyPlan
try{
nonBlocking(op);
phyToTezOpMap.put(op, curTezOp);
- if (op.getPackageType() == PackageType.JOIN) {
+ if (op.getPkgr().getPackageType() == PackageType.JOIN) {
curTezOp.markRegularJoin();
- } else if (op.getPackageType() == PackageType.GROUP) {
+ } else if (op.getPkgr().getPackageType() == PackageType.GROUP) {
if (op.getNumInps() == 1) {
curTezOp.markGroupBy();
} else if (op.getNumInps() > 1) {
@@ -716,7 +716,7 @@ public class TezCompiler extends PhyPlan
return new FileSpec(FileLocalizer.getTemporaryPath(pigContext).toString(),
new FuncSpec(Utils.getTmpFileCompressorName(pigContext)));
}
-
+
private POStore getStore(){
POStore st = new POStore(new OperatorKey(scope,nig.getNextNodeId(scope)));
// mark store as tmp store. These could be removed by the
@@ -724,7 +724,7 @@ public class TezCompiler extends PhyPlan
st.setIsTmpStore(true);
return st;
}
-
+
/**
* Force an end to the current vertex with a store into a temporary
* file.
@@ -750,19 +750,19 @@ public class TezCompiler extends PhyPlan
}
return oper;
}
-
+
private Pair<TezOperator[],Integer> getQuantileJobs(
POSort inpSort,
TezOperator prevJob,
FileSpec lFile,
FileSpec quantFile,
int rp) throws PlanException, VisitorException {
-
+
POSort sort = new POSort(inpSort.getOperatorKey(), inpSort
.getRequestedParallelism(), null, inpSort.getSortPlans(),
inpSort.getMAscCols(), inpSort.getMSortFunc());
sort.addOriginalLocation(inpSort.getAlias(), inpSort.getOriginalLocations());
-
+
// Turn the asc/desc array into an array of strings so that we can pass it
// to the FindQuantiles function.
List<Boolean> ascCols = inpSort.getMAscCols();
@@ -780,10 +780,10 @@ public class TezCompiler extends PhyPlan
ctorArgs[j+1] = ascs[j];
}
}
-
+
return getSamplingJobs(sort, prevJob, null, lFile, quantFile, rp, null, FindQuantiles.class.getName(), ctorArgs, RandomSampleLoader.class.getName());
}
-
+
/**
* Create a sampling job to collect statistics by sampling an input file. The sequence of operations is as
* following:
@@ -794,7 +794,7 @@ public class TezCompiler extends PhyPlan
* <li>Sorting the bag </li>
* <li>Invoke UDF with the number of reducers and the sorted bag.</li>
* <li>Data generated by UDF is stored into a file.</li>
- *
+ *
* @param sort the POSort operator used to sort the bag
* @param prevJob previous job of current sampling job
* @param transformPlans PhysicalPlans to transform input samples
@@ -809,44 +809,43 @@ public class TezCompiler extends PhyPlan
* @throws PlanException
* @throws VisitorException
*/
- @SuppressWarnings("deprecation")
private Pair<TezOperator[],Integer> getSamplingJobs(POSort sort, TezOperator prevJob, List<PhysicalPlan> transformPlans,
- FileSpec lFile, FileSpec sampleFile, int rp, List<PhysicalPlan> sortKeyPlans,
+ FileSpec lFile, FileSpec sampleFile, int rp, List<PhysicalPlan> sortKeyPlans,
String udfClassName, String[] udfArgs, String sampleLdrClassName ) throws PlanException, VisitorException {
-
+
String[] rslargs = new String[2];
- // SampleLoader expects string version of FuncSpec
+ // SampleLoader expects string version of FuncSpec
// as its first constructor argument.
-
+
rslargs[0] = (new FuncSpec(Utils.getTmpFileCompressorName(pigContext))).toString();
-
+
rslargs[1] = "100"; // The value is calculated based on the file size for skewed join
FileSpec quantLdFilName = new FileSpec(lFile.getFileName(),
new FuncSpec(sampleLdrClassName, rslargs));
-
+
TezOperator[] opers = new TezOperator[2];
-
+
TezOperator oper1 = startNew(quantLdFilName, prevJob);
opers[0] = oper1;
-
+
// TODO: Review sort udf
// if(sort.isUDFComparatorUsed) {
// mro.UDFs.add(sort.getMSortFunc().getFuncSpec().toString());
// curMROp.isUDFComparatorUsed = true;
-// }
-
- List<Boolean> flat1 = new ArrayList<Boolean>();
+// }
+
+ List<Boolean> flat1 = new ArrayList<Boolean>();
List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
-
+
// if transform plans are not specified, project the columns of sorting keys
- if (transformPlans == null) {
+ if (transformPlans == null) {
Pair<POProject, Byte>[] sortProjs = null;
try{
sortProjs = getSortCols(sort.getSortPlans());
}catch(Exception e) {
throw new RuntimeException(e);
}
- // Set up the projections of the key columns
+ // Set up the projections of the key columns
if (sortProjs == null) {
PhysicalPlan ep = new PhysicalPlan();
POProject prj = new POProject(new OperatorKey(scope,
@@ -860,7 +859,7 @@ public class TezCompiler extends PhyPlan
} else {
for (Pair<POProject, Byte> sortProj : sortProjs) {
// Check for proj being null, null is used by getSortCols for a non POProject
- // operator. Since Order by does not allow expression operators,
+ // operator. Since Order by does not allow expression operators,
//it should never be set to null
if(sortProj == null){
int errCode = 2174;
@@ -889,10 +888,10 @@ public class TezCompiler extends PhyPlan
}
}
- // This foreach will pick the sort key columns from the RandomSampleLoader output
+ // This foreach will pick the sort key columns from the RandomSampleLoader output
POForEach nfe1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)),-1,eps1,flat1);
oper1.plan.addAsLeaf(nfe1);
-
+
// Now set up a POLocalRearrange which has "all" as the key and the output of the
// foreach will be the "value" out of POLocalRearrange
PhysicalPlan ep1 = new PhysicalPlan();
@@ -900,10 +899,10 @@ public class TezCompiler extends PhyPlan
ce.setValue("all");
ce.setResultType(DataType.CHARARRAY);
ep1.add(ce);
-
+
List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
eps.add(ep1);
-
+
POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
try {
lr.setIndex(0);
@@ -918,46 +917,46 @@ public class TezCompiler extends PhyPlan
lr.addOriginalLocation(sort.getAlias(), sort.getOriginalLocations());
oper1.plan.add(lr);
oper1.plan.connect(nfe1, lr);
-
+
oper1.setClosed(true);
-
+
TezOperator oper2 = getTezOp();
opers[1] = oper2;
tezPlan.add(oper2);
connect(tezPlan, oper1, oper2);
-
+
POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
- pkg.setKeyType(DataType.CHARARRAY);
+ pkg.getPkgr().setKeyType(DataType.CHARARRAY);
pkg.setNumInps(1);
- boolean[] inner = {false};
- pkg.setInner(inner);
+ boolean[] inner = {false};
+ pkg.getPkgr().setInner(inner);
oper2.plan.add(pkg);
-
+
// Lets start building the plan which will have the sort
// for the foreach
PhysicalPlan fe2Plan = new PhysicalPlan();
- // Top level project which just projects the tuple which is coming
+ // Top level project which just projects the tuple which is coming
// from the foreach after the package
POProject topPrj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
topPrj.setColumn(1);
topPrj.setResultType(DataType.BAG);
topPrj.setOverloaded(true);
fe2Plan.add(topPrj);
-
+
// the projections which will form sort plans
- List<PhysicalPlan> nesSortPlanLst = new ArrayList<PhysicalPlan>();
+ List<PhysicalPlan> nesSortPlanLst = new ArrayList<PhysicalPlan>();
if (sortKeyPlans != null) {
- for(int i=0; i<sortKeyPlans.size(); i++) {
- nesSortPlanLst.add(sortKeyPlans.get(i));
+ for(int i=0; i<sortKeyPlans.size(); i++) {
+ nesSortPlanLst.add(sortKeyPlans.get(i));
}
- }else{
+ }else{
Pair<POProject, Byte>[] sortProjs = null;
try{
sortProjs = getSortCols(sort.getSortPlans());
}catch(Exception e) {
throw new RuntimeException(e);
}
- // Set up the projections of the key columns
+ // Set up the projections of the key columns
if (sortProjs == null) {
PhysicalPlan ep = new PhysicalPlan();
POProject prj = new POProject(new OperatorKey(scope,
@@ -971,7 +970,7 @@ public class TezCompiler extends PhyPlan
for (int i=0; i<sortProjs.length; i++) {
POProject prj =
new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
-
+
prj.setResultType(sortProjs[i].second);
if(sortProjs[i].first != null && sortProjs[i].first.isProjectToEnd()){
if(i != sortProjs.length -1){
@@ -991,14 +990,14 @@ public class TezCompiler extends PhyPlan
ep.add(prj);
nesSortPlanLst.add(ep);
}
- }
+ }
}
-
+
sort.setSortPlans(nesSortPlanLst);
sort.setResultType(DataType.BAG);
fe2Plan.add(sort);
fe2Plan.connect(topPrj, sort);
-
+
// The plan which will have a constant representing the
// degree of parallelism for the final order by map-reduce job
// this will either come from a "order by parallel x" in the script
@@ -1007,66 +1006,66 @@ public class TezCompiler extends PhyPlan
PhysicalPlan rpep = new PhysicalPlan();
ConstantExpression rpce = new ConstantExpression(new OperatorKey(scope,nig.getNextNodeId(scope)));
rpce.setRequestedParallelism(rp);
-
+
// We temporarily set it to rp and will adjust it at runtime, because the final degree of parallelism
// is unknown until we are ready to submit it. See PIG-2779.
rpce.setValue(rp);
-
+
rpce.setResultType(DataType.INTEGER);
rpep.add(rpce);
-
+
List<PhysicalPlan> genEps = new ArrayList<PhysicalPlan>();
genEps.add(rpep);
genEps.add(fe2Plan);
-
+
List<Boolean> flattened2 = new ArrayList<Boolean>();
flattened2.add(false);
flattened2.add(false);
-
+
POForEach nfe2 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)),-1, genEps, flattened2);
oper2.plan.add(nfe2);
oper2.plan.connect(pkg, nfe2);
-
+
// Let's connect the output from the foreach containing
// number of quantiles and the sorted bag of samples to
// another foreach with the FindQuantiles udf. The input
- // to the FindQuantiles udf is a project(*) which takes the
+ // to the FindQuantiles udf is a project(*) which takes the
// foreach input and gives it to the udf
PhysicalPlan ep4 = new PhysicalPlan();
POProject prjStar4 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
prjStar4.setResultType(DataType.TUPLE);
prjStar4.setStar(true);
ep4.add(prjStar4);
-
+
List<PhysicalOperator> ufInps = new ArrayList<PhysicalOperator>();
ufInps.add(prjStar4);
-
- POUserFunc uf = new POUserFunc(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, ufInps,
+
+ POUserFunc uf = new POUserFunc(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, ufInps,
new FuncSpec(udfClassName, udfArgs));
ep4.add(uf);
ep4.connect(prjStar4, uf);
-
+
List<PhysicalPlan> ep4s = new ArrayList<PhysicalPlan>();
ep4s.add(ep4);
List<Boolean> flattened3 = new ArrayList<Boolean>();
flattened3.add(false);
POForEach nfe3 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, ep4s, flattened3);
-
+
oper2.plan.add(nfe3);
oper2.plan.connect(nfe2, nfe3);
-
+
POStore str = getStore();
str.setSFile(sampleFile);
-
+
oper2.plan.add(str);
oper2.plan.connect(nfe3, str);
-
+
oper2.setClosed(true);
oper2.requestedParallelism = 1;
// oper2.markSampler();
return new Pair<TezOperator[], Integer>(opers, rp);
}
-
+
private static class FindKeyTypeVisitor extends PhyPlanVisitor {
byte keyType = DataType.UNKNOWN;
@@ -1081,7 +1080,7 @@ public class TezCompiler extends PhyPlan
keyType = p.getResultType();
}
}
-
+
private Pair<POProject,Byte> [] getSortCols(List<PhysicalPlan> plans) throws PlanException, ExecException {
if(plans!=null){
@SuppressWarnings("unchecked")
@@ -1105,7 +1104,7 @@ public class TezCompiler extends PhyPlan
String msg = "No expression plan found in POSort.";
throw new PlanException(msg, errCode, PigException.BUG);
}
-
+
private TezOperator[] getSortJobs(
POSort sort,
TezOperator quantJob,
@@ -1123,11 +1122,11 @@ public class TezCompiler extends PhyPlan
long limit = sort.getLimit();
oper1.limit = limit;
-
+
List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
byte keyType = DataType.UNKNOWN;
-
+
boolean[] sortOrder;
List<Boolean> sortOrderList = sort.getMAscCols();
@@ -1167,7 +1166,7 @@ public class TezCompiler extends PhyPlan
throw new PlanException(msg, errCode, PigException.BUG, ve);
}
}
-
+
POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
try {
lr.setIndex(0);
@@ -1182,23 +1181,26 @@ public class TezCompiler extends PhyPlan
lr.setResultType(DataType.TUPLE);
lr.addOriginalLocation(sort.getAlias(), sort.getOriginalLocations());
oper1.plan.addAsLeaf(lr);
-
+
oper1.setClosed(true);
-
+
TezOperator oper2 = getTezOp();
opers[1] = oper2;
tezPlan.add(oper2);
connect(tezPlan, oper1, oper2);
-
+
if (limit!=-1) {
- POPackageLite pkg_c = new POPackageLite(new OperatorKey(scope,nig.getNextNodeId(scope)));
- pkg_c.setKeyType((fields.length>1) ? DataType.TUPLE : keyType);
+ POPackage pkg_c = new POPackage(new OperatorKey(scope,
+ nig.getNextNodeId(scope)));
+ pkg_c.setPkgr(new LitePackager());
+ pkg_c.getPkgr().setKeyType(
+ (fields.length > 1) ? DataType.TUPLE : keyType);
pkg_c.setNumInps(1);
oper2.inEdges.put(oper1.getOperatorKey(), new TezEdgeDescriptor());
PhysicalPlan combinePlan = oper2.inEdges.get(oper1.getOperatorKey()).combinePlan;
-
+
combinePlan.add(pkg_c);
-
+
List<PhysicalPlan> eps_c1 = new ArrayList<PhysicalPlan>();
List<Boolean> flat_c1 = new ArrayList<Boolean>();
PhysicalPlan ep_c1 = new PhysicalPlan();
@@ -1209,25 +1211,25 @@ public class TezCompiler extends PhyPlan
ep_c1.add(prj_c1);
eps_c1.add(ep_c1);
flat_c1.add(true);
- POForEach fe_c1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)),
+ POForEach fe_c1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)),
-1, eps_c1, flat_c1);
fe_c1.setResultType(DataType.TUPLE);
-
+
combinePlan.addAsLeaf(fe_c1);
-
+
POLimit pLimit = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
pLimit.setLimit(limit);
combinePlan.addAsLeaf(pLimit);
-
+
List<PhysicalPlan> eps_c2 = new ArrayList<PhysicalPlan>();
eps_c2.addAll(sort.getSortPlans());
-
+
POLocalRearrange lr_c2 = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
try {
lr_c2.setIndex(0);
} catch (ExecException e) {
int errCode = 2058;
- String msg = "Unable to set index on newly created POLocalRearrange.";
+ String msg = "Unable to set index on newly created POLocalRearrange.";
throw new PlanException(msg, errCode, PigException.BUG, e);
}
lr_c2.setKeyType((fields.length>1) ? DataType.TUPLE : keyType);
@@ -1235,13 +1237,17 @@ public class TezCompiler extends PhyPlan
lr_c2.setResultType(DataType.TUPLE);
combinePlan.addAsLeaf(lr_c2);
}
-
- POPackageLite pkg = new POPackageLite(new OperatorKey(scope,nig.getNextNodeId(scope)));
- pkg.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE :
+
+ POPackage pkg = new POPackage(new OperatorKey(scope,
+ nig.getNextNodeId(scope)));
+ pkg.setPkgr(new LitePackager());
+ pkg.getPkgr().setKeyType(
+ (fields == null || fields.length > 1) ? DataType.TUPLE
+ :
keyType);
- pkg.setNumInps(1);
+ pkg.setNumInps(1);
oper2.plan.add(pkg);
-
+
PhysicalPlan ep = new PhysicalPlan();
POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
prj.setColumn(1);
@@ -1264,7 +1270,7 @@ public class TezCompiler extends PhyPlan
return opers;
}
-
+
@Override
public void visitSort(POSort op) throws VisitorException {
try{
@@ -1274,15 +1280,15 @@ public class TezCompiler extends PhyPlan
FileSpec quantFile = getTempFileSpec();
int rp = op.getRequestedParallelism();
Pair<POProject, Byte>[] fields = getSortCols(op.getSortPlans());
- Pair<TezOperator[], Integer> quantJobParallelismPair =
+ Pair<TezOperator[], Integer> quantJobParallelismPair =
getQuantileJobs(op, oper, fSpec, quantFile, rp);
- TezOperator[] opers = getSortJobs(op, quantJobParallelismPair.first[1], fSpec, quantFile,
+ TezOperator[] opers = getSortJobs(op, quantJobParallelismPair.first[1], fSpec, quantFile,
quantJobParallelismPair.second, fields);
quantJobParallelismPair.first[1].segmentBelow = true;
-
+
curTezOp = opers[1];
-
+
// TODO: Review sort udf
// if(op.isUDFComparatorUsed){
// curTezOp.UDFs.add(op.getMSortFunc().getFuncSpec().toString());
@@ -1396,8 +1402,8 @@ public class TezCompiler extends PhyPlan
private POPackage getPackage() {
boolean[] inner = { false };
POPackage pkg = new POPackage(new OperatorKey(scope, nig.getNextNodeId(scope)));
- pkg.setInner(inner);
- pkg.setKeyType(DataType.TUPLE);
+ pkg.getPkgr().setInner(inner);
+ pkg.getPkgr().setKeyType(DataType.TUPLE);
pkg.setNumInps(1);
return pkg;
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1546314&r1=1546313&r2=1546314&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Thu Nov 28 08:56:33 2013
@@ -71,10 +71,10 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigIntWritableComparator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigLongWritableComparator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigTupleWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigCombiner;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
@@ -189,7 +189,7 @@ public class TezDagBuilder extends TezOp
private void addCombiner(PhysicalPlan combinePlan, Configuration conf) throws IOException {
POPackage combPack = (POPackage)combinePlan.getRoots().get(0);
- setIntermediateInputKeyValue(combPack.getKeyType(), conf);
+ setIntermediateInputKeyValue(combPack.getPkgr().getKeyType(), conf);
POLocalRearrange combRearrange = (POLocalRearrange)combinePlan.getLeaves().get(0);
setIntermediateOutputKeyValue(combRearrange.getKeyType(), conf);
@@ -232,7 +232,7 @@ public class TezDagBuilder extends TezOp
for (POStore st: stores) {
storeLocations.add(st);
StoreFuncInterface sFunc = st.getStoreFunc();
- sFunc.setStoreLocation(st.getSFile().getFileName(), job);
+ sFunc.setStoreLocation(st.getSFile().getFileName(), job);
}
if (stores.size() == 1){
@@ -298,7 +298,7 @@ public class TezDagBuilder extends TezOp
List<PhysicalOperator> roots = tezOp.plan.getRoots();
if (roots.size() == 1 && roots.get(0) instanceof POPackage) {
POPackage pack = (POPackage) roots.get(0);
- byte keyType = pack.getKeyType();
+ byte keyType = pack.getPkgr().getKeyType();
tezOp.plan.remove(pack);
conf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
conf.set("pig.reduce.key.type", Byte.toString(keyType));
@@ -310,7 +310,7 @@ public class TezDagBuilder extends TezOp
}
conf.setClass("mapreduce.outputformat.class", PigOutputFormat.class, OutputFormat.class);
-
+
if(tezOp.isGlobalSort() || tezOp.isLimitAfterSort()){
if (tezOp.isGlobalSort()) {
FileSystem fs = FileSystem.get(conf);
@@ -326,7 +326,7 @@ public class TezDagBuilder extends TezOp
conf.set("pig.quantilesFile", fstat.getPath().toString());
conf.set("pig.sortOrder",
ObjectSerializer.serialize(tezOp.getSortOrder()));
- conf.setClass("mapreduce.job.partitioner.class", WeightedRangePartitioner.class,
+ conf.setClass("mapreduce.job.partitioner.class", WeightedRangePartitioner.class,
Partitioner.class);
}
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOPackageAnnotator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOPackageAnnotator.java?rev=1546314&r1=1546313&r2=1546314&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOPackageAnnotator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOPackageAnnotator.java Thu Nov 28 08:56:33 2013
@@ -27,11 +27,9 @@ import org.apache.pig.PigException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackageLite;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.plan.optimizer.OptimizerException;
@@ -102,32 +100,11 @@ public class TezPOPackageAnnotator exten
super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
}
- /* (non-Javadoc)
- * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitStream(org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStream)
- */
@Override
public void visitPackage(POPackage pkg) throws VisitorException {
this.pkg = pkg;
};
- /* (non-Javadoc)
- * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitJoinPackage(org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage)
- */
- @Override
- public void visitJoinPackage(POJoinPackage joinPackage)
- throws VisitorException {
- this.pkg = joinPackage;
- }
-
- /* (non-Javadoc)
- * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitCombinerPackage(org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPostCombinerPackage)
- */
- @Override
- public void visitCombinerPackage(POCombinerPackage pkg)
- throws VisitorException {
- this.pkg = pkg;
- }
-
/**
* @return the pkg
*/
@@ -154,15 +131,12 @@ public class TezPOPackageAnnotator exten
this.pkg = pkg;
}
- /* (non-Javadoc)
- * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitStream(org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStream)
- */
@Override
public void visitLocalRearrange(POLocalRearrange lrearrange) throws VisitorException {
loRearrangeFound++;
Map<Integer,Pair<Boolean, Map<Integer, Integer>>> keyInfo;
- if (pkg instanceof POPackageLite) {
+ if (pkg.getPkgr() instanceof LitePackager) {
if(lrearrange.getIndex() != 0) {
// Throw some exception here
throw new RuntimeException("POLocalRearrange for POPackageLite cannot have index other than 0, but has index - "+lrearrange.getIndex());
@@ -171,7 +145,7 @@ public class TezPOPackageAnnotator exten
// annotate the package with information from the LORearrange
// update the keyInfo information if already present in the POPackage
- keyInfo = pkg.getKeyInfo();
+ keyInfo = pkg.getPkgr().getKeyInfo();
if(keyInfo == null)
keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
@@ -188,9 +162,9 @@ public class TezPOPackageAnnotator exten
keyInfo.put(Integer.valueOf(lrearrange.getIndex()),
new Pair<Boolean, Map<Integer, Integer>>(
lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
- pkg.setKeyInfo(keyInfo);
- pkg.setKeyTuple(lrearrange.isKeyTuple());
- pkg.setKeyCompound(lrearrange.isKeyCompound());
+ pkg.getPkgr().setKeyInfo(keyInfo);
+ pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple());
+ pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound());
}
/**