You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by sz...@apache.org on 2017/08/13 16:09:32 UTC

svn commit: r1804929 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ src/org/apache/pig/data/

Author: szita
Date: Sun Aug 13 16:09:32 2017
New Revision: 1804929

URL: http://svn.apache.org/viewvc?rev=1804929&view=rev
Log:
PIG-5277: Spark mode is writing nulls among tuples to the output (workaround) (szita)

Added:
    pig/trunk/src/org/apache/pig/data/NonWritableTuple.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/OutputConsumerIterator.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1804929&r1=1804928&r2=1804929&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sun Aug 13 16:09:32 2017
@@ -44,6 +44,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-5277: Spark mode is writing nulls among tuples to the output (workaround) (szita)
+
 PIG-5283: Configuration is not passed to SparkPigSplits on the backend (szita)
 
 PIG-5284: Fix flakyness introduced by PIG-3655 (szita)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java?rev=1804929&r1=1804928&r2=1804929&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java Sun Aug 13 16:09:32 2017
@@ -37,6 +37,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.StoreFuncDecorator;
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.data.NonWritableTuple;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.util.ObjectSerializer;
@@ -141,7 +142,9 @@ public class PigOutputFormat extends Out
         public void write(WritableComparable key, Tuple value)
                 throws IOException, InterruptedException {
             if(mode == Mode.SINGLE_STORE) {
-                storeDecorator.putNext(value);
+                if (!(value instanceof NonWritableTuple)) {
+                    storeDecorator.putNext(value);
+                }
             } else {
                 throw new IOException("Internal Error: Unexpected code path");
             }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java?rev=1804929&r1=1804928&r2=1804929&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java Sun Aug 13 16:09:32 2017
@@ -34,6 +34,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.POJoinGroupSpark;
+import org.apache.pig.data.NonWritableTuple;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
@@ -209,7 +210,7 @@ public class JoinGroupSparkConverter imp
                         out = (Tuple) result.result;
                         break;
                     case POStatus.STATUS_NULL:
-                        out = null;
+                        out = NonWritableTuple.INSTANCE;
                         break;
                     default:
                         throw new RuntimeException(

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/OutputConsumerIterator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/OutputConsumerIterator.java?rev=1804929&r1=1804928&r2=1804929&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/OutputConsumerIterator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/OutputConsumerIterator.java Sun Aug 13 16:09:32 2017
@@ -20,6 +20,7 @@ package org.apache.pig.backend.hadoop.ex
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.data.NonWritableTuple;
 import org.apache.pig.data.Tuple;
 
 abstract class OutputConsumerIterator implements java.util.Iterator<Tuple> {
@@ -59,6 +60,9 @@ abstract class OutputConsumerIterator im
                         return;
                     }
                     Tuple v1 = input.next();
+                    if (v1 instanceof NonWritableTuple) {
+                        v1 = null;
+                    }
                     attach(v1);
                 }
 

Added: pig/trunk/src/org/apache/pig/data/NonWritableTuple.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/NonWritableTuple.java?rev=1804929&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/data/NonWritableTuple.java (added)
+++ pig/trunk/src/org/apache/pig/data/NonWritableTuple.java Sun Aug 13 16:09:32 2017
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+
+/**
+ * A singleton Tuple type which is not picked up for writing by PigRecordWriter; Spark mode emits it in place of a null tuple (PIG-5277 workaround)
+ */
+public class NonWritableTuple extends AbstractTuple {
+    // Consumers recognise this marker with an instanceof check. It carries no fields:
+    // size() is 0 and get() returns null; every other operation below throws "Unimplemented".
+    public static final NonWritableTuple INSTANCE = new NonWritableTuple(); // the shared marker instance
+
+    private NonWritableTuple(){} // private: callers use INSTANCE rather than constructing their own
+
+    @Override
+    public int size() {
+        return 0; // the marker has no fields
+    }
+
+    @Override
+    public Object get(int fieldNum) throws ExecException {
+        return null; // null for every index; fieldNum is not range-checked
+    }
+
+    @Override
+    public List<Object> getAll() {
+        throw new RuntimeException("Unimplemented");
+    }
+
+    @Override
+    public void set(int fieldNum, Object val) throws ExecException {
+        throw new ExecException("Unimplemented"); // the marker cannot be modified
+    }
+
+    @Override
+    public void append(Object val) {
+        throw new RuntimeException("Unimplemented"); // the marker cannot be modified
+    }
+
+    @Override
+    public long getMemorySize() {
+        throw new RuntimeException("Unimplemented");
+    }
+
+    @Override
+    public void readFields(DataInput arg0) throws IOException {
+        throw new IOException("Unimplemented"); // Writable deserialization is not implemented
+    }
+
+    @Override
+    public void write(DataOutput arg0) throws IOException {
+        throw new IOException("Unimplemented"); // Writable serialization is not implemented
+    }
+
+    @Override
+    public int compareTo(Object o) {
+        throw new RuntimeException("Unimplemented"); // the marker cannot be ordered against other tuples
+    }
+
+}