You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by sz...@apache.org on 2017/08/09 08:41:46 UTC

svn commit: r1804497 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigSplit.java

Author: szita
Date: Wed Aug  9 08:41:45 2017
New Revision: 1804497

URL: http://svn.apache.org/viewvc?rev=1804497&view=rev
Log:
PIG-5283: Configuration is not passed to SparkPigSplits on the backend (szita)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigSplit.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1804497&r1=1804496&r2=1804497&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Aug  9 08:41:45 2017
@@ -40,6 +40,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-5283: Configuration is not passed to SparkPigSplits on the backend (szita)
+
 PIG-5284: Fix flakyness introduced by PIG-3655 (szita)
 
 PIG-5278: Unit test failures because of PIG-5264 (nkollar via rohini)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigSplit.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigSplit.java?rev=1804497&r1=1804496&r2=1804497&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigSplit.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigSplit.java Wed Aug  9 08:41:45 2017
@@ -22,16 +22,21 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.SplitLocationInfo;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 
+import com.google.common.collect.Lists;
+
 /**
  * Wrapper class for PigSplits in Spark mode
  *
@@ -124,11 +129,13 @@ public interface SparkPigSplit extends W
 
        @Override
        public void readFields(DataInput is) throws IOException {
+           this.getConf().readFields(is);
            pigSplit.readFields(is);
        }
 
        @Override
        public void write(DataOutput os) throws IOException {
+           SparkPigSplitsUtils.writeConfigForPigSplits(this.getConf(), os);
            pigSplit.write(os);
        }
 
@@ -242,11 +249,13 @@ public interface SparkPigSplit extends W
 
         @Override
         public void readFields(DataInput is) throws IOException {
+            this.getConf().readFields(is);
             pigSplit.readFields(is);
         }
 
         @Override
         public void write(DataOutput os) throws IOException {
+            SparkPigSplitsUtils.writeConfigForPigSplits(this.getConf(), os);
             pigSplit.write(os);
         }
 
@@ -301,4 +310,32 @@ public interface SparkPigSplit extends W
         }
     }
 
+    public static class SparkPigSplitsUtils {
+
+        private static final List<String> PIGSPLIT_CONFIG_KEYS = Lists.newArrayList(
+                CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
+                PigConfiguration.PIG_COMPRESS_INPUT_SPLITS
+        );
+
+        /**
+         * Writes a subset of the originalConf into the output stream os. Only keys in PIG_SPLIT_CONFIG_KEYS are
+         * considered due to optimization purposes. During deseralization on a Spark executor we need to take care of
+         * setting the configuration manually because Spark only sets an empty Configuration instance on the PigSplit.
+         * @param originalConf
+         * @param os
+         * @throws IOException
+         */
+        public static void writeConfigForPigSplits(Configuration originalConf, DataOutput os) throws IOException {
+            Configuration conf = new Configuration(false);
+            for (String key : PIGSPLIT_CONFIG_KEYS) {
+                String value = originalConf.get(key);
+                if (value != null) {
+                    conf.set(key, value);
+                }
+            }
+            conf.write(os);
+        }
+
+    }
+
 }