Posted to commits@pig.apache.org by sz...@apache.org on 2017/08/09 08:41:46 UTC
svn commit: r1804497 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigSplit.java
Author: szita
Date: Wed Aug 9 08:41:45 2017
New Revision: 1804497
URL: http://svn.apache.org/viewvc?rev=1804497&view=rev
Log:
PIG-5283: Configuration is not passed to SparkPigSplits on the backend (szita)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigSplit.java
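For context, the underlying problem: when Spark deserializes an InputSplit on an executor, the wrapping SparkPigSplit is handed an empty Configuration instead of the job configuration, so keys such as io.serializations and Pig's split-compression flag come back as null. A minimal illustrative sketch of what the executor side would see before this patch (not part of the commit; `new Configuration(false)` is an approximation of the empty instance Spark provides):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CommonConfigurationKeys;
    import org.apache.pig.PigConfiguration;

    public class EmptyConfSketch {
        public static void main(String[] args) {
            // Roughly what a SparkPigSplit sees on the executor: a Configuration
            // created without loading any resources or job settings.
            Configuration conf = new Configuration(false);
            System.out.println(conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY)); // null
            System.out.println(conf.get(PigConfiguration.PIG_COMPRESS_INPUT_SPLITS));    // null
        }
    }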
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1804497&r1=1804496&r2=1804497&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Aug 9 08:41:45 2017
@@ -40,6 +40,8 @@ OPTIMIZATIONS
 
 BUG FIXES
 
+PIG-5283: Configuration is not passed to SparkPigSplits on the backend (szita)
+
 PIG-5284: Fix flakyness introduced by PIG-3655 (szita)
 
 PIG-5278: Unit test failures because of PIG-5264 (nkollar via rohini)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigSplit.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigSplit.java?rev=1804497&r1=1804496&r2=1804497&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigSplit.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigSplit.java Wed Aug 9 08:41:45 2017
@@ -22,16 +22,21 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.SplitLocationInfo;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 
+import com.google.common.collect.Lists;
+
 /**
  * Wrapper class for PigSplits in Spark mode
  *
@@ -124,11 +129,13 @@ public interface SparkPigSplit extends W
 
         @Override
         public void readFields(DataInput is) throws IOException {
+            this.getConf().readFields(is);
             pigSplit.readFields(is);
         }
 
         @Override
         public void write(DataOutput os) throws IOException {
+            SparkPigSplitsUtils.writeConfigForPigSplits(this.getConf(), os);
             pigSplit.write(os);
         }
 
@@ -242,11 +249,13 @@ public interface SparkPigSplit extends W
 
         @Override
         public void readFields(DataInput is) throws IOException {
+            this.getConf().readFields(is);
             pigSplit.readFields(is);
        }
 
         @Override
         public void write(DataOutput os) throws IOException {
+            SparkPigSplitsUtils.writeConfigForPigSplits(this.getConf(), os);
             pigSplit.write(os);
         }
 
@@ -301,4 +310,32 @@ public interface SparkPigSplit extends W
         }
     }
 
+    public static class SparkPigSplitsUtils {
+
+        private static final List<String> PIGSPLIT_CONFIG_KEYS = Lists.newArrayList(
+                CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
+                PigConfiguration.PIG_COMPRESS_INPUT_SPLITS
+        );
+
+        /**
+         * Writes a subset of originalConf into the output stream os. Only keys listed in PIGSPLIT_CONFIG_KEYS
+         * are written, to keep the serialized splits small. During deserialization on a Spark executor the
+         * configuration must be restored manually, because Spark sets only an empty Configuration on the PigSplit.
+         * @param originalConf the job configuration to copy the whitelisted entries from
+         * @param os the output stream to write the configuration subset to
+         * @throws IOException
+         */
+        public static void writeConfigForPigSplits(Configuration originalConf, DataOutput os) throws IOException {
+            Configuration conf = new Configuration(false);
+            for (String key : PIGSPLIT_CONFIG_KEYS) {
+                String value = originalConf.get(key);
+                if (value != null) {
+                    conf.set(key, value);
+                }
+            }
+            conf.write(os);
+        }
+
+    }
+
 }
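
To see the two added calls working together, here is a hedged sketch of the round trip the patch sets up: the write side serializes the whitelisted configuration subset first, then the split payload; the read side restores the configuration in the same order before reading the wrapped PigSplit. The stream wrappers and class name below are illustrative, not part of the commit:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CommonConfigurationKeys;

    public class ConfigRoundTripSketch {
        public static void main(String[] args) throws IOException {
            // Write side (mirrors write() above): serialize only the whitelisted
            // subset of the job configuration; the split payload would follow it.
            Configuration jobConf = new Configuration();
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);

            Configuration subset = new Configuration(false); // no defaults: stays small
            String serializations = jobConf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY);
            if (serializations != null) {
                subset.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializations);
            }
            subset.write(out); // Configuration implements Writable

            // Read side (mirrors readFields() above): the executor's empty
            // Configuration is populated before the wrapped PigSplit is read.
            Configuration onExecutor = new Configuration(false);
            DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(buffer.toByteArray()));
            onExecutor.readFields(in);
            System.out.println(onExecutor.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
        }
    }

Because readFields() must consume exactly the bytes write() produced before pigSplit is touched, restricting the serialized subset to PIGSPLIT_CONFIG_KEYS keeps the per-split overhead bounded regardless of how large the job configuration is.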