You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2015/05/11 22:34:49 UTC

svn commit: r1678820 - in /pig/branches/branch-0.15: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/ src/org/apache/pig/backend/hado...

Author: daijy
Date: Mon May 11 20:34:48 2015
New Revision: 1678820

URL: http://svn.apache.org/r1678820
Log:
PIG-4377: Skewed outer join produce wrong result in some cases

Added:
    pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/udf/IsFirstReduceOfKeyTez.java
    pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezRuntimeUtil.java
    pig/branches/branch-0.15/src/org/apache/pig/impl/builtin/IsFirstReduceOfKey.java
Modified:
    pig/branches/branch-0.15/CHANGES.txt
    pig/branches/branch-0.15/src/org/apache/pig/PigConfiguration.java
    pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
    pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java
    pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
    pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/SkewedPartitionerTez.java
    pig/branches/branch-0.15/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
    pig/branches/branch-0.15/src/org/apache/pig/impl/util/CompilerUtils.java
    pig/branches/branch-0.15/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
    pig/branches/branch-0.15/test/e2e/pig/tests/nightly.conf

Modified: pig/branches/branch-0.15/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/CHANGES.txt?rev=1678820&r1=1678819&r2=1678820&view=diff
==============================================================================
--- pig/branches/branch-0.15/CHANGES.txt (original)
+++ pig/branches/branch-0.15/CHANGES.txt Mon May 11 20:34:48 2015
@@ -66,6 +66,8 @@ PIG-4333: Split BigData tests into multi
  
 BUG FIXES
 
+PIG-4377: Skewed outer join produce wrong result in some cases (daijy)
+
 PIG-4538: Pig script fail with CNF in follow up MR job (daijy)
 
 PIG-4537: Fix unit test failure introduced by TEZ-2392: TestCollectedGroup, TestLimitVariable,

Modified: pig/branches/branch-0.15/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/PigConfiguration.java?rev=1678820&r1=1678819&r2=1678820&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/PigConfiguration.java Mon May 11 20:34:48 2015
@@ -97,6 +97,13 @@ public class PigConfiguration {
     public static final String PIG_SKEWEDJOIN_REDUCE_MEMUSAGE = "pig.skewedjoin.reduce.memusage";
 
     /**
+     * Total memory (in bytes) assumed for a reducer when calculating the memory available
+     * for skewed join. By default, it is set to Runtime.getRuntime().maxMemory(). Override
+     * it only for debugging purposes.
+     */
+    public static final String PIG_SKEWEDJOIN_REDUCE_MEM = "pig.skewedjoin.reduce.mem";
+
+    /**
      * This key used to control the maximum size loaded into
      * the distributed cache when doing fragment-replicated join
      */

Modified: pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1678820&r1=1678819&r2=1678820&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Mon May 11 20:34:48 2015
@@ -93,6 +93,7 @@ import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.builtin.DefaultIndexableLoader;
 import org.apache.pig.impl.builtin.FindQuantiles;
 import org.apache.pig.impl.builtin.GetMemNumRows;
+import org.apache.pig.impl.builtin.IsFirstReduceOfKey;
 import org.apache.pig.impl.builtin.PartitionSkewedKeys;
 import org.apache.pig.impl.builtin.PoissonSampleLoader;
 import org.apache.pig.impl.builtin.RandomSampleLoader;
@@ -1951,7 +1952,7 @@ public class MRCompiler extends PhyPlanV
                 eps.add(ep);
                 if (!inner[i]) {
                     // Add an empty bag for outer join
-                    CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i));
+                    CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i), true, IsFirstReduceOfKey.class.getName());
                 }
                 flat.add(true);
             }

