You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2016/07/04 07:02:07 UTC

svn commit: r1751220 - in /pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark: MapReducePartitionerWrapper.java converter/IndexedKey.java

Author: praveen
Date: Mon Jul  4 07:02:07 2016
New Revision: 1751220

URL: http://svn.apache.org/viewvc?rev=1751220&view=rev
Log:
PIG-4936: Fix NPE exception in TestCustomPartitioner#testCustomPartitionerParseJoins (Liyun via Praveen)

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/MapReducePartitionerWrapper.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/MapReducePartitionerWrapper.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/MapReducePartitionerWrapper.java?rev=1751220&r1=1751219&r2=1751220&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/MapReducePartitionerWrapper.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/MapReducePartitionerWrapper.java Mon Jul  4 07:02:07 2016
@@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.IndexedKey;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.spark.Partitioner;
@@ -75,10 +76,17 @@ public class MapReducePartitionerWrapper
 
             PigNullableWritable writeableKey = new PigNullableWritable() {
                 public Object getValueAsPigType() {
-                    return key;
+                    if (key instanceof IndexedKey) {
+                        IndexedKey indexedKey = (IndexedKey) key;
+                        this.setIndex(indexedKey.getIndex());
+                        return indexedKey.getKey();
+                    } else {
+                        return key;
+                    }
                 }
             };
 
+
             // Lazy initialization
             // Synchronized because multiple (map) tasks in the same Spark Executor
             // may call getPartition, attempting to initialize at the same time.

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java?rev=1751220&r1=1751219&r2=1751220&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java Mon Jul  4 07:02:07 2016
@@ -37,6 +37,10 @@ public class IndexedKey implements Seria
         this.key = key;
     }
 
+    public byte getIndex() {
+        return index;
+    }
+
     public Object getKey() {
         return key;
     }