You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2016/01/11 07:23:33 UTC

svn commit: r1723978 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/tez/ src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ src/org/apache/pig/data/ test/e2e/pig/tests/

Author: rohini
Date: Mon Jan 11 06:23:33 2016
New Revision: 1723978

URL: http://svn.apache.org/viewvc?rev=1723978&view=rev
Log:
PIG-4773: [Pig on Tez] Secondary key descending sort in nested foreach after union does ascending instead (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSecondaryKeyComparator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java
    pig/trunk/src/org/apache/pig/data/BinInterSedes.java
    pig/trunk/test/e2e/pig/tests/nightly.conf

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1723978&r1=1723977&r2=1723978&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Jan 11 06:23:33 2016
@@ -79,6 +79,8 @@ PIG-4639: Add better parser for Apache H
 
 BUG FIXES
 
+PIG-4773: [Pig on Tez] Secondary key descending sort in nested foreach after union does ascending instead (rohini)
+
 PIG-4774: Fix NPE in SUM,AVG,MIN,MAX UDFs for null bag input (rohini)
 
 PIG-4757: Job stats on successfully read/output records wrong with multiple inputs/outputs (rohini)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSecondaryKeyComparator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSecondaryKeyComparator.java?rev=1723978&r1=1723977&r2=1723978&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSecondaryKeyComparator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSecondaryKeyComparator.java Mon Jan 11 06:23:33 2016
@@ -17,19 +17,18 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.data.TupleRawComparator;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
 
 public class PigSecondaryKeyComparator extends WritableComparator implements Configurable {
-    private final Log mLog = LogFactory.getLog(getClass());
+
     private TupleRawComparator mComparator=null;
 
     @Override
@@ -54,6 +53,7 @@ public class PigSecondaryKeyComparator e
         return null;
     }
 
+    @Override
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
 
         // the last byte of a NullableTuple is its Index
@@ -84,6 +84,47 @@ public class PigSecondaryKeyComparator e
                 rc = -1;
             else
                 rc = 1;
+        }
+        return rc;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public int compare(WritableComparable a, WritableComparable b)
+    {
+        PigNullableWritable wa = (PigNullableWritable)a;
+        PigNullableWritable wb = (PigNullableWritable)b;
+
+        if ((wa.getIndex() & PigNullableWritable.mqFlag) != 0) { // this is a multi-query index
+            if ((wa.getIndex() & PigNullableWritable.idxSpace) < (wb.getIndex() & PigNullableWritable.idxSpace))
+                return -1;
+            else if ((wa.getIndex() & PigNullableWritable.idxSpace) > (wb.getIndex() & PigNullableWritable.idxSpace))
+                return 1;
+            // If equal, we fall through
+        }
+
+        int rc = 0;
+        // If either are null, handle differently.
+        if (!wa.isNull() && !wb.isNull()) {
+            rc = mComparator.compare((Tuple) wa.getValueAsPigType(), (Tuple) wb.getValueAsPigType());
+            // handle PIG-927
+            // if tuples are equal but any field inside tuple is null, then we do not merge keys
+            if (rc == 0 && mComparator.hasComparedTupleNull())
+                rc = (wa.getIndex() & PigNullableWritable.idxSpace) - (wb.getIndex() & PigNullableWritable.idxSpace);
+        } else {
+            // Two nulls are equal if indices are same
+            if (wa.isNull() && wb.isNull()) {
+                if ((wa.getIndex() & PigNullableWritable.idxSpace) < (wb.getIndex() & PigNullableWritable.idxSpace))
+                    rc = -1;
+                else if ((wa.getIndex() & PigNullableWritable.idxSpace) > (wb.getIndex() & PigNullableWritable.idxSpace))
+                    rc = 1;
+                else
+                    rc = 0;
+            }
+            else if (wa.isNull())
+                rc = -1;
+            else
+                rc = 1;
         }
         return rc;
     }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1723978&r1=1723977&r2=1723978&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Mon Jan 11 06:23:33 2016