Modified: pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java?rev=1678820&r1=1678819&r2=1678820&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java Mon May 11 20:34:48 2015
@@ -47,7 +47,7 @@ import com.google.common.collect.Maps;
 public class SkewedPartitioner extends Partitioner<PigNullableWritable, Writable> implements Configurable {
     protected static final TupleFactory tf = TupleFactory.getInstance();
 
-    protected Map<Tuple, Pair<Integer, Integer>> reducerMap = Maps.newHashMap();
+    protected Map<Object, Pair<Integer, Integer>> reducerMap;
     protected Integer totalReducers = -1;
     protected boolean inited = false;
 

Modified: pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java?rev=1678820&r1=1678819&r2=1678820&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java Mon May 11 20:34:48 2015
@@ -63,10 +63,12 @@ public class POPoissonSample extends Phy
 
     private float heapPerc = 0f;
 
+    private long totalMemory = Runtime.getRuntime().maxMemory();
+
     // new Sample result
     private Result newSample = null;
 
-    public POPoissonSample(OperatorKey k, int rp, int sr, float hp) {
+    public POPoissonSample(OperatorKey k, int rp, int sr, float hp, long tm) {
         super(k, rp, null);
         numRowsSampled = 0;
         avgTupleMemSz = 0;
@@ -77,6 +79,9 @@ public class POPoissonSample extends Phy
         newSample = null;
         sampleRate = sr;
         heapPerc = hp;
+        if (tm != -1) {
+            totalMemory = tm;
+        }
     }
 
     @Override
@@ -115,7 +120,7 @@ public class POPoissonSample extends Phy
                 if (res.result == null) {
                     continue;
                 }
-                long availRedMem = (long) (Runtime.getRuntime().maxMemory() * heapPerc);
+                long availRedMem = (long) (totalMemory * heapPerc);
                 memToSkipPerSample = availRedMem/sampleRate;
                 updateSkipInterval((Tuple)res.result);
 

Modified: pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1678820&r1=1678819&r2=1678820&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java Mon May 11 20:34:48 2015
@@ -95,6 +95,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueInputTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.FindQuantilesTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.IsFirstReduceOfKeyTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.PartitionSkewedKeysTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.SkewedPartitionerTez;
@@ -1477,8 +1478,12 @@ public class TezCompiler extends PhyPlan
             if (pigProperties.containsKey(PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEMUSAGE)) {
                 heapPerc = Float.valueOf(pigProperties.getProperty(PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEMUSAGE));
             }
+            long totalMemory = -1;
+            if (pigProperties.containsKey(PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEM)) {
+                totalMemory = Long.valueOf(pigProperties.getProperty(PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEM));
+            }
             POPoissonSample poSample = new POPoissonSample(new OperatorKey(scope,nig.getNextNodeId(scope)),
-                    -1, sampleRate, heapPerc);
+                    -1, sampleRate, heapPerc, totalMemory);
 
             TezOperator prevOp = compiledInputs[0];
             prevOp.plan.addAsLeaf(lrTez);
@@ -1645,7 +1650,7 @@ public class TezCompiler extends PhyPlan
                 eps.add(ep);
                 if (!inner[i]) {
                     // Add an empty bag for outer join
-                    CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i));
+                    CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i), true, IsFirstReduceOfKeyTez.class.getName());
                 }
                 flat.add(true);
             }
@@ -1675,7 +1680,7 @@ public class TezCompiler extends PhyPlan
             TezCompilerUtil.connect(tezPlan, prevOp, sampleJobPair.first);
 
             POValueOutputTez sampleOut = (POValueOutputTez) sampleJobPair.first.plan.getLeaves().get(0);
