Posted to commits@pig.apache.org by xu...@apache.org on 2015/05/29 15:16:24 UTC
svn commit: r1682452 - in
/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark:
./ converter/ plan/
Author: xuefu
Date: Fri May 29 13:16:24 2015
New Revision: 1682452
URL: http://svn.apache.org/r1682452
Log:
PIG-4565: Support custom MR partitioners for Spark engine (Mohit via Xuefu)
Added:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/MapReducePartitionerWrapper.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/MapReducePartitionerWrapper.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/MapReducePartitionerWrapper.java?rev=1682452&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/MapReducePartitionerWrapper.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/MapReducePartitionerWrapper.java Fri May 29 13:16:24 2015
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark;
+
+import java.lang.reflect.Method;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.spark.Partitioner;
+
+/**
+ * Spark Partitioner that wraps a custom partitioner extending
+ * org.apache.hadoop.mapreduce.Partitioner.
+ *
+ * Since Spark's shuffle API takes a different partitioner class
+ * ({@link org.apache.spark.Partitioner}) than MapReduce does, we need to
+ * wrap custom partitioners written for MapReduce inside this Spark Partitioner.
+ *
+ * Custom MR partitioners are expected to implement getPartition() with
+ * specific arguments:
+ * public int getPartition(PigNullableWritable key, Writable value, int numPartitions)
+ * For an example of such a partitioner,
+ * @see org.apache.pig.test.utils.SimpleCustomPartitioner
+ */
+public class MapReducePartitionerWrapper extends Partitioner {
+ private static final Log LOG = LogFactory.getLog(MapReducePartitionerWrapper.class);
+
+ private int numPartitions;
+ private String partitionerName;
+ // MR's Partitioner interface is not serializable, so it cannot be
+ // initialized in the constructor (which runs in Spark's DAG scheduler
+ // thread in the Spark driver). To work around this, it is lazily
+ // initialized inside the map task (executor thread) the first time
+ // getPartition() gets called.
+ transient private org.apache.hadoop.mapreduce.Partitioner<PigNullableWritable, Writable>
+ mapredPartitioner = null;
+ transient private Method getPartitionMethod = null;
+
+ public MapReducePartitionerWrapper(String partitionerName,
+ int numPartitions) {
+ if (partitionerName == null) {
+ throw new RuntimeException("MapReduce Partitioner cannot be null.");
+ }
+
+ this.partitionerName = partitionerName;
+ this.numPartitions = numPartitions;
+ }
+
+ public int numPartitions() {
+ return numPartitions;
+ }
+
+ public int getPartition(final Object key) {
+ try {
+
+ PigNullableWritable writeableKey = new PigNullableWritable() {
+ public Object getValueAsPigType() {
+ return key;
+ }
+ };
+
+ // Lazy initialization
+ // Synchronized because multiple (map) tasks in the same Spark Executor
+ // may call getPartition, attempting to initialize at the same time.
+ if (mapredPartitioner == null) {
+ synchronized (this) {
+ // check again for race condition
+ if (mapredPartitioner == null) {
+ Class<?> mapredPartitionerClass =
+ PigContext.resolveClassName(partitionerName);
+ Configuration conf = new Configuration();
+ mapredPartitioner = (org.apache.hadoop.mapreduce.Partitioner<PigNullableWritable, Writable>)
+ ReflectionUtils.newInstance(mapredPartitionerClass, conf);
+ getPartitionMethod = mapredPartitionerClass.getMethod(
+ "getPartition",
+ PigNullableWritable.class,
+ org.apache.hadoop.io.Writable.class,
+ int.class);
+ }
+ }
+ }
+
+ // MR Partitioner's getPartition takes a value argument as well, but
+ // Spark's Partitioner only accepts the key argument.
+ // In practice, MR Partitioners ignore the value. However, it's
+ // possible that some don't.
+ // TODO: We could handle this case by packaging the value inside the
+ // key (conditioned on some config option, since this will balloon
+ // memory usage). PIG-4575.
+ int partition = (Integer) getPartitionMethod.invoke(mapredPartitioner,
+ writeableKey, null, numPartitions);
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("MapReduce Partitioner partition number for key " + key +
+ " is " + partition);
+ return partition;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other instanceof MapReducePartitionerWrapper) {
+ MapReducePartitionerWrapper wrapper = (MapReducePartitionerWrapper) other;
+ return wrapper.numPartitions() == this.numPartitions();
+ }
+ return false;
+ }
+
+ public int hashCode() {
+ return this.numPartitions();
+ }
+}
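
For reference, the kind of class this wrapper loads is an ordinary org.apache.hadoop.mapreduce.Partitioner with the three-argument getPartition() described in the javadoc above. Below is a minimal sketch; the package, class name, and hash-based routing are illustrative only, not the source of org.apache.pig.test.utils.SimpleCustomPartitioner:

    package com.example; // hypothetical package

    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.Partitioner;
    import org.apache.pig.impl.io.PigNullableWritable;

    // Illustrative custom MR partitioner; any class of this shape can be
    // named in a Pig PARTITION BY clause and will be wrapped by
    // MapReducePartitionerWrapper at shuffle time.
    public class ExampleCustomPartitioner
            extends Partitioner<PigNullableWritable, Writable> {
        @Override
        public int getPartition(PigNullableWritable key, Writable value,
                                int numPartitions) {
            Object pigValue = key.getValueAsPigType();
            if (pigValue == null) {
                return 0; // route null keys to partition 0
            }
            // Mask the sign bit so the modulo result is never negative.
            return (pigValue.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

In a script, such a partitioner is attached with the PARTITION BY clause, e.g. B = GROUP A BY name PARTITION BY com.example.ExampleCustomPartitioner PARALLEL 4; and that fully qualified class name is what reaches op.getCustomPartitioner() in the converters below.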
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java?rev=1682452&r1=1682451&r2=1682452&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java Fri May 29 13:16:24 2015
@@ -19,24 +19,25 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.util.List;
-import scala.Product2;
-import scala.Tuple2;
-import scala.collection.JavaConversions;
-import scala.collection.Seq;
-import scala.reflect.ClassTag;
-import scala.reflect.ClassTag$;
import org.apache.hadoop.mapred.JobConf;
-
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
-
+import org.apache.spark.HashPartitioner;
+import org.apache.spark.Partitioner;
import org.apache.spark.rdd.RDD;
+import scala.Product2;
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
+
public class SparkUtil {
public static <T> ClassTag<T> getManifest(Class<T> clazz) {
@@ -88,12 +89,27 @@ public class SparkUtil {
public static int getParallelism(List<RDD<Tuple>> predecessors,
PhysicalOperator physicalOperator) {
- int parallelism = physicalOperator.getRequestedParallelism();
- if (parallelism <= 0) {
- // Parallelism wasn't set in Pig, so set it to whatever Spark thinks
- // is reasonable.
- parallelism = predecessors.get(0).context().defaultParallelism();
- }
- return parallelism;
+
+ String numReducers = System.getenv("SPARK_REDUCERS");
+ if (numReducers != null) {
+ return Integer.parseInt(numReducers);
+ }
+
+ int parallelism = physicalOperator.getRequestedParallelism();
+ if (parallelism <= 0) {
+ // Parallelism wasn't set in Pig, so set it to whatever Spark thinks
+ // is reasonable.
+ parallelism = predecessors.get(0).context().defaultParallelism();
+ }
+
+ return parallelism;
+ }
+
+ public static Partitioner getPartitioner(String customPartitioner, int parallelism) {
+ if (customPartitioner == null) {
+ return new HashPartitioner(parallelism);
+ } else {
+ return new MapReducePartitionerWrapper(customPartitioner, parallelism);
+ }
}
}
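
Two notes on the SparkUtil changes above. getParallelism() now resolves parallelism in a fixed order: the SPARK_REDUCERS environment variable if set (this check was moved here from GlobalRearrangeConverter, see below; exporting SPARK_REDUCERS=16 before launching a job, for instance, pins every shuffle to 16 partitions), then the operator's requested parallelism, then Spark's defaultParallelism. And getPartitioner() falls back to a plain HashPartitioner when the custom partitioner name is null, so converters can pass op.getCustomPartitioner() unconditionally; that value is only non-null when the script used a PARTITION BY clause.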
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java?rev=1682452&r1=1682451&r2=1682452&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java Fri May 29 13:16:24 2015
@@ -29,10 +29,7 @@ import org.apache.pig.data.Tuple;
import org.apache.spark.rdd.PairRDDFunctions;
import org.apache.spark.rdd.RDD;
-import scala.Function1;
-import scala.Function2;
import scala.Tuple2;
-import scala.reflect.ClassTag;
import scala.runtime.AbstractFunction1;
import scala.runtime.AbstractFunction2;
@@ -40,30 +37,30 @@ import scala.runtime.AbstractFunction2;
public class DistinctConverter implements POConverter<Tuple, Tuple, PODistinct> {
private static final Log LOG = LogFactory.getLog(DistinctConverter.class);
- private static final Function1<Tuple, Tuple2<Tuple, Object>> TO_KEY_VALUE_FUNCTION = new ToKeyValueFunction();
- private static final Function2<Object, Object, Object> MERGE_VALUES_FUNCTION = new MergeValuesFunction();
- private static final Function1<Tuple2<Tuple, Object>, Tuple> TO_VALUE_FUNCTION = new ToValueFunction();
-
@Override
public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
- PODistinct poDistinct) throws IOException {
- SparkUtil.assertPredecessorSize(predecessors, poDistinct, 1);
+ PODistinct op) throws IOException {
+ SparkUtil.assertPredecessorSize(predecessors, op, 1);
RDD<Tuple> rdd = predecessors.get(0);
- ClassTag<Tuple2<Tuple, Object>> tuple2ClassManifest = SparkUtil
- .<Tuple, Object> getTuple2Manifest();
-
- RDD<Tuple2<Tuple, Object>> rddPairs = rdd.map(TO_KEY_VALUE_FUNCTION,
- tuple2ClassManifest);
+ // In DISTINCT operation, the key is the entire tuple.
+ // RDD<Tuple> -> RDD<Tuple2<Tuple, null>>
+ RDD<Tuple2<Tuple, Object>> keyValRDD = rdd.map(new ToKeyValueFunction(),
+ SparkUtil.<Tuple, Object> getTuple2Manifest());
PairRDDFunctions<Tuple, Object> pairRDDFunctions
- = new PairRDDFunctions<Tuple, Object>(
- rddPairs, SparkUtil.getManifest(Tuple.class),
+ = new PairRDDFunctions<Tuple, Object>(keyValRDD,
+ SparkUtil.getManifest(Tuple.class),
SparkUtil.getManifest(Object.class), null);
- int parallelism = SparkUtil.getParallelism(predecessors, poDistinct);
- return pairRDDFunctions.reduceByKey(MERGE_VALUES_FUNCTION, parallelism)
- .map(TO_VALUE_FUNCTION, SparkUtil.getManifest(Tuple.class));
+ int parallelism = SparkUtil.getParallelism(predecessors, op);
+ return pairRDDFunctions.reduceByKey(
+ SparkUtil.getPartitioner(op.getCustomPartitioner(), parallelism),
+ new MergeValuesFunction())
+ .map(new ToValueFunction(), SparkUtil.getManifest(Tuple.class));
}
+ /**
+ * Tuple -> Tuple2<Tuple, null>
+ */
private static final class ToKeyValueFunction extends
AbstractFunction1<Tuple, Tuple2<Tuple, Object>> implements
Serializable {
@@ -71,16 +68,20 @@ public class DistinctConverter implement
public Tuple2<Tuple, Object> apply(Tuple t) {
if (LOG.isDebugEnabled())
LOG.debug("DistinctConverter.ToKeyValueFunction in " + t);
+
Tuple key = t;
- Object value = null; // value
- // (key, value)
+ Object value = null;
Tuple2<Tuple, Object> out = new Tuple2<Tuple, Object>(key, value);
+
if (LOG.isDebugEnabled())
LOG.debug("DistinctConverter.ToKeyValueFunction out " + out);
return out;
}
}
+ /**
+ * No-op
+ */
private static final class MergeValuesFunction extends
AbstractFunction2<Object, Object, Object> implements Serializable {
@Override
@@ -89,6 +90,9 @@ public class DistinctConverter implement
}
}
+ /**
+ * Tuple2<Tuple, null> -> Tuple
+ */
private static final class ToValueFunction extends
AbstractFunction1<Tuple2<Tuple, Object>, Tuple> implements
Serializable {
@@ -97,4 +101,4 @@ public class DistinctConverter implement
return input._1;
}
}
-}
+}
\ No newline at end of file
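
Stripped of the Scala interop plumbing (AbstractFunction1/2 and the ClassTag manifests), the DISTINCT data flow above reduces to the following sketch against Spark's Java API. This is a simplified illustration, not code from this commit, and it assumes Java 8 lambdas:

    import org.apache.pig.data.Tuple;
    import org.apache.spark.Partitioner;
    import org.apache.spark.api.java.JavaRDD;
    import scala.Tuple2;

    public class DistinctSketch {
        public static JavaRDD<Tuple> distinct(JavaRDD<Tuple> input,
                                              Partitioner partitioner) {
            return input
                    // DISTINCT keys on the whole tuple; the value is a
                    // null placeholder.
                    .mapToPair(t -> new Tuple2<Tuple, Object>(t, null))
                    // Duplicates collapse to a single key; the partitioner
                    // (HashPartitioner, or MapReducePartitionerWrapper when
                    // PARTITION BY was given) decides the target partition.
                    .reduceByKey(partitioner, (a, b) -> a)
                    // Drop the placeholder, keeping one tuple per key.
                    .map(pair -> pair._1());
        }
    }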
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java?rev=1682452&r1=1682451&r2=1682452&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java Fri May 29 13:16:24 2015
@@ -29,14 +29,15 @@ import org.apache.commons.logging.LogFac
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.CoGroupedRDD;
import org.apache.spark.rdd.RDD;
@@ -53,65 +54,84 @@ public class GlobalRearrangeConverter im
.getLog(GlobalRearrangeConverter.class);
private static final TupleFactory tf = TupleFactory.getInstance();
- @Override
+
+ @Override
public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
- POGlobalRearrangeSpark physicalOperator) throws IOException {
+ POGlobalRearrangeSpark op) throws IOException {
SparkUtil.assertPredecessorSizeGreaterThan(predecessors,
- physicalOperator, 0);
+ op, 0);
int parallelism = SparkUtil.getParallelism(predecessors,
- physicalOperator);
+ op);
- String reducers = System.getenv("SPARK_REDUCERS");
- if (reducers != null) {
- parallelism = Integer.parseInt(reducers);
- }
- LOG.info("Parallelism for Spark groupBy: " + parallelism);
-
- if (predecessors.size() == 1) {
- // GROUP
- JavaPairRDD<Object, Iterable<Tuple>> prdd = null;
- if (physicalOperator.isUseSecondaryKey()) {
- RDD<Tuple> rdd = predecessors.get(0);
- RDD<Tuple2<Tuple, Object>> rddPair = rdd.map(new ToKeyNullValueFunction(),
- SparkUtil.<Tuple, Object>getTuple2Manifest());
-
- JavaPairRDD<Tuple, Object> pairRDD = new JavaPairRDD<Tuple, Object>(rddPair,
- SparkUtil.getManifest(Tuple.class),
- SparkUtil.getManifest(Object.class));
-
- //first sort the tuple by secondary key if enable useSecondaryKey sort
- JavaPairRDD<Tuple, Object> sorted = pairRDD.repartitionAndSortWithinPartitions(new HashPartitioner(parallelism), new PigSecondaryKeyComparatorSpark(physicalOperator.getSecondarySortOrder()));
- JavaRDD<Tuple> mapped = sorted.mapPartitions(new ToValueFunction());
- prdd = mapped.groupBy(new GetKeyFunction(physicalOperator), parallelism);
- } else {
- JavaRDD<Tuple> jrdd = predecessors.get(0).toJavaRDD();
- prdd = jrdd.groupBy(new GetKeyFunction(physicalOperator), parallelism);
- }
+// TODO: Figure out the tradeoff of using CoGroupedRDD (even for 1 input)
+// vs using groupBy (like we do in this commented code) vs using
+// reduceByKey(). This is a pending task in Pig on Spark Milestone 1.
+// Once we figure that out, we can allow custom partitioning for the
+// secondary sort case as well.
+// if (predecessors.size() == 1) {
+// // GROUP BY
+// JavaPairRDD<Object, Iterable<Tuple>> prdd;
+// if (op.isUseSecondaryKey()) {
+// prdd = handleSecondarySort(predecessors.get(0), op, parallelism);
+// } else {
+// JavaRDD<Tuple> jrdd = predecessors.get(0).toJavaRDD();
+// prdd = jrdd.groupBy(new GetKeyFunction(op), parallelism);
+// prdd.groupByKey(new CustomPartitioner(op.getCustomPartitioner(),
+// parallelism));
+// }
+// JavaRDD<Tuple> jrdd2 = prdd.map(new GroupTupleFunction(op));
+// return jrdd2.rdd();
+//
+// if (predecessors.size() == 1 && op.isUseSecondaryKey()) {
+// return handleSecondarySort(predecessors.get(0), op, parallelism);
+// }
+
+ if (predecessors.size() == 1 && op.isUseSecondaryKey()) {
+ return handleSecondarySort(predecessors.get(0), op, parallelism);
+ }
+
+ List<RDD<Tuple2<IndexedKey, Tuple>>> rddPairs = new ArrayList<RDD<Tuple2<IndexedKey, Tuple>>>();
+ for (RDD<Tuple> rdd : predecessors) {
+ JavaRDD<Tuple> jrdd = JavaRDD.fromRDD(rdd, SparkUtil.getManifest(Tuple.class));
+ JavaRDD<Tuple2<IndexedKey, Tuple>> rddPair = jrdd.map(new ToKeyValueFunction());
+ rddPairs.add(rddPair.rdd());
+ }
+
+ // Something's wrong with the type parameters of CoGroupedRDD
+ // key and value are the same type ???
+ CoGroupedRDD<Object> coGroupedRDD = new CoGroupedRDD<Object>(
+ (Seq<RDD<? extends Product2<Object, ?>>>) (Object) (JavaConversions
+ .asScalaBuffer(rddPairs).toSeq()),
+ SparkUtil.getPartitioner(op.getCustomPartitioner(), parallelism)
+ );
+
+ RDD<Tuple2<IndexedKey, Seq<Seq<Tuple>>>> rdd =
+ (RDD<Tuple2<IndexedKey, Seq<Seq<Tuple>>>>) (Object) coGroupedRDD;
+ return rdd.toJavaRDD().map(new ToGroupKeyValueFunction()).rdd();
+ }
- JavaRDD<Tuple> jrdd2 = prdd.map(new GroupTupleFunction(physicalOperator));
- return jrdd2.rdd();
- } else {
- List<RDD<Tuple2<IndexedKey, Tuple>>> rddPairs = new ArrayList<RDD<Tuple2<IndexedKey, Tuple>>>();
- for (RDD<Tuple> rdd : predecessors) {
- JavaRDD<Tuple> jrdd = JavaRDD.fromRDD(rdd, SparkUtil.getManifest(Tuple.class));
- JavaRDD<Tuple2<IndexedKey, Tuple>> rddPair = jrdd.map(new ToKeyValueFunction());
- rddPairs.add(rddPair.rdd());
- }
-
- // Something's wrong with the type parameters of CoGroupedRDD
- // key and value are the same type ???
- CoGroupedRDD<Object> coGroupedRDD = new CoGroupedRDD<Object>(
- (Seq<RDD<? extends Product2<Object, ?>>>) (Object) (JavaConversions
- .asScalaBuffer(rddPairs).toSeq()),
- new HashPartitioner(parallelism));
-
- RDD<Tuple2<IndexedKey, Seq<Seq<Tuple>>>> rdd =
- (RDD<Tuple2<IndexedKey, Seq<Seq<Tuple>>>>) (Object) coGroupedRDD;
- return rdd.toJavaRDD().map(new ToGroupKeyValueFunction()).rdd();
- }
+ private RDD<Tuple> handleSecondarySort(
+ RDD<Tuple> rdd, POGlobalRearrangeSpark op, int parallelism) {
+
+ RDD<Tuple2<Tuple, Object>> rddPair = rdd.map(new ToKeyNullValueFunction(),
+ SparkUtil.<Tuple, Object>getTuple2Manifest());
+
+ JavaPairRDD<Tuple, Object> pairRDD = new JavaPairRDD<Tuple, Object>(rddPair,
+ SparkUtil.getManifest(Tuple.class),
+ SparkUtil.getManifest(Object.class));
+
+ // First sort the tuples by secondary key when useSecondaryKey sort is enabled.
+ JavaPairRDD<Tuple, Object> sorted = pairRDD.repartitionAndSortWithinPartitions(
+ new HashPartitioner(parallelism),
+ new PigSecondaryKeyComparatorSpark(op.getSecondarySortOrder()));
+ JavaRDD<Tuple> mapped = sorted.mapPartitions(new RemoveValueFunction());
+ JavaPairRDD<Object, Iterable<Tuple>> prdd = mapped.groupBy(
+ new GetKeyFunction(op), parallelism);
+ JavaRDD<Tuple> jrdd2 = prdd.map(new GroupTupleFunction(op));
+ return jrdd2.rdd();
}
- private static class ToValueFunction implements
+ private static class RemoveValueFunction implements
FlatMapFunction<Iterator<Tuple2<Tuple, Object>>, Tuple>, Serializable {
private class Tuple2TransformIterable implements Iterable<Tuple> {
@@ -145,15 +165,14 @@ public class GlobalRearrangeConverter im
@Override
public Tuple2<Tuple, Object> apply(Tuple t) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Sort ToKeyValueFunction in " + t);
+ LOG.debug("ToKeyNullValueFunction in " + t);
}
- Tuple key = t;
- Object value = null;
- // (key, value)
- Tuple2<Tuple, Object> out = new Tuple2<Tuple, Object>(key, value);
+
+ Tuple2<Tuple, Object> out = new Tuple2<Tuple, Object>(t, null);
if (LOG.isDebugEnabled()) {
- LOG.debug("Sort ToKeyValueFunction out " + out);
+ LOG.debug("ToKeyNullValueFunction out " + out);
}
+
return out;
}
}
@@ -234,6 +253,9 @@ public class GlobalRearrangeConverter im
}
}
+ /**
+ * Function that extracts the key from a locally rearranged tuple.
+ */
private static class GetKeyFunction implements Function<Tuple, Object>, Serializable {
public final POGlobalRearrangeSpark glrSpark;
@@ -246,16 +268,18 @@ public class GlobalRearrangeConverter im
if (LOG.isDebugEnabled()) {
LOG.debug("GetKeyFunction in " + t);
}
- // see PigGenericMapReduce For the key
- Object key = null;
+
+ Object key;
if ((glrSpark != null) && (glrSpark.isUseSecondaryKey())) {
key = ((Tuple) t.get(1)).get(0);
} else {
key = t.get(1);
}
+
if (LOG.isDebugEnabled()) {
LOG.debug("GetKeyFunction out " + key);
}
+
return key;
} catch (ExecException e) {
throw new RuntimeException(e);
@@ -263,8 +287,16 @@ public class GlobalRearrangeConverter im
}
}
- private static class GroupTupleFunction implements Function<Tuple2<Object, Iterable<Tuple>>, Tuple>,
- Serializable {
+ /**
+ * Function that converts elements of a PairRDD to a regular RDD.
+ * - Input (PairRDD) contains elements of the form
+ * Tuple2<key, Iterable<(index, key, value)>>.
+ * - Output (regular RDD) contains elements of the form
+ * Tuple<(key, iterator to (index, key, value))>.
+ */
+ private static class GroupTupleFunction
+ implements Function<Tuple2<Object, Iterable<Tuple>>, Tuple>,
+ Serializable {
public final POGlobalRearrangeSpark glrSpark;
public GroupTupleFunction(POGlobalRearrangeSpark globalRearrangeSpark) {
@@ -276,12 +308,16 @@ public class GlobalRearrangeConverter im
if (LOG.isDebugEnabled()) {
LOG.debug("GroupTupleFunction in " + v1);
}
+
Tuple tuple = tf.newTuple(2);
- tuple.set(0, v1._1()); // the (index, key) tuple
- tuple.set(1, v1._2().iterator()); // the Seq<Tuple> aka bag of values
+ tuple.set(0, v1._1()); // key
+ // Note that v1._2() is an Iterable over (index, key, value) tuples,
+ // and v1._2().iterator() is the iterator over that bag of values.
+ tuple.set(1, v1._2().iterator());
if (LOG.isDebugEnabled()) {
LOG.debug("GroupTupleFunction out " + tuple);
}
+
return tuple;
} catch (ExecException e) {
throw new RuntimeException(e);
@@ -290,7 +326,10 @@ public class GlobalRearrangeConverter im
}
/**
- * IndexedKey records the index and key info of a tuple.
+ * IndexedKey records the index and key info.
+ * This is used as the key for JOINs. It addresses the case where the key is
+ * either empty (or is a tuple with one or more empty fields). In this case,
+ * we must respect the SQL standard as documented in the equals() method.
*/
public static class IndexedKey implements Serializable {
private byte index;
@@ -403,24 +442,27 @@ public class GlobalRearrangeConverter im
}
}
+ /**
+ * Converts an incoming locally rearranged tuple, which is of the form
+ * (index, key, value), into a Tuple2<key, value>.
+ */
private static class ToKeyValueFunction implements
Function<Tuple, Tuple2<IndexedKey, Tuple>>, Serializable {
@Override
public Tuple2<IndexedKey, Tuple> call(Tuple t) {
try {
- // (index, key, value)
if (LOG.isDebugEnabled()) {
LOG.debug("ToKeyValueFunction in " + t);
}
- IndexedKey indexedKey = new IndexedKey((Byte) t.get(0), t.get(1));
- Tuple value = (Tuple) t.get(2); // value
- // (key, value)
- Tuple2<IndexedKey, Tuple> out = new Tuple2<IndexedKey, Tuple>(indexedKey,
- value);
+
+ Tuple2<IndexedKey, Tuple> out = new Tuple2<IndexedKey, Tuple>(
+ new IndexedKey((Byte) t.get(0), t.get(1)),
+ (Tuple) t.get(2));
if (LOG.isDebugEnabled()) {
LOG.debug("ToKeyValueFunction out " + out);
}
+
return out;
} catch (ExecException e) {
throw new RuntimeException(e);
@@ -428,6 +470,11 @@ public class GlobalRearrangeConverter im
}
}
+ /**
+ * Converts cogroup output, where each element is {key, bag[]} represented
+ * as Tuple2<Object, Seq<Seq<Tuple>>>, into a tuple of the form
+ * (key, Iterator<(index, key, value)>).
+ */
private static class ToGroupKeyValueFunction implements
Function<Tuple2<IndexedKey, Seq<Seq<Tuple>>>, Tuple>, Serializable {
@@ -435,15 +482,16 @@ public class GlobalRearrangeConverter im
public Tuple call(Tuple2<IndexedKey, Seq<Seq<Tuple>>> input) {
try {
if (LOG.isDebugEnabled()) {
- LOG.debug("ToGroupKeyValueFunction2 in " + input);
+ LOG.debug("ToGroupKeyValueFunction in " + input);
}
+
final Object key = input._1().getKey();
Object obj = input._2();
// XXX this is a hack for Spark 1.1.0: the type is an Array, not Seq
Seq<Tuple>[] bags = (Seq<Tuple>[])obj;
int i = 0;
List<Iterator<Tuple>> tupleIterators = new ArrayList<Iterator<Tuple>>();
- for (int j=0; j<bags.length; j++) {
+ for (int j = 0; j < bags.length; j++) {
Seq<Tuple> bag = bags[j];
Iterator<Tuple> iterator = JavaConversions
.asJavaCollection(bag).iterator();
@@ -463,14 +511,16 @@ public class GlobalRearrangeConverter im
}
}
});
++i;
}
+
Tuple out = tf.newTuple(2);
out.set(0, key);
out.set(1, new IteratorUnion<Tuple>(tupleIterators.iterator()));
if (LOG.isDebugEnabled()) {
- LOG.debug("ToGroupKeyValueFunction2 out " + out);
+ LOG.debug("ToGroupKeyValueFunction out " + out);
}
+
return out;
} catch (Exception e) {
throw new RuntimeException(e);
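
One point in the IndexedKey javadoc above deserves spelling out. Under the SQL standard, NULL compares equal to nothing, including another NULL, so a JOIN must not match records whose keys are null (the "empty" case in the javadoc) when they come from different inputs, while a GROUP must still collect the null-keyed records of a single input together. Carrying the input index inside the key is what lets equals() separate the two cases. A hedged sketch of that rule follows; the authoritative logic lives in IndexedKey.equals() and also covers tuple keys containing null fields:

    // Sketch only: approximates the null-key equality rule described above.
    final class IndexedKeySemanticsSketch {
        static boolean keysMatch(byte indexA, Object keyA,
                                 byte indexB, Object keyB) {
            if (keyA == null || keyB == null) {
                // Null keys never join across inputs, but may still group
                // together when they come from the same input (same index).
                return keyA == null && keyB == null && indexA == indexB;
            }
            return keyA.equals(keyB);
        }
    }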
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java?rev=1682452&r1=1682451&r2=1682452&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java Fri May 29 13:16:24 2015
@@ -24,7 +24,6 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
@@ -36,9 +35,9 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.spark.rdd.RDD;
import scala.runtime.AbstractFunction1;
-import org.apache.spark.rdd.RDD;
@SuppressWarnings({ "serial" })
public class PackageConverter implements POConverter<Tuple, Tuple, POPackage> {
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1682452&r1=1682451&r2=1682452&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java Fri May 29 13:16:24 2015
@@ -30,14 +30,11 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.CollectableLoadFunc;
-import org.apache.pig.IndexableLoadFunc;
import org.apache.pig.LoadFunc;
import org.apache.pig.PigException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompilerException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.ScalarPhyFinder;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;