@@ -584,6 +584,7 @@ public class TezDagBuilder extends TezOp
             // the inputs that are attached to the POShuffleTezLoad in the
             // backend.
             Map<Integer, String> localRearrangeMap = new TreeMap<Integer, String>();
+            TezOperator from = null;
             for (TezOperator pred : mPlan.getPredecessors(tezOp)) {
                 if (tezOp.getSampleOperator() != null && tezOp.getSampleOperator() == pred) {
                     // skip sample vertex input
@@ -603,6 +604,7 @@ public class TezDagBuilder extends TezOp
                             if (isVertexGroup) {
                                 isMergedInput = true;
                             }
+                            from = pred;
                         }
                     }
                 }
@@ -619,6 +621,16 @@ public class TezDagBuilder extends TezOp
 
             //POShuffleTezLoad accesses the comparator setting
             selectKeyComparator(keyType, payloadConf, tezOp, isMergedInput);
+
+            if (tezOp.isUseSecondaryKey()) {
+                TezEdgeDescriptor edge = tezOp.inEdges.get(from.getOperatorKey());
+                // Currently only PigSecondaryKeyGroupingComparator is used in POShuffleTezLoad.
+                // When PIG-4685: SecondaryKeyOptimizerTez does not optimize cogroup is fixed
+                // in future, PigSecondaryKeyComparator will have to be used and that will require this.
+                payloadConf.set("pig.secondarySortOrder", ObjectSerializer
+                        .serialize(edge.getSecondarySortOrder()));
+            }
+
         }
 
         // set parent plan in all operators. currently the parent plan is really

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java?rev=1723978&r1=1723977&r2=1723978&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java Mon Jan 11 06:23:33 2016
@@ -56,12 +56,13 @@ public class SecondaryKeyOptimizerTez ex
             return;
         }
 
+        // TODO: PIG-4685: SecondaryKeyOptimizerTez does not optimize cogroup
         // Current code does not handle more than one predecessors
         // even though it is possible. The problem is when we
         // process the first predecessor, we remove the foreach inner
         // operators from the reduce side, and the second predecessor
         // cannot see them
-        if (predecessors.size()>1) {
+        if (predecessors.size() > 1) {
             return;
         }
         TezOperator from = predecessors.get(0);

Modified: pig/trunk/src/org/apache/pig/data/BinInterSedes.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/BinInterSedes.java?rev=1723978&r1=1723977&r2=1723978&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/BinInterSedes.java (original)
+++ pig/trunk/src/org/apache/pig/data/BinInterSedes.java Mon Jan 11 06:23:33 2016
@@ -1072,7 +1072,7 @@ public class BinInterSedes implements In
                         // we have a compound tuple key (main_key, secondary_key). Each key has its own sort order, so
                         // we have to deal with them separately. We delegate it to the first invocation of
                         // compareDatum()
-                        assert (tsz1 == 3); // main_key, secondary_key, value
+                        assert (tsz1 == 2); // main_key, secondary_key
                         result = compareDatum(t1.get(0), t2.get(0), mAsc);
                         if (result == 0)
                             result = compareDatum(t1.get(1), t2.get(1), mSecondaryAsc);

Modified: pig/trunk/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/nightly.conf?rev=1723978&r1=1723977&r2=1723978&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/nightly.conf (original)
+++ pig/trunk/test/e2e/pig/tests/nightly.conf Mon Jan 11 06:23:33 2016
@@ -1424,8 +1424,11 @@ store e into ':OUTPATH:';\,
 b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa);
 c = union a, b;
 d = group c by name;
-e = foreach d { f = order c by $1,$2; generate group, f; };
-store e into ':OUTPATH:';\,
+d1 = group c by name; -- Two separate groupbys to ensure secondary key partitioner
+e = foreach d { f = order c by age, gpa ; g = limit f 1; generate g; };
+h = foreach d1 { i = order c by age asc, gpa desc; j = limit i 1; generate j; };
+store e into ':OUTPATH:.1';
+store h into ':OUTPATH:.2';\,
             },
             {
             # Union + Orderby