-            for (int i = 0; i < 2; i++) {
+            for (int i = 0; i <= 2; i++) {
                 joinJobs[i].setSampleOperator(sampleJobPair.first);
 
                 // Configure broadcast edges for distribution map
@@ -1684,8 +1689,10 @@ public class TezCompiler extends PhyPlan
                 sampleOut.addOutputKey(joinJobs[i].getOperatorKey().toString());
 
                 // Configure skewed partitioner for join
-                edge = joinJobs[2].inEdges.get(joinJobs[i].getOperatorKey());
-                edge.partitionerClass = SkewedPartitionerTez.class;
+                if (i != 2) {
+                    edge = joinJobs[2].inEdges.get(joinJobs[i].getOperatorKey());
+                    edge.partitionerClass = SkewedPartitionerTez.class;
+                }
             }
 
             joinJobs[2].markSkewedJoin();

Added: pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/udf/IsFirstReduceOfKeyTez.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/udf/IsFirstReduceOfKeyTez.java?rev=1678820&view=auto
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/udf/IsFirstReduceOfKeyTez.java (added)
+++ pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/udf/IsFirstReduceOfKeyTez.java Mon May 11 20:34:48 2015
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.plan.udf;
+
+import org.apache.pig.backend.hadoop.executionengine.tez.util.TezRuntimeUtil;
+import org.apache.pig.impl.builtin.IsFirstReduceOfKey;
+
+public class IsFirstReduceOfKeyTez extends IsFirstReduceOfKey {
+    @Override
+    protected void init() {
+        reducerMap = TezRuntimeUtil.readReduceMapFromSample(tf);
+    }
+}

Modified: pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/SkewedPartitionerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/SkewedPartitionerTez.java?rev=1678820&r1=1678819&r2=1678820&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/SkewedPartitionerTez.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/SkewedPartitionerTez.java Mon May 11 20:34:48 2015
@@ -17,73 +17,14 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.tez.runtime;
 
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SkewedPartitioner;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.builtin.PartitionSkewedKeys;
-import org.apache.pig.impl.util.Pair;
+import org.apache.pig.backend.hadoop.executionengine.tez.util.TezRuntimeUtil;
 
 public class SkewedPartitionerTez extends SkewedPartitioner {
-    private static final Log LOG = LogFactory.getLog(SkewedPartitionerTez.class);
 
     @Override
     protected void init() {
-
-        Map<String, Object> distMap = null;
-        if (PigProcessor.sampleMap != null) {
-            // We've collected sampleMap in PigProcessor
-            distMap = PigProcessor.sampleMap;
-        } else {
-            LOG.info("Key distribution map is empty");
-            inited = true;
-            return;
-        }
-
-        long start = System.currentTimeMillis();
-
-        try {
-            // The distMap is structured as (key, min, max) where min, max
-            // being the index of the reducers
-            DataBag partitionList = (DataBag) distMap.get(PartitionSkewedKeys.PARTITION_LIST);
-            totalReducers = Integer.valueOf("" + distMap.get(PartitionSkewedKeys.TOTAL_REDUCERS));
-            Iterator<Tuple> it = partitionList.iterator();
-            while (it.hasNext()) {
-                Tuple idxTuple = it.next();
-                Integer maxIndex = (Integer) idxTuple.get(idxTuple.size() - 1);
-                Integer minIndex = (Integer) idxTuple.get(idxTuple.size() - 2);
-                // Used to replace the maxIndex with the number of reducers
-                if (maxIndex < minIndex) {
-                    maxIndex = totalReducers + maxIndex;
-                }
-
-                Tuple keyT;
-                // if the join is on more than 1 key
-                if (idxTuple.size() > 3) {
-                    // remove the last 2 fields of the tuple, i.e: minIndex and maxIndex and store
-                    // it in the reducer map
-                    Tuple keyTuple = tf.newTuple();
-                    for (int i=0; i < idxTuple.size() - 2; i++) {
-                        keyTuple.append(idxTuple.get(i));
-                    }
-                    keyT = keyTuple;
-                } else {
-                    keyT = tf.newTuple(1);
-                    keyT.set(0,idxTuple.get(0));
-                }
-                // number of reducers
-                Integer cnt = maxIndex - minIndex;
-                // 1 is added to account for the 0 index
-                reducerMap.put(keyT, new Pair<Integer, Integer>(minIndex, cnt));
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-        LOG.info("Initialized SkewedPartitionerTez. Time taken: " + (System.currentTimeMillis() - start));
+        reducerMap = TezRuntimeUtil.readReduceMapFromSample(tf);
         inited = true;
     }
 }

Added: pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezRuntimeUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezRuntimeUtil.java?rev=1678820&view=auto
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezRuntimeUtil.java (added)
+++ pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezRuntimeUtil.java Mon May 11 20:34:48 2015
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.util;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigProcessor;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.builtin.PartitionSkewedKeys;
+import org.apache.pig.impl.util.Pair;
+
+import com.google.common.collect.Maps;
+
+public class TezRuntimeUtil {
+    private static final Log LOG = LogFactory.getLog(TezRuntimeUtil.class);
+    public static Map<Object, Pair<Integer, Integer>> readReduceMapFromSample(TupleFactory tf) {
+        Map<Object, Pair<Integer, Integer>> reducerMap = Maps.newHashMap();
+        Map<String, Object> distMap = null;
+        if (PigProcessor.sampleMap != null) {
+            // We've collected sampleMap in PigProcessor
+            distMap = PigProcessor.sampleMap;
+        } else {
+            LOG.info("Key distribution map is empty");
+            return reducerMap;
+        }
+
+        long start = System.currentTimeMillis();
+
+        try {
+            reducerMap = Maps.newHashMap();
+            // The distMap is structured as (key, min, max) where min, max
+            // being the index of the reducers
+            DataBag partitionList = (DataBag) distMap.get(PartitionSkewedKeys.PARTITION_LIST);
+            int totalReducers = Integer.valueOf("" + distMap.get(PartitionSkewedKeys.TOTAL_REDUCERS));
+            Iterator<Tuple> it = partitionList.iterator();
+            while (it.hasNext()) {
+                Tuple idxTuple = it.next();
+                Integer maxIndex = (Integer) idxTuple.get(idxTuple.size() - 1);
+                Integer minIndex = (Integer) idxTuple.get(idxTuple.size() - 2);
+                // When maxIndex is below minIndex, offset it by totalReducers (the range presumably wraps around)
+                if (maxIndex < minIndex) {
+                    maxIndex = totalReducers + maxIndex;
+                }
+
+                Tuple keyT;
+                // if the join is on more than 1 key
+                if (idxTuple.size() > 3) {
+                    // remove the last 2 fields of the tuple, i.e: minIndex and maxIndex and store
+                    // it in the reducer map
+                    Tuple keyTuple = tf.newTuple();
+                    for (int i=0; i < idxTuple.size() - 2; i++) {
+                        keyTuple.append(idxTuple.get(i));
+                    }
+                    keyT = keyTuple;
+                } else {
+                    keyT = tf.newTuple(1);
+                    keyT.set(0,idxTuple.get(0));
+                }
+                // number of reducers
+                Integer cnt = maxIndex - minIndex;
+                // store (minIndex, cnt); note cnt is maxIndex - minIndex, with no +1 applied here
+                reducerMap.put(keyT, new Pair<Integer, Integer>(minIndex, cnt));
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        LOG.info("Initialized reducerMap. Time taken: " + (System.currentTimeMillis() - start));
+        return reducerMap;
+    }
+}

Added: pig/branches/branch-0.15/src/org/apache/pig/impl/builtin/IsFirstReduceOfKey.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/impl/builtin/IsFirstReduceOfKey.java?rev=1678820&view=auto
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/impl/builtin/IsFirstReduceOfKey.java (added)
+++ pig/branches/branch-0.15/src/org/apache/pig/impl/builtin/IsFirstReduceOfKey.java Mon May 11 20:34:48 2015
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.builtin;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.PigConstants;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.impl.util.UDFContext;
+
+public class IsFirstReduceOfKey extends EvalFunc<Boolean> {
+    protected static final TupleFactory tf = TupleFactory.getInstance();
+    protected Map<Object, Pair<Integer, Integer> > reducerMap = null;
+    /* Loads the key distribution file obtained from the sampler */
+    protected void init() {
+        Configuration conf = PigMapReduce.sJobConfInternal.get();
+        String keyDistFile = conf.get("pig.keyDistFile", "");
+        if (keyDistFile.length() == 0) {
+            throw new RuntimeException(this.getClass().getSimpleName() +
+                    " used but no key distribution found");
+        }
+
+        try {
+            Integer [] redCnt = new Integer[1]; 
+            reducerMap = MapRedUtil.loadPartitionFileFromLocalCache(
+                    keyDistFile, redCnt, DataType.TUPLE, conf);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public Boolean exec(Tuple input) throws IOException {
+        if (reducerMap == null) {
+            init();
+        }
+        Object key = input.get(0);
+        Tuple keyTuple = tf.newTuple(1);
+        keyTuple.set(0, key);
+        if (!reducerMap.containsKey(keyTuple)) {
+            return false;
+        }
+        int firstReducerOfKey = reducerMap.get(keyTuple).first;
+        int reduceIndex = UDFContext.getUDFContext().getJobConf().getInt(PigConstants.TASK_INDEX, 0);
+        if (firstReducerOfKey == reduceIndex) {
+            return true;
+        }
+        return false;
+    }
+
+}

Modified: pig/branches/branch-0.15/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java?rev=1678820&r1=1678819&r2=1678820&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java Mon May 11 20:34:48 2015
@@ -64,6 +64,9 @@ public class PoissonSampleLoader extends
 
     private int sampleRate = DEFAULT_SAMPLE_RATE;
 
+    // total memory in bytes
+    private long totalMemory;
+
     private double heapPerc = PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE;
 
     // new Sample tuple
@@ -89,7 +92,8 @@ public class PoissonSampleLoader extends
             if(t == null) {
                 return createNumRowTuple(null);
             }
-            long availRedMem = (long) (Runtime.getRuntime().maxMemory() * heapPerc);
+            long availRedMem = (long) ( totalMemory * heapPerc);
+            // availRedMem = 155084396;
             memToSkipPerSample = availRedMem/sampleRate;
             updateSkipInterval(t);
 
@@ -175,6 +179,10 @@ public class PoissonSampleLoader extends
         sampleRate = conf.getInt(PigConfiguration.PIG_POISSON_SAMPLER_SAMPLE_RATE, DEFAULT_SAMPLE_RATE);
         heapPerc = conf.getFloat(PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEMUSAGE,
                 PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE);
+        totalMemory = conf.getLong(PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEM, -1L);
+        if (totalMemory == -1) {
+            totalMemory = Runtime.getRuntime().maxMemory();
+        }
     }
 
 }

Modified: pig/branches/branch-0.15/src/org/apache/pig/impl/util/CompilerUtils.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/impl/util/CompilerUtils.java?rev=1678820&r1=1678819&r2=1678820&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/impl/util/CompilerUtils.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/impl/util/CompilerUtils.java Mon May 11 20:34:48 2015
@@ -21,10 +21,11 @@ package org.apache.pig.impl.util;
 import java.util.ArrayList;
 import java.util.List;
 
-
 import org.apache.pig.EvalFunc;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POAnd;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POBinCond;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
@@ -47,7 +48,8 @@ import org.apache.pig.impl.plan.PlanExce
  */
 public class CompilerUtils {
 
-    public static void addEmptyBagOuterJoin(PhysicalPlan fePlan, Schema inputSchema) throws PlanException {
+    public static void addEmptyBagOuterJoin(PhysicalPlan fePlan, Schema inputSchema,
+            boolean skewedJoin, String isFirstReduceOfKeyClassName) throws PlanException {
         // we currently have POProject[bag] as the only operator in the plan
         // If the bag is an empty bag, we should replace
         // it with a bag with one tuple with null fields so that when we flatten
@@ -62,6 +64,18 @@ public class CompilerUtils {
         //                        \      |    POProject[Bag]             
         //                         \     |    /
         //                          POBinCond
+        // Further, if it is a skewed join, only the first reducer of the key
+        // generates the tuple with null fields (see PIG-4377)
+        // 
+        // POProject[key]              POProject[Bag]
+        //         \                      /
+        //      IsFirstReduceOfKey  POUserFunc["IsEmpty()"]
+        //                   \        /
+        //                    \      /
+        //                       AND  Const[Bag](bag with null fields)   
+        //                        \      |    POProject[Bag]             
+        //                         \     |    /
+        //                          POBinCond
         POProject relationProject = (POProject) fePlan.getRoots().get(0);
         try {
             
@@ -76,6 +90,35 @@ public class CompilerUtils {
             isEmpty.setResultType(DataType.BOOLEAN);
             fePlan.add(isEmpty);
             fePlan.connect(relationProjectForIsEmpty, isEmpty);
+
+            ExpressionOperator cond;
+            if (skewedJoin) {
+                POProject projectForKey = new POProject(new OperatorKey(scope,NodeIdGenerator.getGenerator().getNextNodeId(scope)));
+                projectForKey.setColumn(0);
+                projectForKey.setOverloaded(false);
+                projectForKey.setResultType(inputSchema.getField(0).type);
+
+                POAnd and = new POAnd(new OperatorKey(scope, NodeIdGenerator.getGenerator().
+                        getNextNodeId(scope)));
+                FuncSpec isFirstReduceOfKeySpec = new FuncSpec(isFirstReduceOfKeyClassName);
+                Object f1 = PigContext.instantiateFuncFromSpec(isFirstReduceOfKeySpec);
+                POUserFunc isFirstReduceOfKey = new POUserFunc(new OperatorKey(scope, NodeIdGenerator.getGenerator().
+                            getNextNodeId(scope)), -1, null, isFirstReduceOfKeySpec, (EvalFunc) f1);
+
+                fePlan.add(projectForKey);
+                fePlan.add(isFirstReduceOfKey);
+                fePlan.add(and);
+
+                fePlan.connect(projectForKey, isFirstReduceOfKey);
+                fePlan.connect(isFirstReduceOfKey, and);
+                fePlan.connect(isEmpty, and);
+                and.setLhs(isFirstReduceOfKey);
+                and.setRhs(isEmpty);
+
+                cond = and;
+            } else {
+                cond = isEmpty;
+            }
             
             // lhs of bincond (const bag with null fields)
             ConstantExpression ce = new ConstantExpression(new OperatorKey(scope,
@@ -98,13 +141,13 @@ public class CompilerUtils {
             // let's set up the bincond now
             POBinCond bincond = new POBinCond(new OperatorKey(scope,
                     NodeIdGenerator.getGenerator().getNextNodeId(scope)));
-            bincond.setCond(isEmpty);
+            bincond.setCond(cond);
             bincond.setLhs(ce);
             bincond.setRhs(relationProject);
             bincond.setResultType(DataType.BAG);
             fePlan.add(bincond);
 
-            fePlan.connect(isEmpty, bincond);
+            fePlan.connect(cond, bincond);
             fePlan.connect(ce, bincond);
             fePlan.connect(relationProject, bincond);
 

Modified: pig/branches/branch-0.15/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=1678820&r1=1678819&r2=1678820&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Mon May 11 20:34:48 2015
@@ -1888,7 +1888,7 @@ public class LogToPhyTranslationVisitor
             throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
         }
 
-        CompilerUtils.addEmptyBagOuterJoin(fePlan, Util.translateSchema(inputSchema));
+        CompilerUtils.addEmptyBagOuterJoin(fePlan, Util.translateSchema(inputSchema), false, null);
 
     }
 

Modified: pig/branches/branch-0.15/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/test/e2e/pig/tests/nightly.conf?rev=1678820&r1=1678819&r2=1678820&view=diff
==============================================================================
--- pig/branches/branch-0.15/test/e2e/pig/tests/nightly.conf (original)
+++ pig/branches/branch-0.15/test/e2e/pig/tests/nightly.conf Mon May 11 20:34:48 2015
@@ -3068,6 +3068,20 @@ e = join a by name full outer, b by name
 store e into ':OUTPATH:';\,
 
                         },
+                # right outer join with fixed memory
+                        {
+                        'num' => 11,
+                        'java_params' => ['-Dpig.skewedjoin.reduce.maxtuple=100 -Dpig.skewedjoin.reduce.mem=516947966'],
+                        'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
+b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+e = join a by name right outer, b by name using 'skewed' parallel 8;
+store e into ':OUTPATH:';\,
+                        'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
+b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+e = join a by name right outer, b by name ;
+store e into ':OUTPATH:';\,
+
+                        },
                 ]
 
             },