You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2015/05/11 22:34:49 UTC
svn commit: r1678820 - in /pig/branches/branch-0.15: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/
src/org/apache/pig/backend/hado...
Author: daijy
Date: Mon May 11 20:34:48 2015
New Revision: 1678820
URL: http://svn.apache.org/r1678820
Log:
PIG-4377: Skewed outer join produce wrong result in some cases
Added:
pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/udf/IsFirstReduceOfKeyTez.java
pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezRuntimeUtil.java
pig/branches/branch-0.15/src/org/apache/pig/impl/builtin/IsFirstReduceOfKey.java
Modified:
pig/branches/branch-0.15/CHANGES.txt
pig/branches/branch-0.15/src/org/apache/pig/PigConfiguration.java
pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java
pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/SkewedPartitionerTez.java
pig/branches/branch-0.15/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
pig/branches/branch-0.15/src/org/apache/pig/impl/util/CompilerUtils.java
pig/branches/branch-0.15/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
pig/branches/branch-0.15/test/e2e/pig/tests/nightly.conf
Modified: pig/branches/branch-0.15/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/CHANGES.txt?rev=1678820&r1=1678819&r2=1678820&view=diff
==============================================================================
--- pig/branches/branch-0.15/CHANGES.txt (original)
+++ pig/branches/branch-0.15/CHANGES.txt Mon May 11 20:34:48 2015
@@ -66,6 +66,8 @@ PIG-4333: Split BigData tests into multi
BUG FIXES
+PIG-4377: Skewed outer join produce wrong result in some cases (daijy)
+
PIG-4538: Pig script fail with CNF in follow up MR job (daijy)
PIG-4537: Fix unit test failure introduced by TEZ-2392: TestCollectedGroup, TestLimitVariable,
Modified: pig/branches/branch-0.15/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/PigConfiguration.java?rev=1678820&r1=1678819&r2=1678820&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/PigConfiguration.java Mon May 11 20:34:48 2015
@@ -97,6 +97,13 @@ public class PigConfiguration {
public static final String PIG_SKEWEDJOIN_REDUCE_MEMUSAGE = "pig.skewedjoin.reduce.memusage";
/**
+ * Total memory (in bytes) assumed to be available in a reducer when calculating the memory budget for skewed join.
+ * By default, it is set to Runtime.getRuntime().maxMemory(). Override it only
+ * for debugging purposes.
+ */
+ public static final String PIG_SKEWEDJOIN_REDUCE_MEM = "pig.skewedjoin.reduce.mem";
+
+ /**
* This key used to control the maximum size loaded into
* the distributed cache when doing fragment-replicated join
*/
Modified: pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1678820&r1=1678819&r2=1678820&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Mon May 11 20:34:48 2015
@@ -93,6 +93,7 @@ import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.builtin.DefaultIndexableLoader;
import org.apache.pig.impl.builtin.FindQuantiles;
import org.apache.pig.impl.builtin.GetMemNumRows;
+import org.apache.pig.impl.builtin.IsFirstReduceOfKey;
import org.apache.pig.impl.builtin.PartitionSkewedKeys;
import org.apache.pig.impl.builtin.PoissonSampleLoader;
import org.apache.pig.impl.builtin.RandomSampleLoader;
@@ -1951,7 +1952,7 @@ public class MRCompiler extends PhyPlanV
eps.add(ep);
if (!inner[i]) {
// Add an empty bag for outer join
- CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i));
+ CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i), true, IsFirstReduceOfKey.class.getName());
}
flat.add(true);
}
Modified: pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java?rev=1678820&r1=1678819&r2=1678820&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java Mon May 11 20:34:48 2015
@@ -47,7 +47,7 @@ import com.google.common.collect.Maps;
public class SkewedPartitioner extends Partitioner<PigNullableWritable, Writable> implements Configurable {
protected static final TupleFactory tf = TupleFactory.getInstance();
- protected Map<Tuple, Pair<Integer, Integer>> reducerMap = Maps.newHashMap();
+ protected Map<Object, Pair<Integer, Integer>> reducerMap;
protected Integer totalReducers = -1;
protected boolean inited = false;
Modified: pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java?rev=1678820&r1=1678819&r2=1678820&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java Mon May 11 20:34:48 2015
@@ -63,10 +63,12 @@ public class POPoissonSample extends Phy
private float heapPerc = 0f;
+ private long totalMemory = Runtime.getRuntime().maxMemory();
+
// new Sample result
private Result newSample = null;
- public POPoissonSample(OperatorKey k, int rp, int sr, float hp) {
+ public POPoissonSample(OperatorKey k, int rp, int sr, float hp, long tm) {
super(k, rp, null);
numRowsSampled = 0;
avgTupleMemSz = 0;
@@ -77,6 +79,9 @@ public class POPoissonSample extends Phy
newSample = null;
sampleRate = sr;
heapPerc = hp;
+ if (tm != -1) {
+ totalMemory = tm;
+ }
}
@Override
@@ -115,7 +120,7 @@ public class POPoissonSample extends Phy
if (res.result == null) {
continue;
}
- long availRedMem = (long) (Runtime.getRuntime().maxMemory() * heapPerc);
+ long availRedMem = (long) (totalMemory * heapPerc);
memToSkipPerSample = availRedMem/sampleRate;
updateSkipInterval((Tuple)res.result);
Modified: pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1678820&r1=1678819&r2=1678820&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java Mon May 11 20:34:48 2015
@@ -95,6 +95,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueInputTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.FindQuantilesTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.IsFirstReduceOfKeyTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.PartitionSkewedKeysTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.SkewedPartitionerTez;
@@ -1477,8 +1478,12 @@ public class TezCompiler extends PhyPlan
if (pigProperties.containsKey(PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEMUSAGE)) {
heapPerc = Float.valueOf(pigProperties.getProperty(PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEMUSAGE));
}
+ long totalMemory = -1;
+ if (pigProperties.containsKey(PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEM)) {
+ totalMemory = Long.valueOf(pigProperties.getProperty(PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEM));
+ }
POPoissonSample poSample = new POPoissonSample(new OperatorKey(scope,nig.getNextNodeId(scope)),
- -1, sampleRate, heapPerc);
+ -1, sampleRate, heapPerc, totalMemory);
TezOperator prevOp = compiledInputs[0];
prevOp.plan.addAsLeaf(lrTez);
@@ -1645,7 +1650,7 @@ public class TezCompiler extends PhyPlan
eps.add(ep);
if (!inner[i]) {
// Add an empty bag for outer join
- CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i));
+ CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i), true, IsFirstReduceOfKeyTez.class.getName());
}
flat.add(true);
}
@@ -1675,7 +1680,7 @@ public class TezCompiler extends PhyPlan
TezCompilerUtil.connect(tezPlan, prevOp, sampleJobPair.first);
POValueOutputTez sampleOut = (POValueOutputTez) sampleJobPair.first.plan.getLeaves().get(0);
- for (int i = 0; i < 2; i++) {
+ for (int i = 0; i <= 2; i++) {
joinJobs[i].setSampleOperator(sampleJobPair.first);
// Configure broadcast edges for distribution map
@@ -1684,8 +1689,10 @@ public class TezCompiler extends PhyPlan
sampleOut.addOutputKey(joinJobs[i].getOperatorKey().toString());
// Configure skewed partitioner for join
- edge = joinJobs[2].inEdges.get(joinJobs[i].getOperatorKey());
- edge.partitionerClass = SkewedPartitionerTez.class;
+ if (i != 2) {
+ edge = joinJobs[2].inEdges.get(joinJobs[i].getOperatorKey());
+ edge.partitionerClass = SkewedPartitionerTez.class;
+ }
}
joinJobs[2].markSkewedJoin();
Added: pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/udf/IsFirstReduceOfKeyTez.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/udf/IsFirstReduceOfKeyTez.java?rev=1678820&view=auto
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/udf/IsFirstReduceOfKeyTez.java (added)
+++ pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/udf/IsFirstReduceOfKeyTez.java Mon May 11 20:34:48 2015
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.plan.udf;
+
+import org.apache.pig.backend.hadoop.executionengine.tez.util.TezRuntimeUtil;
+import org.apache.pig.impl.builtin.IsFirstReduceOfKey;
+/**
+ * Tez variant of IsFirstReduceOfKey that overrides only init(): the reducer
+ * map is obtained from TezRuntimeUtil.readReduceMapFromSample(tf), using the
+ * TupleFactory inherited from the parent class, instead of running the
+ * parent's init(). Everything else, including exec(), is inherited unchanged.
+ */
+public class IsFirstReduceOfKeyTez extends IsFirstReduceOfKey {
+ @Override
+ protected void init() {
+ reducerMap = TezRuntimeUtil.readReduceMapFromSample(tf);
+ }
+}
Modified: pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/SkewedPartitionerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/SkewedPartitionerTez.java?rev=1678820&r1=1678819&r2=1678820&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/SkewedPartitionerTez.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/SkewedPartitionerTez.java Mon May 11 20:34:48 2015
@@ -17,73 +17,14 @@
*/
package org.apache.pig.backend.hadoop.executionengine.tez.runtime;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SkewedPartitioner;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.builtin.PartitionSkewedKeys;
-import org.apache.pig.impl.util.Pair;
+import org.apache.pig.backend.hadoop.executionengine.tez.util.TezRuntimeUtil;
public class SkewedPartitionerTez extends SkewedPartitioner {
- private static final Log LOG = LogFactory.getLog(SkewedPartitionerTez.class);
@Override
protected void init() {
-
- Map<String, Object> distMap = null;
- if (PigProcessor.sampleMap != null) {
- // We've collected sampleMap in PigProcessor
- distMap = PigProcessor.sampleMap;
- } else {
- LOG.info("Key distribution map is empty");
- inited = true;
- return;
- }
-
- long start = System.currentTimeMillis();
-
- try {
- // The distMap is structured as (key, min, max) where min, max
- // being the index of the reducers
- DataBag partitionList = (DataBag) distMap.get(PartitionSkewedKeys.PARTITION_LIST);
- totalReducers = Integer.valueOf("" + distMap.get(PartitionSkewedKeys.TOTAL_REDUCERS));
- Iterator<Tuple> it = partitionList.iterator();
- while (it.hasNext()) {
- Tuple idxTuple = it.next();
- Integer maxIndex = (Integer) idxTuple.get(idxTuple.size() - 1);
- Integer minIndex = (Integer) idxTuple.get(idxTuple.size() - 2);
- // Used to replace the maxIndex with the number of reducers
- if (maxIndex < minIndex) {
- maxIndex = totalReducers + maxIndex;
- }
-
- Tuple keyT;
- // if the join is on more than 1 key
- if (idxTuple.size() > 3) {
- // remove the last 2 fields of the tuple, i.e: minIndex and maxIndex and store
- // it in the reducer map
- Tuple keyTuple = tf.newTuple();
- for (int i=0; i < idxTuple.size() - 2; i++) {
- keyTuple.append(idxTuple.get(i));
- }
- keyT = keyTuple;
- } else {
- keyT = tf.newTuple(1);
- keyT.set(0,idxTuple.get(0));
- }
- // number of reducers
- Integer cnt = maxIndex - minIndex;
- // 1 is added to account for the 0 index
- reducerMap.put(keyT, new Pair<Integer, Integer>(minIndex, cnt));
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- LOG.info("Initialized SkewedPartitionerTez. Time taken: " + (System.currentTimeMillis() - start));
+ reducerMap = TezRuntimeUtil.readReduceMapFromSample(tf);
inited = true;
}
}
Added: pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezRuntimeUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezRuntimeUtil.java?rev=1678820&view=auto
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezRuntimeUtil.java (added)
+++ pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezRuntimeUtil.java Mon May 11 20:34:48 2015
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.util;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigProcessor;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.builtin.PartitionSkewedKeys;
+import org.apache.pig.impl.util.Pair;
+
+import com.google.common.collect.Maps;
+/**
+ * Runtime helper for the Tez backend exposing one static method,
+ * readReduceMapFromSample(TupleFactory).
+ *
+ * The method converts PigProcessor.sampleMap into a map from join key (always
+ * stored as a Tuple) to Pair(minIndex, cnt), where minIndex is the reducer
+ * index read from the second-to-last field of each partition-list tuple and
+ * cnt is maxIndex - minIndex after maxIndex has been adjusted by the total
+ * number of reducers when it is smaller than minIndex.
+ *
+ * It returns an empty map (and logs a message) when PigProcessor.sampleMap is
+ * null. Any exception raised while reading the sample is rethrown wrapped in
+ * a RuntimeException. The tf argument is used only to create the key tuples.
+ */
+public class TezRuntimeUtil {
+ private static final Log LOG = LogFactory.getLog(TezRuntimeUtil.class);
+ public static Map<Object, Pair<Integer, Integer>> readReduceMapFromSample(TupleFactory tf) {
+ Map<Object, Pair<Integer, Integer>> reducerMap = Maps.newHashMap();
+ Map<String, Object> distMap = null;
+ if (PigProcessor.sampleMap != null) {
+ // The sample map has already been collected by PigProcessor; use it as the distribution map
+ distMap = PigProcessor.sampleMap;
+ } else {
+ LOG.info("Key distribution map is empty");
+ return reducerMap;
+ }
+
+ long start = System.currentTimeMillis();
+
+ try {
+ reducerMap = Maps.newHashMap(); // NOTE(review): redundant, reducerMap already refers to a fresh empty map
+ // Each tuple in the partition list is laid out as (key fields..., min, max),
+ // where min and max are reducer indexes
+ DataBag partitionList = (DataBag) distMap.get(PartitionSkewedKeys.PARTITION_LIST);
+ int totalReducers = Integer.valueOf("" + distMap.get(PartitionSkewedKeys.TOTAL_REDUCERS));
+ Iterator<Tuple> it = partitionList.iterator();
+ while (it.hasNext()) {
+ Tuple idxTuple = it.next();
+ Integer maxIndex = (Integer) idxTuple.get(idxTuple.size() - 1);
+ Integer minIndex = (Integer) idxTuple.get(idxTuple.size() - 2);
+ // maxIndex below minIndex: shift it by totalReducers (presumably the range wraps past the last reducer - confirm)
+ if (maxIndex < minIndex) {
+ maxIndex = totalReducers + maxIndex;
+ }
+
+ Tuple keyT;
+ // more than one field precedes min/max, i.e. the join is on more than one key
+ if (idxTuple.size() > 3) {
+ // copy every field except the last two (minIndex and maxIndex) into a new
+ // tuple and use that tuple as the key in the reducer map
+ Tuple keyTuple = tf.newTuple();
+ for (int i=0; i < idxTuple.size() - 2; i++) {
+ keyTuple.append(idxTuple.get(i));
+ }
+ keyT = keyTuple;
+ } else {
+ keyT = tf.newTuple(1);
+ keyT.set(0,idxTuple.get(0));
+ }
+ // distance between the last and the first reducer index recorded for this key
+ Integer cnt = maxIndex - minIndex;
+ // NOTE(review): no +1 is applied here; the pair stores (minIndex, maxIndex - minIndex) - confirm consumers expect this form
+ reducerMap.put(keyT, new Pair<Integer, Integer>(minIndex, cnt));
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ LOG.info("Initialized reducerMap. Time taken: " + (System.currentTimeMillis() - start));
+ return reducerMap;
+ }
+}
Added: pig/branches/branch-0.15/src/org/apache/pig/impl/builtin/IsFirstReduceOfKey.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/impl/builtin/IsFirstReduceOfKey.java?rev=1678820&view=auto
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/impl/builtin/IsFirstReduceOfKey.java (added)
+++ pig/branches/branch-0.15/src/org/apache/pig/impl/builtin/IsFirstReduceOfKey.java Mon May 11 20:34:48 2015
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.builtin;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.PigConstants;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.impl.util.UDFContext;
+/**
+ * Boolean UDF that reports whether the current task is the first reducer
+ * recorded for a key in the skewed-join key distribution.
+ *
+ * reducerMap maps a key tuple to a Pair whose first element is compared with
+ * the task index in exec(). The map is loaded lazily by init() on the first
+ * call to exec(); subclasses may override init() to populate it differently.
+ */
+public class IsFirstReduceOfKey extends EvalFunc<Boolean> {
+ protected static final TupleFactory tf = TupleFactory.getInstance();
+ protected Map<Object, Pair<Integer, Integer> > reducerMap = null;
+ /*
+ * Loads the key distribution file obtained from the sampler into reducerMap.
+ * The file name is read from the "pig.keyDistFile" property of the job
+ * configuration held in PigMapReduce.sJobConfInternal; a RuntimeException is
+ * thrown when that property is missing or empty. Any failure while loading
+ * the file is rethrown wrapped in a RuntimeException.
+ * redCnt is handed to the loader but is not read afterwards.
+ */
+ protected void init() {
+ Configuration conf = PigMapReduce.sJobConfInternal.get();
+ String keyDistFile = conf.get("pig.keyDistFile", "");
+ if (keyDistFile.length() == 0) {
+ throw new RuntimeException(this.getClass().getSimpleName() +
+ " used but no key distribution found");
+ }
+
+ try {
+ Integer [] redCnt = new Integer[1];
+ reducerMap = MapRedUtil.loadPartitionFileFromLocalCache(
+ keyDistFile, redCnt, DataType.TUPLE, conf);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ /**
+ * Returns true only when the key in field 0 of the input is present in
+ * reducerMap and the first reducer index stored for it equals this task's
+ * index, read from the job configuration under PigConstants.TASK_INDEX
+ * (default 0). Calls init() first if reducerMap has not been loaded yet.
+ *
+ * NOTE(review): a key that is absent from reducerMap yields false. Since
+ * CompilerUtils.addEmptyBagOuterJoin ANDs this result with IsEmpty, such keys
+ * presumably never get the bag with null fields - confirm this is intended.
+ * NOTE(review): the key is always wrapped in a one-field tuple before the
+ * lookup; confirm this matches how keys of multi-key joins are stored.
+ *
+ * @param input tuple whose field 0 holds the key to test
+ * @return whether this task is the first reducer of the key
+ * @throws IOException declared by the EvalFunc contract; input.get(0) is the only call here that may raise it (presumably - confirm)
+ */
+ @Override
+ public Boolean exec(Tuple input) throws IOException {
+ if (reducerMap == null) {
+ init();
+ }
+ Object key = input.get(0);
+ Tuple keyTuple = tf.newTuple(1);
+ keyTuple.set(0, key);
+ if (!reducerMap.containsKey(keyTuple)) {
+ return false;
+ }
+ int firstReducerOfKey = reducerMap.get(keyTuple).first;
+ int reduceIndex = UDFContext.getUDFContext().getJobConf().getInt(PigConstants.TASK_INDEX, 0);
+ if (firstReducerOfKey == reduceIndex) {
+ return true;
+ }
+ return false;
+ }
+
+}
Modified: pig/branches/branch-0.15/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java?rev=1678820&r1=1678819&r2=1678820&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java Mon May 11 20:34:48 2015
@@ -64,6 +64,9 @@ public class PoissonSampleLoader extends
private int sampleRate = DEFAULT_SAMPLE_RATE;
+ // total memory in bytes
+ private long totalMemory;
+
private double heapPerc = PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE;
// new Sample tuple
@@ -89,7 +92,8 @@ public class PoissonSampleLoader extends
if(t == null) {
return createNumRowTuple(null);
}
- long availRedMem = (long) (Runtime.getRuntime().maxMemory() * heapPerc);
+ long availRedMem = (long) ( totalMemory * heapPerc);
+ // availRedMem = 155084396;
memToSkipPerSample = availRedMem/sampleRate;
updateSkipInterval(t);
@@ -175,6 +179,10 @@ public class PoissonSampleLoader extends
sampleRate = conf.getInt(PigConfiguration.PIG_POISSON_SAMPLER_SAMPLE_RATE, DEFAULT_SAMPLE_RATE);
heapPerc = conf.getFloat(PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEMUSAGE,
PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE);
+ totalMemory = conf.getLong(PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEM, -1L);
+ if (totalMemory == -1) {
+ totalMemory = Runtime.getRuntime().maxMemory();
+ }
}
}
Modified: pig/branches/branch-0.15/src/org/apache/pig/impl/util/CompilerUtils.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/impl/util/CompilerUtils.java?rev=1678820&r1=1678819&r2=1678820&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/impl/util/CompilerUtils.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/impl/util/CompilerUtils.java Mon May 11 20:34:48 2015
@@ -21,10 +21,11 @@ package org.apache.pig.impl.util;
import java.util.ArrayList;
import java.util.List;
-
import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POAnd;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POBinCond;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
@@ -47,7 +48,8 @@ import org.apache.pig.impl.plan.PlanExce
*/
public class CompilerUtils {
- public static void addEmptyBagOuterJoin(PhysicalPlan fePlan, Schema inputSchema) throws PlanException {
+ public static void addEmptyBagOuterJoin(PhysicalPlan fePlan, Schema inputSchema,
+ boolean skewedJoin, String isFirstReduceOfKeyClassName) throws PlanException {
// we currently have POProject[bag] as the only operator in the plan
// If the bag is an empty bag, we should replace
// it with a bag with one tuple with null fields so that when we flatten
@@ -62,6 +64,18 @@ public class CompilerUtils {
// \ | POProject[Bag]
// \ | /
// POBinCond
+ // Furthermore, for a skewed join, only the first reducer of the key
+ // generates the tuple with null fields (see PIG-4377)
+ //
+ // POProject[key] POProject[Bag]
+ // \ /
+ // IsFirstReduceOfKey POUserFunc["IsEmpty()"]
+ // \ /
+ // \ /
+ // AND Const[Bag](bag with null fields)
+ // \ | POProject[Bag]
+ // \ | /
+ // POBinCond
POProject relationProject = (POProject) fePlan.getRoots().get(0);
try {
@@ -76,6 +90,35 @@ public class CompilerUtils {
isEmpty.setResultType(DataType.BOOLEAN);
fePlan.add(isEmpty);
fePlan.connect(relationProjectForIsEmpty, isEmpty);
+
+ ExpressionOperator cond;
+ if (skewedJoin) {
+ POProject projectForKey = new POProject(new OperatorKey(scope,NodeIdGenerator.getGenerator().getNextNodeId(scope)));
+ projectForKey.setColumn(0);
+ projectForKey.setOverloaded(false);
+ projectForKey.setResultType(inputSchema.getField(0).type);
+
+ POAnd and = new POAnd(new OperatorKey(scope, NodeIdGenerator.getGenerator().
+ getNextNodeId(scope)));
+ FuncSpec isFirstReduceOfKeySpec = new FuncSpec(isFirstReduceOfKeyClassName);
+ Object f1 = PigContext.instantiateFuncFromSpec(isFirstReduceOfKeySpec);
+ POUserFunc isFirstReduceOfKey = new POUserFunc(new OperatorKey(scope, NodeIdGenerator.getGenerator().
+ getNextNodeId(scope)), -1, null, isFirstReduceOfKeySpec, (EvalFunc) f1);
+
+ fePlan.add(projectForKey);
+ fePlan.add(isFirstReduceOfKey);
+ fePlan.add(and);
+
+ fePlan.connect(projectForKey, isFirstReduceOfKey);
+ fePlan.connect(isFirstReduceOfKey, and);
+ fePlan.connect(isEmpty, and);
+ and.setLhs(isFirstReduceOfKey);
+ and.setRhs(isEmpty);
+
+ cond = and;
+ } else {
+ cond = isEmpty;
+ }
// lhs of bincond (const bag with null fields)
ConstantExpression ce = new ConstantExpression(new OperatorKey(scope,
@@ -98,13 +141,13 @@ public class CompilerUtils {
// let's set up the bincond now
POBinCond bincond = new POBinCond(new OperatorKey(scope,
NodeIdGenerator.getGenerator().getNextNodeId(scope)));
- bincond.setCond(isEmpty);
+ bincond.setCond(cond);
bincond.setLhs(ce);
bincond.setRhs(relationProject);
bincond.setResultType(DataType.BAG);
fePlan.add(bincond);
- fePlan.connect(isEmpty, bincond);
+ fePlan.connect(cond, bincond);
fePlan.connect(ce, bincond);
fePlan.connect(relationProject, bincond);
Modified: pig/branches/branch-0.15/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=1678820&r1=1678819&r2=1678820&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Mon May 11 20:34:48 2015
@@ -1888,7 +1888,7 @@ public class LogToPhyTranslationVisitor
throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
}
- CompilerUtils.addEmptyBagOuterJoin(fePlan, Util.translateSchema(inputSchema));
+ CompilerUtils.addEmptyBagOuterJoin(fePlan, Util.translateSchema(inputSchema), false, null);
}
Modified: pig/branches/branch-0.15/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/test/e2e/pig/tests/nightly.conf?rev=1678820&r1=1678819&r2=1678820&view=diff
==============================================================================
--- pig/branches/branch-0.15/test/e2e/pig/tests/nightly.conf (original)
+++ pig/branches/branch-0.15/test/e2e/pig/tests/nightly.conf Mon May 11 20:34:48 2015
@@ -3068,6 +3068,20 @@ e = join a by name full outer, b by name
store e into ':OUTPATH:';\,
},
+ # right outer join with fixed memory
+ {
+ 'num' => 11,
+ 'java_params' => ['-Dpig.skewedjoin.reduce.maxtuple=100 -Dpig.skewedjoin.reduce.mem=516947966'],
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
+b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+e = join a by name right outer, b by name using 'skewed' parallel 8;
+store e into ':OUTPATH:';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
+b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+e = join a by name right outer, b by name ;
+store e into ':OUTPATH:';\,
+
+ },
]
},