You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/01/16 03:25:35 UTC
svn commit: r1558674 [1/2] - in /pig/branches/tez: src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
src/org/apache/pi...
Author: cheolsoo
Date: Thu Jan 16 02:25:35 2014
New Revision: 1558674
URL: http://svn.apache.org/r1558674
Log:
PIG-3644: Implement skewed join in Tez (cheolsoo)
Added:
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POPartitionRearrangeTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java
Modified:
pig/branches/tez/src/org/apache/pig/PigConfiguration.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java
pig/branches/tez/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
pig/branches/tez/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
pig/branches/tez/test/e2e/pig/tests/tez.conf
Modified: pig/branches/tez/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/PigConfiguration.java?rev=1558674&r1=1558673&r2=1558674&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/branches/tez/src/org/apache/pig/PigConfiguration.java Thu Jan 16 02:25:35 2014
@@ -149,6 +149,17 @@ public class PigConfiguration {
public static final String PIG_DELETE_TEMP_FILE = "pig.delete.temp.files";
/**
+ * For a given mean and a confidence, a sample rate is obtained from a poisson udf
+ */
+ public static final String SAMPLE_RATE = "pig.sksampler.samplerate";
+
+ /**
+ * % of memory available for the input data. This is currently equal to the
+ * memory available for the skewed join
+ */
+ public static final String PERC_MEM_AVAIL = "pig.skewedjoin.reduce.memusage";
+
+ /**
* This key used to control the maximum size loaded into
* the distributed cache when doing fragment-replicated join
*/
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java?rev=1558674&r1=1558673&r2=1558674&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java Thu Jan 16 02:25:35 2014
@@ -18,7 +18,6 @@
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners;
-import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configurable;
@@ -36,26 +35,35 @@ import org.apache.pig.impl.io.NullableTu
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.util.Pair;
+import com.google.common.collect.Maps;
+
/**
* This class is used by skewed join. For the partitioned table, the skewedpartitioner reads the key
* distribution data from the sampler file and returns the reducer index in a round robin fashion.
- * For ex: if the key distribution file contains (k1, 5, 3) as an entry, reducers from 5 to 3 are returned
+ * For ex: if the key distribution file contains (k1, 5, 3) as an entry, reducers from 5 to 3 are returned
* in a round robin manner.
- */
+ */
public class SkewedPartitioner extends Partitioner<PigNullableWritable, Writable> implements Configurable {
- Map<Tuple, Pair<Integer, Integer> > reducerMap = new HashMap<Tuple, Pair<Integer, Integer> >();
- static Map<Tuple, Integer> currentIndexMap = new HashMap<Tuple, Integer> ();
- Integer totalReducers;
- Configuration conf;
+
+ protected Map<Tuple, Pair<Integer, Integer>> reducerMap = Maps.newHashMap();
+ protected Integer totalReducers;
+ protected boolean inited = false;
+
+ private static Map<Tuple, Integer> currentIndexMap = Maps.newHashMap();
+ private Configuration conf;
@Override
- public int getPartition(PigNullableWritable wrappedKey, Writable value,
- int numPartitions) {
- // for streaming tables, return the partition index blindly
- if (wrappedKey instanceof NullablePartitionWritable && (((NullablePartitionWritable)wrappedKey).getPartition()) != -1) {
- return ((NullablePartitionWritable)wrappedKey).getPartition();
- }
+ public int getPartition(PigNullableWritable wrappedKey, Writable value, int numPartitions) {
+ if (!inited) {
+ init();
+ }
+
+ // for streaming tables, return the partition index blindly
+ if (wrappedKey instanceof NullablePartitionWritable &&
+ (((NullablePartitionWritable)wrappedKey).getPartition()) != -1) {
+ return ((NullablePartitionWritable)wrappedKey).getPartition();
+ }
// for partition table, compute the index based on the sampler output
Pair <Integer, Integer> indexes;
@@ -78,7 +86,7 @@ public class SkewedPartitioner extends P
// if the partition file is empty, use numPartitions
totalReducers = (totalReducers > 0) ? totalReducers : numPartitions;
-
+
indexes = reducerMap.get(keyTuple);
// if the reducerMap does not contain the key, do the default hash based partitioning
if (indexes == null) {
@@ -105,24 +113,28 @@ public class SkewedPartitioner extends P
conf = job;
PigMapReduce.sJobConfInternal.set(conf);
PigMapReduce.sJobConf = conf;
- String keyDistFile = job.get("pig.keyDistFile", "");
- if (keyDistFile.length() == 0)
- throw new RuntimeException(this.getClass().getSimpleName() + " used but no key distribution found");
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ protected void init() {
+ String keyDistFile = conf.get("pig.keyDistFile", "");
+ if (keyDistFile.length() == 0) {
+ throw new RuntimeException(this.getClass().getSimpleName() +
+ " used but no key distribution found");
+ }
try {
Integer [] redCnt = new Integer[1];
reducerMap = MapRedUtil.loadPartitionFileFromLocalCache(
- keyDistFile, redCnt, DataType.TUPLE, job);
+ keyDistFile, redCnt, DataType.TUPLE, conf);
// check if the partition file is empty
totalReducers = (redCnt[0] == null) ? -1 : redCnt[0];
} catch (Exception e) {
throw new RuntimeException(e);
}
}
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
}
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java?rev=1558674&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java Thu Jan 16 02:25:35 2014
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+
+public class POPoissonSample extends PhysicalOperator {
+
+ private static final long serialVersionUID = 1L;
+
+ // marker string to mark the last sample row, which has the total number of rows
+ // seen by this map instance. This string will be in the 2nd last column of
+ // the last sample row; it is used by GetMemNumRows.
+ public static final String NUMROWS_TUPLE_MARKER =
+ "\u4956\u3838_pig_inTeRnal-spEcial_roW_num_tuple3kt579CFLehkblah";
+
+ private static final TupleFactory tf = TupleFactory.getInstance();
+ private static Result eop = new Result(POStatus.STATUS_EOP, null);
+
+ // num of rows sampled so far
+ private int numRowsSampled = 0;
+
+ // average size of tuple in memory, for tuples sampled
+ private long avgTupleMemSz = 0;
+
+ // current row number
+ private long rowNum = 0;
+
+ // number of tuples to skip after each sample
+ private long skipInterval = -1;
+
+ // bytes in input to skip after every sample.
+ // divide this by avgTupleMemSize to get skipInterval
+ private long memToSkipPerSample = 0;
+
+ // has the special row with row number information been returned
+ private boolean numRowSplTupleReturned = false;
+
+ // 17 is not a magic number. It can be obtained by using a poisson
+ // cumulative distribution function with the mean set to 10 (empirically,
+ // minimum number of samples) and the confidence set to 95%
+ public static final int DEFAULT_SAMPLE_RATE = 17;
+
+ private int sampleRate = 0;
+
+ private float heapPerc = 0f;
+
+ // new Sample result
+ private Result newSample = null;
+
+ public POPoissonSample(OperatorKey k, int rp, int sr, float hp) {
+ super(k, rp, null);
+ numRowsSampled = 0;
+ avgTupleMemSz = 0;
+ rowNum = 0;
+ skipInterval = -1;
+ memToSkipPerSample = 0;
+ numRowSplTupleReturned = false;
+ newSample = null;
+ sampleRate = sr;
+ heapPerc = hp;
+ }
+
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void visit(PhyPlanVisitor v) throws VisitorException {
+ }
+
+ @Override
+ public Result getNextTuple() throws ExecException {
+ if (numRowSplTupleReturned) {
+ // row num special row has been returned after all inputs
+ // were read, nothing more to read
+ return eop;
+ }
+
+ Result res = null;
+ if (skipInterval == -1) {
+ // select first tuple as sample and calculate
+ // number of tuples to be skipped
+ while (true) {
+ res = processInput();
+ if (res.returnStatus == POStatus.STATUS_NULL) {
+ continue;
+ } else if (res.returnStatus == POStatus.STATUS_EOP
+ || res.returnStatus == POStatus.STATUS_ERR) {
+ return res;
+ }
+
+ if (res.result == null) {
+ return createNumRowTuple(null);
+ }
+ long availRedMem = (long) (Runtime.getRuntime().maxMemory() * heapPerc);
+ memToSkipPerSample = availRedMem/sampleRate;
+ updateSkipInterval((Tuple)res.result);
+
+ rowNum++;
+ newSample = res;
+ break;
+ }
+ }
+
+ // skip tuples
+ for (long numSkipped = 0; numSkipped < skipInterval; numSkipped++) {
+ res = processInput();
+ if (res.returnStatus == POStatus.STATUS_NULL) {
+ continue;
+ } else if (res.returnStatus == POStatus.STATUS_EOP
+ || res.returnStatus == POStatus.STATUS_ERR) {
+ return createNumRowTuple((Tuple)newSample.result);
+ }
+ rowNum++;
+ }
+
+ // skipped enough, get new sample
+ while (true) {
+ res = processInput();
+ if (res.returnStatus == POStatus.STATUS_NULL) {
+ continue;
+ } else if (res.returnStatus == POStatus.STATUS_EOP
+ || res.returnStatus == POStatus.STATUS_ERR) {
+ return createNumRowTuple((Tuple)newSample.result);
+ }
+
+ if (res.result == null) {
+ return createNumRowTuple((Tuple)newSample.result);
+ }
+ updateSkipInterval((Tuple)res.result);
+ Result currentSample = newSample;
+
+ rowNum++;
+ newSample = res;
+ return currentSample;
+ }
+ }
+
+ @Override
+ public boolean supportsMultipleInputs() {
+ return false;
+ }
+
+ @Override
+ public boolean supportsMultipleOutputs() {
+ return false;
+ }
+
+ @Override
+ public String name() {
+ return getAliasString() + "PoissonSample - " + mKey.toString();
+ }
+
+ /**
+ * Update the average tuple size based on the newly sampled tuple t
+ * and recalculate skipInterval
+ * @param t - tuple
+ */
+ private void updateSkipInterval(Tuple t) {
+ avgTupleMemSz =
+ ((avgTupleMemSz*numRowsSampled) + t.getMemorySize())/(numRowsSampled + 1);
+ skipInterval = memToSkipPerSample/avgTupleMemSz;
+
+ // skipping fewer number of rows the first few times, to reduce the
+ // probability of the first tuple's size (if much smaller than the rest)
+ // resulting in very few samples being sampled. Sampling a little extra
+ // is OK
+ if(numRowsSampled < 5) {
+ skipInterval = skipInterval/(10-numRowsSampled);
+ }
+ ++numRowsSampled;
+ }
+
+ /**
+ * @param sample - sample tuple
+ * @return - Tuple appended with special marker string column, num-rows column
+ * @throws ExecException
+ */
+ private Result createNumRowTuple(Tuple sample) throws ExecException {
+ int sz = (sample == null) ? 0 : sample.size();
+ Tuple t = tf.newTuple(sz + 2);
+
+ if (sample != null) {
+ for (int i=0; i<sample.size(); i++){
+ t.set(i, sample.get(i));
+ }
+ }
+
+ t.set(sz, NUMROWS_TUPLE_MARKER);
+ t.set(sz + 1, rowNum);
+ numRowSplTupleReturned = true;
+ return new Result(POStatus.STATUS_OK, t);
+ }
+}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java?rev=1558674&r1=1558673&r2=1558674&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java Thu Jan 16 02:25:35 2014
@@ -90,8 +90,7 @@ public class POReservoirSample extends P
rowProcessed++;
} else if (res.returnStatus == POStatus.STATUS_NULL) {
continue;
- }
- else {
+ } else {
break;
}
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java?rev=1558674&r1=1558673&r2=1558674&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java Thu Jan 16 02:25:35 2014
@@ -28,6 +28,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullablePartitionWritable;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -44,12 +45,14 @@ import org.apache.tez.runtime.library.ou
public class POLocalRearrangeTez extends POLocalRearrange implements TezOutput {
private static final long serialVersionUID = 1L;
- private static Result empty = new Result(POStatus.STATUS_NULL, null);
+ protected static Result empty = new Result(POStatus.STATUS_NULL, null);
+
+ protected String outputKey;
+ protected transient KeyValueWriter writer;
- private String outputKey;
- private transient KeyValueWriter writer;
// Tez union is implemented as LR + Pkg
private boolean isUnion = false;
+ private boolean isSkewedJoin = false;
public POLocalRearrangeTez(OperatorKey k) {
super(k);
@@ -79,6 +82,14 @@ public class POLocalRearrangeTez extends
this.isUnion = isUnion;
}
+ public boolean isSkewedJoin() {
+ return isSkewedJoin;
+ }
+
+ public void setSkewedJoin(boolean isSkewedJoin) {
+ this.isSkewedJoin = isSkewedJoin;
+ }
+
@Override
public void attachOutputs(Map<String, LogicalOutput> outputs,
Configuration conf) throws ExecException {
@@ -124,10 +135,25 @@ public class POLocalRearrangeTez extends
Byte index = (Byte)result.get(0);
PigNullableWritable key = null;
NullableTuple val = null;
- if (isUnion()) {
+ if (isUnion) {
// Use the entire tuple as both key and value
key = HDataType.getWritableComparableTypes(result.get(1), keyType);
val = new NullableTuple((Tuple)result.get(1));
+ } else if (isSkewedJoin) {
+ // Skewed join uses NullablePartitionWritable as key
+ Byte tupleKeyIdx = 2;
+ Byte tupleValIdx = 3;
+
+ Integer partitionIndex = -1;
+ tupleKeyIdx--;
+ tupleValIdx--;
+
+ key = HDataType.getWritableComparableTypes(result.get(tupleKeyIdx), keyType);
+ NullablePartitionWritable wrappedKey = new NullablePartitionWritable(key);
+ wrappedKey.setIndex(index);
+ wrappedKey.setPartition(partitionIndex);
+ key = wrappedKey;
+ val = new NullableTuple((Tuple)result.get(tupleValIdx));
} else {
key = HDataType.getWritableComparableTypes(result.get(1), keyType);
val = new NullableTuple((Tuple)result.get(2));
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POPartitionRearrangeTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POPartitionRearrangeTez.java?rev=1558674&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POPartitionRearrangeTez.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POPartitionRearrangeTez.java Thu Jan 16 02:25:35 2014
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.builtin.PartitionSkewedKeys;
+import org.apache.pig.impl.io.NullablePartitionWritable;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.util.Pair;
+import org.python.google.common.collect.Lists;
+
+import com.google.common.collect.Maps;
+
+/**
+ * The partition rearrange operator is a part of the skewed join implementation.
+ * It has an embedded physical plan that generates tuples of the form
+ * (inpKey,reducerIndex,(indexed inp Tuple)).
+ */
+public class POPartitionRearrangeTez extends POLocalRearrangeTez {
+ private static final long serialVersionUID = 1L;
+ private static final TupleFactory tf = TupleFactory.getInstance();
+ private static final BagFactory mBagFactory = BagFactory.getInstance();
+
+ // reducerMap maps a key tuple to (min reducer index, max index - min index); see init()
+ private Map<Object, Pair<Integer, Integer>> reducerMap = Maps.newHashMap();
+ private Integer totalReducers = -1;
+ private boolean inited;
+
+ public POPartitionRearrangeTez(OperatorKey k, int rp) {
+ super(k, rp);
+ index = -1;
+ leafOps = Lists.newArrayList();
+ }
+
+ @Override
+ public String name() {
+ return getAliasString() + "Partition Rearrange" + "["
+ + DataType.findTypeName(resultType) + "]" + "{"
+ + DataType.findTypeName(keyType) + "}" + "(" + mIsDistinct
+ + ") - " + mKey.toString() + "\t->\t " + outputKey;
+ }
+
+ /**
+ * Calls getNext on the generate operator inside the nested physical plan.
+ * Converts the generated tuple into the proper format, i.e,
+ * (key,indexedTuple(value))
+ */
+ @Override
+ public Result getNextTuple() throws ExecException {
+ if (!inited) {
+ init();
+ }
+
+ while (true) {
+ inp = processInput();
+ if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR) {
+ break;
+ }
+ if (inp.returnStatus == POStatus.STATUS_NULL) {
+ continue;
+ }
+
+ for (PhysicalPlan ep : plans) {
+ ep.attachInput((Tuple)inp.result);
+ }
+
+ Result res = null;
+ List<Result> resLst = new ArrayList<Result>();
+ for (ExpressionOperator op : leafOps){
+ res = op.getNext(op.getResultType());
+ if (res.returnStatus != POStatus.STATUS_OK) {
+ return new Result();
+ }
+ resLst.add(res);
+ }
+ res.result = constructPROutput(resLst,(Tuple)inp.result);
+ if (writer == null) { // In the case of combiner
+ return res;
+ }
+
+ Iterator<Tuple> its = ((DataBag)res.result).iterator();
+ while(its.hasNext()) {
+ Tuple result = its.next();
+ Byte tupleKeyIdx = 2;
+ Byte tupleValIdx = 3;
+
+ Integer partitionIndex = -1;
+ partitionIndex = (Integer)result.get(1);
+
+ PigNullableWritable key =
+ HDataType.getWritableComparableTypes(result.get(tupleKeyIdx), keyType);
+ NullableTuple val = new NullableTuple((Tuple)result.get(tupleValIdx));
+
+ NullablePartitionWritable wrappedKey = new NullablePartitionWritable(key);
+ wrappedKey.setIndex(index);
+ wrappedKey.setPartition(partitionIndex);
+ val.setIndex(index);
+
+ try {
+ writer.write(wrappedKey, val);
+ } catch (IOException ioe) {
+ int errCode = 2135;
+ String msg = "Received error from POPartitionRearrage function." + ioe.getMessage();
+ throw new ExecException(msg, errCode, ioe);
+ }
+ }
+
+ res = empty;
+ }
+ return inp;
+ }
+
+ // Returns bag of tuples
+ protected DataBag constructPROutput(List<Result> resLst, Tuple value) throws ExecException{
+ Tuple t = super.constructLROutput(resLst, null, value);
+
+ //Construct key
+ Object key = t.get(1);
+
+ // Construct an output bag and feed in the tuples
+ DataBag opBag = mBagFactory.newDefaultBag();
+
+ // Put the index, key, and value in a tuple and return
+ // first -> min reducer index, second -> (max - min) as stored by init()
+ Pair <Integer, Integer> indexes = reducerMap.get(key);
+
+ // For non skewed keys, we set the partition index to be -1
+ if (indexes == null) {
+ indexes = new Pair <Integer, Integer>(-1,0);
+ }
+
+ for (Integer reducerIdx=indexes.first, cnt=0; cnt <= indexes.second; reducerIdx++, cnt++) {
+ if (reducerIdx >= totalReducers) {
+ reducerIdx = 0;
+ }
+ Tuple opTuple = mTupleFactory.newTuple(4);
+ opTuple.set(0, t.get(0));
+ // set the partition index
+ opTuple.set(1, reducerIdx.intValue());
+ opTuple.set(2, key);
+ opTuple.set(3, t.get(2));
+
+ opBag.add(opTuple);
+ }
+
+ return opBag;
+ }
+
+ private void init() throws RuntimeException {
+ Map<String, Object> distMap = null;
+ if (PigProcessor.sampleMap != null) {
+ // We've already collected sampleMap in PigProcessor
+ distMap = PigProcessor.sampleMap;
+ } else {
+ throw new RuntimeException(this.getClass().getSimpleName() +
+ " used but no key distribution found");
+ }
+
+ try {
+ if (distMap != null) {
+ Integer[] totalReducers = new Integer[1];
+
+ // Each tuple in the distMap partition list is structured as (key, min, max),
+ // where min and max are reducer indexes
+ DataBag partitionList = (DataBag) distMap.get(PartitionSkewedKeys.PARTITION_LIST);
+ totalReducers[0] = Integer.valueOf("" + distMap.get(PartitionSkewedKeys.TOTAL_REDUCERS));
+ Iterator<Tuple> it = partitionList.iterator();
+ while (it.hasNext()) {
+ Tuple idxTuple = it.next();
+ Integer maxIndex = (Integer) idxTuple.get(idxTuple.size() - 1);
+ Integer minIndex = (Integer) idxTuple.get(idxTuple.size() - 2);
+ // Used to replace the maxIndex with the number of reducers
+ if (maxIndex < minIndex) {
+ maxIndex = totalReducers[0] + maxIndex;
+ }
+
+ Tuple keyT;
+ // if the join is on more than 1 key
+ if (idxTuple.size() > 3) {
+ // remove the last 2 fields of the tuple, i.e: minIndex
+ // and maxIndex and store it in the reducer map
+ Tuple keyTuple = tf.newTuple();
+ for (int i=0; i < idxTuple.size() - 2; i++) {
+ keyTuple.append(idxTuple.get(i));
+ }
+ keyT = keyTuple;
+ } else {
+ keyT = tf.newTuple(1);
+ keyT.set(0,idxTuple.get(0));
+ }
+ // number of reducers
+ Integer cnt = maxIndex - minIndex;
+ // cnt is (max - min); the inclusive loop (cnt <= second) in constructPROutput covers the extra reducer
+ reducerMap.put(keyT, new Pair<Integer, Integer>(minIndex, cnt));
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ inited = true;
+ }
+}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java?rev=1558674&r1=1558673&r2=1558674&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java Thu Jan 16 02:25:35 2014
@@ -53,6 +53,7 @@ public class POShuffleTezLoad extends PO
private boolean[] readOnce;
private WritableComparator comparator = null;
+ private boolean isSkewedJoin = false;
public POShuffleTezLoad(POPackage pack) {
super(pack);
@@ -63,7 +64,7 @@ public class POShuffleTezLoad extends PO
throws ExecException {
try {
comparator = ReflectionUtils.newInstance(
- TezDagBuilder.comparatorForKeyType(pkgr.getKeyType()), conf);
+ TezDagBuilder.comparatorForKeyType(pkgr.getKeyType(), isSkewedJoin), conf);
} catch (JobCreationException e) {
throw new ExecException(e);
}
@@ -195,6 +196,14 @@ public class POShuffleTezLoad extends PO
inputKeys.add(inputKey);
}
+ public void setSkewedJoins(boolean isSkewedJoin) {
+ this.isSkewedJoin = isSkewedJoin;
+ }
+
+ public boolean isSkewedJoin() {
+ return isSkewedJoin;
+ }
+
@Override
public boolean supportsMultipleInputs() {
return true;
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1558674&r1=1558673&r2=1558674&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Thu Jan 16 02:25:35 2014
@@ -63,8 +63,8 @@ public class PigProcessor implements Log
private PhysicalOperator leaf;
private Configuration conf;
-
- public static Map<String, Object> quantileMap = null;
+
+ public static Map<String, Object> sampleMap = null;
@Override
public void initialize(TezProcessorContext processorContext)
@@ -98,15 +98,16 @@ public class PigProcessor implements Log
}
+ @SuppressWarnings("rawtypes")
@Override
public void run(Map<String, LogicalInput> inputs,
Map<String, LogicalOutput> outputs) throws Exception {
initializeInputs(inputs);
initializeOutputs(outputs);
-
+
if (conf.get("pig.sampleVertex") != null) {
- collectQuantile((BroadcastKVReader)inputs.get(conf.get("pig.sampleVertex")).getReader());
+ collectSample((BroadcastKVReader)inputs.get(conf.get("pig.sampleVertex")).getReader());
}
List<PhysicalOperator> leaves = null;
@@ -198,16 +199,14 @@ public class PigProcessor implements Log
}
}
}
-
- private void collectQuantile(BroadcastKVReader reader) throws IOException {
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ private void collectSample(BroadcastKVReader reader) throws IOException {
reader.next();
Object val = reader.getCurrentValue();
NullableTuple nTup = (NullableTuple) val;
Tuple t = (Tuple) nTup.getValueAsPigType();
- // the Quantiles file has a tuple as under:
- // (numQuantiles, bag of samples)
- // numQuantiles here is the reduce parallelism
- quantileMap = (Map<String, Object>) t.get(0);
+ sampleMap = (Map<String, Object>) t.get(0);
}
}
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java?rev=1558674&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java Thu Jan 16 02:25:35 2014
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SkewedPartitioner;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.builtin.PartitionSkewedKeys;
+import org.apache.pig.impl.util.Pair;
+
+public class SkewedPartitionerTez extends SkewedPartitioner { // SkewedPartitioner variant whose key distribution is read from PigProcessor.sampleMap
+ private static final TupleFactory tf = TupleFactory.getInstance();
+ // Populates reducerMap from PigProcessor.sampleMap; throws RuntimeException when that map is null or cannot be parsed
+ @Override
+ protected void init() {
+ Map<String, Object> distMap = null;
+ if (PigProcessor.sampleMap != null) {
+ // Use the sampleMap held in PigProcessor's static field as the key distribution
+ distMap = PigProcessor.sampleMap;
+ } else {
+ throw new RuntimeException(this.getClass().getSimpleName() +
+ " used but no key distribution found");
+ }
+
+ try {
+ if (distMap != null) { // always true here: the null case threw above
+ Integer[] totalReducers = new Integer[1];
+
+ // Each tuple in the partition list is laid out as (key fields..., min, max),
+ // where min and max are reducer indexes
+ DataBag partitionList = (DataBag) distMap.get(PartitionSkewedKeys.PARTITION_LIST);
+ totalReducers[0] = Integer.valueOf("" + distMap.get(PartitionSkewedKeys.TOTAL_REDUCERS));
+ Iterator<Tuple> it = partitionList.iterator();
+ while (it.hasNext()) {
+ Tuple idxTuple = it.next();
+ Integer maxIndex = (Integer) idxTuple.get(idxTuple.size() - 1);
+ Integer minIndex = (Integer) idxTuple.get(idxTuple.size() - 2);
+ // maxIndex < minIndex (presumably a wrapped-around range - TODO confirm): shift maxIndex up by the total number of reducers
+ if (maxIndex < minIndex) {
+ maxIndex = totalReducers[0] + maxIndex;
+ }
+
+ Tuple keyT;
+ // if the join is on more than 1 key (i.e. more than one field precedes min and max)
+ if (idxTuple.size() > 3) {
+ // copy every field except the last 2 (minIndex and maxIndex) into a new key
+ // tuple, which is used as the reducerMap key
+ Tuple keyTuple = tf.newTuple();
+ for (int i=0; i < idxTuple.size() - 2; i++) {
+ keyTuple.append(idxTuple.get(i));
+ }
+ keyT = keyTuple;
+ } else {
+ keyT = tf.newTuple(1);
+ keyT.set(0,idxTuple.get(0));
+ }
+ // difference between the max and min reducer index for this key
+ Integer cnt = maxIndex - minIndex;
+ // NOTE(review): no 1 is added here; the stored pair is (minIndex, maxIndex - minIndex) - confirm how SkewedPartitioner consumes it
+ reducerMap.put(keyT, new Pair<Integer, Integer>(minIndex, cnt));
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ inited = true;
+ }
+}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1558674&r1=1558673&r2=1558674&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Thu Jan 16 02:25:35 2014
@@ -25,6 +25,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Properties;
import java.util.Set;
import java.util.Stack;
@@ -34,6 +35,7 @@ import org.apache.pig.FuncSpec;
import org.apache.pig.IndexableLoadFunc;
import org.apache.pig.LoadFunc;
import org.apache.pig.OrderedLoadFunc;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompilerException;
@@ -63,6 +65,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
@@ -78,6 +81,8 @@ import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.builtin.DefaultIndexableLoader;
import org.apache.pig.impl.builtin.FindQuantiles;
+import org.apache.pig.impl.builtin.GetMemNumRows;
+import org.apache.pig.impl.builtin.PartitionSkewedKeys;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.DepthFirstWalker;
@@ -88,6 +93,8 @@ import org.apache.pig.impl.plan.Operator
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.CompilerUtils;
+import org.apache.pig.impl.util.MultiMap;
import org.apache.pig.impl.util.Pair;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.newplan.logical.relational.LOJoin;
@@ -107,6 +114,7 @@ public class TezCompiler extends PhyPlan
private static final Log LOG = LogFactory.getLog(TezCompiler.class);
private PigContext pigContext;
+ private Properties pigProperties;
//The plan that is being compiled
private PhysicalPlan plan;
@@ -147,6 +155,7 @@ public class TezCompiler extends PhyPlan
super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
this.plan = plan;
this.pigContext = pigContext;
+ pigProperties = pigContext.getProperties();
splitsSeen = Maps.newHashMap();
tezPlan = new TezOperPlan();
nig = NodeIdGenerator.getGenerator();
@@ -161,9 +170,9 @@ public class TezCompiler extends PhyPlan
localRearrangeFactory = new POLocalRearrangeTezFactory(scope, nig);
phyToTezOpMap = Maps.newHashMap();
- fileConcatenationThreshold = Integer.parseInt(pigContext.getProperties()
+ fileConcatenationThreshold = Integer.parseInt(pigProperties
.getProperty(FILE_CONCATENATION_THRESHOLD, "100"));
- optimisticFileConcatenation = pigContext.getProperties().getProperty(
+ optimisticFileConcatenation = pigProperties.getProperty(
OPTIMISTIC_FILE_CONCATENATION, "false").equals("true");
LOG.info("File concatenation threshold: " + fileConcatenationThreshold
+ " optimistic? " + optimisticFileConcatenation);
@@ -1170,9 +1179,249 @@ public class TezCompiler extends PhyPlan
@Override
public void visitSkewedJoin(POSkewedJoin op) throws VisitorException {
- int errCode = 2034;
- String msg = "Cannot compile " + op.getClass().getSimpleName();
- throw new TezCompilerException(msg, errCode, PigException.BUG);
+ try {
+ // LR that transfers loaded input to partition vertex
+ POLocalRearrangeTez lrTez = localRearrangeFactory.create(LocalRearrangeType.NULL);
+ // LR that broadcasts sampled input to sampling aggregation vertex
+ POLocalRearrangeTez lrTezSample = localRearrangeFactory.create(LocalRearrangeType.NULL);
+
+ int sampleRate = POPoissonSample.DEFAULT_SAMPLE_RATE;
+ if (pigProperties.containsKey(PigConfiguration.SAMPLE_RATE)) {
+ sampleRate = Integer.valueOf(pigProperties.getProperty(PigConfiguration.SAMPLE_RATE));
+ }
+ float heapPerc = PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE;
+ if (pigProperties.containsKey(PigConfiguration.PERC_MEM_AVAIL)) {
+ heapPerc = Float.valueOf(pigProperties.getProperty(PigConfiguration.PERC_MEM_AVAIL));
+ }
+ POPoissonSample poSample = new POPoissonSample(new OperatorKey(scope,nig.getNextNodeId(scope)),
+ -1, sampleRate, heapPerc);
+
+ TezOperator prevOp = compiledInputs[0];
+ prevOp.plan.addAsLeaf(lrTez);
+ prevOp.plan.addAsLeaf(poSample);
+
+ MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = op.getJoinPlans();
+ List<PhysicalOperator> l = plan.getPredecessors(op);
+ List<PhysicalPlan> groups = joinPlans.get(l.get(0));
+ List<Boolean> ascCol = new ArrayList<Boolean>();
+ for (int i=0; i< groups.size(); i++) {
+ ascCol.add(false);
+ }
+
+ // Set up transform plan to get keys and memory size of input
+ // tuples. It first adds all the plans to get key columns.
+ List<PhysicalPlan> transformPlans = new ArrayList<PhysicalPlan>();
+ transformPlans.addAll(groups);
+
+ // Then it adds a column for memory size
+ POProject prjStar = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ prjStar.setResultType(DataType.TUPLE);
+ prjStar.setStar(true);
+
+ List<PhysicalOperator> ufInps = new ArrayList<PhysicalOperator>();
+ ufInps.add(prjStar);
+
+ PhysicalPlan ep = new PhysicalPlan();
+ POUserFunc uf = new POUserFunc(new OperatorKey(scope,nig.getNextNodeId(scope)),
+ -1, ufInps, new FuncSpec(GetMemNumRows.class.getName(), (String[])null));
+ uf.setResultType(DataType.TUPLE);
+ ep.add(uf);
+ ep.add(prjStar);
+ ep.connect(prjStar, uf);
+
+ transformPlans.add(ep);
+
+ List<Boolean> flat1 = new ArrayList<Boolean>();
+ List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
+
+ for (int i=0; i<transformPlans.size(); i++) {
+ eps1.add(transformPlans.get(i));
+ flat1.add(true);
+ }
+
+ // This foreach will pick the sort key columns from the POPoissonSample output
+ POForEach nfe1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)),
+ -1, eps1, flat1);
+ prevOp.plan.addAsLeaf(nfe1);
+ prevOp.plan.addAsLeaf(lrTezSample);
+ prevOp.setClosed(true);
+
+ POSort sort = new POSort(op.getOperatorKey(), op.getRequestedParallelism(),
+ null, groups, ascCol, null);
+ String per = pigProperties.getProperty("pig.skewedjoin.reduce.memusage",
+ String.valueOf(PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE));
+ String mc = pigProperties.getProperty("pig.skewedjoin.reduce.maxtuple", "0");
+
+ int rp = Math.max(op.getRequestedParallelism(), 1);
+ Pair<TezOperator, Integer> sampleJobPair = getSamplingAggregationJobs(sort, rp, null,
+ PartitionSkewedKeys.class.getName(), new String[]{per, mc}, 2);
+ rp = sampleJobPair.second;
+
+ // Set the parallelism of the skewed join to the value returned by the
+ // sampling job. If "parallel" is specified in the join statement, "rp"
+ // is equal to that number; if not specified, "rp" is the value that the
+ // sampling process calculated based on the default.
+ op.setRequestedParallelism(rp);
+
+ TezOperator[] joinJobs = new TezOperator[] {null, compiledInputs[1], null};
+ TezOperator[] joinInputs = new TezOperator[] {compiledInputs[0], compiledInputs[1]};
+ TezOperator[] rearrangeOutputs = new TezOperator[2];
+
+ compiledInputs = new TezOperator[] {joinInputs[0]};
+
+ blocking();
+
+ // Then add a POPackage and a POForEach to the start of the new tezOp.
+ POPackage pkg = getPackage(1, DataType.BYTEARRAY);
+ curTezOp.plan.add(pkg);
+
+ POProject project = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
+ project.setResultType(DataType.BAG);
+ project.setStar(false);
+ project.setColumn(1);
+ POForEach forEach =
+ TezCompilerUtil.getForEach(project, sort.getRequestedParallelism(), scope, nig);
+ curTezOp.plan.addAsLeaf(forEach);
+ joinJobs[0] = curTezOp;
+
+ // Run POLocalRearrange for first join table
+ POLocalRearrangeTez lr =
+ new POLocalRearrangeTez(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);
+ try {
+ lr.setIndex(0);
+ } catch (ExecException e) {
+ int errCode = 2058;
+ String msg = "Unable to set index on newly created POLocalRearrange.";
+ throw new PlanException(msg, errCode, PigException.BUG, e);
+ }
+
+ // Check the type of group keys, if there are more than one field, the key is TUPLE.
+ byte type = DataType.TUPLE;
+ if (groups.size() == 1) {
+ type = groups.get(0).getLeaves().get(0).getResultType();
+ }
+
+ // Configure POLocalRearrange for first join table
+ lr.setKeyType(type);
+ lr.setPlans(groups);
+ lr.setSkewedJoin(true);
+ lr.setResultType(DataType.TUPLE);
+ joinJobs[0].plan.addAsLeaf(lr);
+ joinJobs[0].setClosed(true);
+ if (lr.getRequestedParallelism() > joinJobs[0].requestedParallelism) {
+ joinJobs[0].requestedParallelism = lr.getRequestedParallelism();
+ }
+ rearrangeOutputs[0] = joinJobs[0];
+
+ compiledInputs = new TezOperator[] {joinInputs[1]};
+
+ // Run POPartitionRearrange for second join table
+ POPartitionRearrangeTez pr =
+ new POPartitionRearrangeTez(new OperatorKey(scope, nig.getNextNodeId(scope)), rp);
+ try {
+ pr.setIndex(1);
+ } catch (ExecException e) {
+ int errCode = 2058;
+ String msg = "Unable to set index on newly created POPartitionRearrange.";
+ throw new PlanException(msg, errCode, PigException.BUG, e);
+ }
+
+ groups = joinPlans.get(l.get(1));
+ pr.setPlans(groups);
+ pr.setKeyType(type);
+ pr.setSkewedJoin(true);
+ pr.setResultType(DataType.BAG);
+ joinJobs[1].plan.addAsLeaf(pr);
+ joinJobs[1].setClosed(true);
+ if (pr.getRequestedParallelism() > joinJobs[1].requestedParallelism) {
+ joinJobs[1].requestedParallelism = pr.getRequestedParallelism();
+ }
+ rearrangeOutputs[1] = joinJobs[1];
+
+ compiledInputs = rearrangeOutputs;
+
+ // Create POGlobalRearrange
+ POGlobalRearrange gr =
+ new POGlobalRearrange(new OperatorKey(scope, nig.getNextNodeId(scope)), rp);
+ // Skewed join has its own special partitioner
+ gr.setResultType(DataType.TUPLE);
+ gr.visit(this);
+ joinJobs[2] = curTezOp;
+ if (gr.getRequestedParallelism() > joinJobs[2].requestedParallelism) {
+ joinJobs[2].requestedParallelism = gr.getRequestedParallelism();
+ }
+
+ compiledInputs = new TezOperator[] {joinJobs[2]};
+
+ // Create POPackage
+ pkg = getPackage(2, DataType.TUPLE);
+ boolean [] inner = op.getInnerFlags();
+ pkg.getPkgr().setInner(inner);
+ pkg.visit(this);
+
+ // Create POForEach
+ List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
+ List<Boolean> flat = new ArrayList<Boolean>();
+
+ // Add corresponding POProjects
+ for (int i=0; i < 2; i++) {
+ ep = new PhysicalPlan();
+ POProject prj = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
+ prj.setColumn(i+1);
+ prj.setOverloaded(false);
+ prj.setResultType(DataType.BAG);
+ ep.add(prj);
+ eps.add(ep);
+ if (!inner[i]) {
+ // Add an empty bag for outer join
+ CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i));
+ }
+ flat.add(true);
+ }
+
+ POForEach fe =
+ new POForEach(new OperatorKey(scope, nig.getNextNodeId(scope)), -1, eps, flat);
+ fe.setResultType(DataType.TUPLE);
+ fe.visit(this);
+
+ // Connect vertices
+ lrTez.setOutputKey(joinJobs[0].getOperatorKey().toString());
+ lrTezSample.setOutputKey(sampleJobPair.first.getOperatorKey().toString());
+
+ TezEdgeDescriptor edge = joinJobs[0].inEdges.get(prevOp.getOperatorKey());
+ // TODO: Convert to unsorted shuffle after TEZ-661
+ // edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
+ // edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
+ edge.partitionerClass = RoundRobinPartitioner.class;
+
+ TezCompilerUtil.connect(tezPlan, prevOp, sampleJobPair.first);
+
+ POSplit split = (POSplit) sampleJobPair.first.plan.getLeaves().get(0);
+ List<PhysicalPlan> pp = split.getPlans();
+ for (int i = 0; i < pp.size(); i++) {
+ TezCompilerUtil.connect(tezPlan, sampleJobPair.first, joinJobs[i]);
+
+ // Configure broadcast edges for distribution map
+ edge = joinJobs[i].inEdges.get(sampleJobPair.first.getOperatorKey());
+ edge.dataMovementType = DataMovementType.BROADCAST;
+ edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
+ edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
+ lrTez = (POLocalRearrangeTez) pp.get(i).getLeaves().get(0);
+ lrTez.setOutputKey(joinJobs[i].getOperatorKey().toString());
+ joinJobs[i].sampleOperator = sampleJobPair.first;
+
+ // Configure skewed partitioner for join
+ edge = joinJobs[2].inEdges.get(joinJobs[i].getOperatorKey());
+ edge.partitionerClass = SkewedPartitionerTez.class;
+ }
+ joinJobs[2].setSkewedJoin(true);
+
+ phyToTezOpMap.put(op, curTezOp);
+ } catch (Exception e) {
+ int errCode = 2034;
+ String msg = "Error compiling operator " + op.getClass().getSimpleName();
+ throw new TezCompilerException(msg, errCode, PigException.BUG, e);
+ }
}
/**
@@ -1190,7 +1439,8 @@ public class TezCompiler extends PhyPlan
* @return Tez operator that now is finished with a store.
* @throws PlanException
*/
- private TezOperator endSingleInputWithStoreAndSample(POSort sort, POLocalRearrangeTez lr, POLocalRearrangeTez lrSample) throws PlanException{
+ private TezOperator endSingleInputWithStoreAndSample(POSort sort, POLocalRearrangeTez lr,
+ POLocalRearrangeTez lrSample) throws PlanException {
if(compiledInputs.length>1) {
int errCode = 2023;
String msg = "Received a multi input plan when expecting only a single input one.";
@@ -1204,7 +1454,6 @@ public class TezCompiler extends PhyPlan
List<Boolean> flat1 = new ArrayList<Boolean>();
List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
-
Pair<POProject, Byte>[] sortProjs = null;
try{
sortProjs = getSortCols(sort.getSortPlans());
@@ -1259,7 +1508,7 @@ public class TezCompiler extends PhyPlan
oper.plan.addAsLeaf(lrSample);
} else {
int errCode = 2022;
- String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
+ String msg = "The current operator is closed. This is unexpected while compiling.";
throw new PlanException(msg, errCode, PigException.BUG);
}
return oper;
@@ -1292,13 +1541,12 @@ public class TezCompiler extends PhyPlan
}
}
- return getSamplingAggregationJobs(sort, rp, null, FindQuantiles.class.getName(), ctorArgs);
+ return getSamplingAggregationJobs(sort, rp, null, FindQuantiles.class.getName(), ctorArgs, 1);
}
/**
* Create a sampling job to collect statistics by sampling input data. The
* sequence of operations is as following:
- * <li>Transform input sample tuples into another tuple.</li>
* <li>Add an extra field "all" into the tuple </li>
* <li>Package all tuples into one bag </li>
* <li>Add constant field for number of reducers. </li>
@@ -1317,7 +1565,7 @@ public class TezCompiler extends PhyPlan
* @throws ExecException
*/
private Pair<TezOperator,Integer> getSamplingAggregationJobs(POSort sort, int rp,
- List<PhysicalPlan> sortKeyPlans, String udfClassName, String[] udfArgs )
+ List<PhysicalPlan> sortKeyPlans, String udfClassName, String[] udfArgs, int numLRs)
throws PlanException, VisitorException, ExecException {
TezOperator oper = getTezOp();
@@ -1448,12 +1696,28 @@ public class TezCompiler extends PhyPlan
oper.plan.add(nfe3);
oper.plan.connect(nfe2, nfe3);
- POLocalRearrangeTez lr = localRearrangeFactory.create(LocalRearrangeType.NULL);
-
- oper.plan.add(lr);
- oper.plan.connect(nfe3, lr);
-
+ if (numLRs > 1) {
+ // Skewed join broadcasts the sample map to multiple vertices, so we need
+ // to add POSplit to the plan and attach LRs to POSplit.
+ POSplit split = new POSplit(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ oper.setSplitOperatorKey(split.getOperatorKey());
+ oper.plan.add(split);
+ oper.plan.connect(nfe3, split);
+ splitsSeen.put(split.getOperatorKey(), oper);
+
+ for (int i = 0; i < numLRs; i++) {
+ POLocalRearrangeTez lr = localRearrangeFactory.create(LocalRearrangeType.NULL);
+ PhysicalPlan pp = new PhysicalPlan();
+ pp.add(lr);
+ split.addPlan(pp);
+ }
+ } else {
+ POLocalRearrangeTez lr = localRearrangeFactory.create(LocalRearrangeType.NULL);
+ oper.plan.add(lr);
+ oper.plan.connect(nfe3, lr);
+ }
oper.setClosed(true);
+
oper.requestedParallelism = 1;
oper.markSampler();
return new Pair<TezOperator, Integer>(oper, rp);
@@ -1672,8 +1936,7 @@ public class TezCompiler extends PhyPlan
int rp = Math.max(op.getRequestedParallelism(), 1);
- Pair<TezOperator, Integer> quantJobParallelismPair =
- getQuantileJobs(op, rp);
+ Pair<TezOperator, Integer> quantJobParallelismPair = getQuantileJobs(op, rp);
TezOperator[] sortOpers = getSortJobs(op, quantJobParallelismPair.second, keyType, fields);
lr.setOutputKey(sortOpers[0].getOperatorKey().toString());
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1558674&r1=1558673&r2=1558674&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Thu Jan 16 02:25:35 2014
@@ -81,6 +81,7 @@ import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.io.NullablePartitionWritable;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -192,8 +193,8 @@ public class TezDagBuilder extends TezOp
for (POLocalRearrangeTez lr : lrs) {
if (lr.getOutputKey().equals(to.getOperatorKey().toString())) {
byte keyType = lr.getKeyType();
- setIntermediateInputKeyValue(keyType, conf);
- setIntermediateOutputKeyValue(keyType, conf);
+ setIntermediateInputKeyValue(keyType, conf, to.isSkewedJoin());
+ setIntermediateOutputKeyValue(keyType, conf, to.isSkewedJoin());
conf.set("pig.reduce.key.type", Byte.toString(keyType));
break;
}
@@ -201,7 +202,7 @@ public class TezDagBuilder extends TezOp
conf.setBoolean("mapred.mapper.new-api", true);
conf.set("pig.pigContext", ObjectSerializer.serialize(pc));
-
+
if (edge.partitionerClass != null) {
conf.setClass("mapreduce.job.partitioner.class",
edge.partitionerClass, Partitioner.class);
@@ -234,11 +235,11 @@ public class TezDagBuilder extends TezOp
private void addCombiner(PhysicalPlan combinePlan, TezOperator pkgTezOp,
Configuration conf) throws IOException {
POPackage combPack = (POPackage) combinePlan.getRoots().get(0);
- setIntermediateInputKeyValue(combPack.getPkgr().getKeyType(), conf);
+ setIntermediateInputKeyValue(combPack.getPkgr().getKeyType(), conf, pkgTezOp.isSkewedJoin());
POLocalRearrange combRearrange = (POLocalRearrange) combinePlan
.getLeaves().get(0);
- setIntermediateOutputKeyValue(combRearrange.getKeyType(), conf);
+ setIntermediateOutputKeyValue(combRearrange.getKeyType(), conf, pkgTezOp.isSkewedJoin());
LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer(
combinePlan, pkgTezOp, combPack);
@@ -324,13 +325,16 @@ public class TezDagBuilder extends TezOp
tezOp.plan.remove(pack);
payloadConf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
payloadConf.set("pig.reduce.key.type", Byte.toString(keyType));
- setIntermediateInputKeyValue(keyType, payloadConf);
+ setIntermediateInputKeyValue(keyType, payloadConf, tezOp.isSkewedJoin());
POShuffleTezLoad newPack;
if (tezOp.isUnion()) {
newPack = new POUnionTezLoad(pack);
} else {
newPack = new POShuffleTezLoad(pack);
}
+ if (tezOp.isSkewedJoin()) {
+ newPack.setSkewedJoins(true);
+ }
tezOp.plan.add(newPack);
// Set input keys for POShuffleTezLoad. This is used to identify
@@ -350,24 +354,15 @@ public class TezDagBuilder extends TezOp
}
}
- @SuppressWarnings("rawtypes")
- Class<? extends WritableComparable> keyClass = HDataType
- .getWritableComparableTypes(pack.getPkgr().getKeyType())
- .getClass();
- payloadConf.set(
- TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
- keyClass.getName());
- payloadConf.set(
- TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS,
- NullableTuple.class.getName());
- selectInputComparator(payloadConf, pack.getPkgr().getKeyType());
+ setIntermediateInputKeyValue(pack.getPkgr().getKeyType(), payloadConf,
+ tezOp.isSkewedJoin());
}
payloadConf.setClass("mapreduce.outputformat.class",
PigOutputFormat.class, OutputFormat.class);
- // set parent plan in all operators.
- // currently the parent plan is really used only when POStream, POSplit are present in the plan
+ // set parent plan in all operators. currently the parent plan is really
+ // used only when POStream, POSplit are present in the plan
new PhyPlanSetter(tezOp.plan).visit();
// Serialize the execution plan
@@ -606,35 +601,49 @@ public class TezDagBuilder extends TezOp
}
@SuppressWarnings("rawtypes")
- private void setIntermediateInputKeyValue(byte keyType, Configuration conf)
+ private void setIntermediateInputKeyValue(byte keyType, Configuration conf, boolean isSkewedJoin)
throws JobCreationException, ExecException {
Class<? extends WritableComparable> keyClass = HDataType
.getWritableComparableTypes(keyType).getClass();
- conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
- keyClass.getName());
+ if (isSkewedJoin) {
+ conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
+ NullablePartitionWritable.class.getName());
+ } else {
+ conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
+ keyClass.getName());
+ }
conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS,
NullableTuple.class.getName());
- selectInputComparator(conf, keyType);
+ selectInputComparator(conf, keyType, isSkewedJoin);
}
@SuppressWarnings("rawtypes")
- private void setIntermediateOutputKeyValue(byte keyType, Configuration conf)
+ private void setIntermediateOutputKeyValue(byte keyType, Configuration conf, boolean isSkewedJoin)
throws JobCreationException, ExecException {
Class<? extends WritableComparable> keyClass = HDataType
.getWritableComparableTypes(keyType).getClass();
- conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS,
- keyClass.getName());
+ if (isSkewedJoin) {
+ conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS,
+ NullablePartitionWritable.class.getName());
+ } else {
+ conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS,
+ keyClass.getName());
+ }
conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS,
NullableTuple.class.getName());
conf.set(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS,
MRPartitioner.class.getName());
- selectOutputComparator(keyType, conf);
+ selectOutputComparator(keyType, conf, isSkewedJoin);
}
- static Class<? extends WritableComparator> comparatorForKeyType(byte keyType)
+ static Class<? extends WritableComparator> comparatorForKeyType(byte keyType, boolean isSkewedJoin)
throws JobCreationException {
// TODO: Handle sorting like in JobControlCompiler
+ if (isSkewedJoin) {
+ return JobControlCompiler.PigGroupingPartitionWritableComparator.class;
+ }
+
switch (keyType) {
case DataType.BOOLEAN:
return PigBooleanRawComparator.class;
@@ -686,19 +695,19 @@ public class TezDagBuilder extends TezOp
}
}
- void selectInputComparator(Configuration conf, byte keyType)
+ void selectInputComparator(Configuration conf, byte keyType, boolean isSkewedJoin)
throws JobCreationException {
// TODO: Handle sorting like in JobControlCompiler
conf.setClass(
TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS,
- comparatorForKeyType(keyType), RawComparator.class);
+ comparatorForKeyType(keyType, isSkewedJoin), RawComparator.class);
}
- void selectOutputComparator(byte keyType, Configuration conf)
+ void selectOutputComparator(byte keyType, Configuration conf, boolean isSkewedJoin)
throws JobCreationException {
// TODO: Handle sorting like in JobControlCompiler
conf.setClass(
TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS,
- comparatorForKeyType(keyType), RawComparator.class);
+ comparatorForKeyType(keyType, isSkewedJoin), RawComparator.class);
}
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java?rev=1558674&r1=1558673&r2=1558674&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java Thu Jan 16 02:25:35 2014
@@ -90,6 +90,8 @@ public class TezOperator extends Operato
// to add additional map reduce operator with 1 reducer after this
long limit = -1;
+ private boolean skewedJoin = false;
+
// Flag to indicate if the small input splits need to be combined to form a larger
// one in order to reduce the number of mappers. For merge join, both tables
// are NOT combinable for correctness.
@@ -287,10 +289,22 @@ public class TezOperator extends Operato
return globalSort;
}
+ public void setLimitAfterSort(boolean limitAfterSort) {
+ this.limitAfterSort = limitAfterSort;
+ }
+
public boolean isLimitAfterSort() {
return limitAfterSort;
}
+ public void setSkewedJoin(boolean skewedJoin) {
+ this.skewedJoin = skewedJoin;
+ }
+
+ public boolean isSkewedJoin() {
+ return skewedJoin;
+ }
+
protected void noCombineSmallSplits() {
combineSmallSplits = false;
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java?rev=1558674&r1=1558673&r2=1558674&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java Thu Jan 16 02:25:35 2014
@@ -36,9 +36,9 @@ public class WeightedRangePartitionerTez
@Override
public void init() {
Map<String, Object> quantileMap = null;
- if (PigProcessor.quantileMap!=null){
- // We've already collect quantileMap in PigProcessor.quantileMap
- quantileMap = PigProcessor.quantileMap;
+ if (PigProcessor.sampleMap != null) {
+ // We've collected sampleMap in PigProcessor
+ quantileMap = PigProcessor.sampleMap;
} else {
throw new RuntimeException(this.getClass().getSimpleName()
+ " used but no quantiles found");