You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pig.apache.org by "Mohit Sabharwal (JIRA)" <ji...@apache.org> on 2015/06/03 06:06:50 UTC
[jira] [Commented] (PIG-4577) Use "cogroup" spark api to implement
"groupby+secondarysort" case in GlobalRearrangeConverter.java
[ https://issues.apache.org/jira/browse/PIG-4577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14570244#comment-14570244 ]
Mohit Sabharwal commented on PIG-4577:
--------------------------------------
+1 (non-binding)
> Use "cogroup" spark api to implement "groupby+secondarysort" case in GlobalRearrangeConverter.java
> --------------------------------------------------------------------------------------------------
>
> Key: PIG-4577
> URL: https://issues.apache.org/jira/browse/PIG-4577
> Project: Pig
> Issue Type: Sub-task
> Components: spark
> Reporter: liyunzhang_intel
> Assignee: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-4577.patch
>
>
> In PIG-4565(Support custom MR partitioners for Spark engine), we refine the code of GlobalRearrangeConverter(use "cogroup" spark api to implement "groupby","join" case except the "groupby+secondarysort" case)
> in PIG-4565_2.patch:
> GlobalRearrangeConverter.java
> {code}
> @Override
> public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
> POGlobalRearrangeSpark op) throws IOException {
> SparkUtil.assertPredecessorSizeGreaterThan(predecessors,
> op, 0);
> int parallelism = SparkUtil.getParallelism(predecessors,
> op);
> // TODO: Figure out the tradeoff of using CoGroupRDD (even for 1 input),
> // vs using groupBy (like we do in this commented code), vs using
> // reduceByKey(). This is a pending task in Pig on Spark Milestone 1
> // Once we figure that out, we can allow custom partitioning for
> // secondary sort case as well.
> // if (predecessors.size() == 1) {
> // // GROUP BY
> // JavaPairRDD<Object, Iterable<Tuple>> prdd;
> // if (op.isUseSecondaryKey()) {
> // prdd = handleSecondarySort(predecessors.get(0), op, parallelism);
> // } else {
> // JavaRDD<Tuple> jrdd = predecessors.get(0).toJavaRDD();
> // prdd = jrdd.groupBy(new GetKeyFunction(op), parallelism);
> // prdd.groupByKey(new CustomPartitioner(op.getCustomPartitioner(),
> // parallelism));
> // }
> // JavaRDD<Tuple> jrdd2 = prdd.map(new GroupTupleFunction(op));
> // return jrdd2.rdd();
> //
> // if (predecessors.size() == 1 && op.isUseSecondaryKey()) {
> // return handleSecondarySort(predecessors.get(0), op, parallelism);
> // }
> if (predecessors.size() == 1 && op.isUseSecondaryKey()) {
> return handleSecondarySort(predecessors.get(0), op, parallelism);
> }
> List<RDD<Tuple2<Object, Tuple>>> rddPairs = new ArrayList<RDD<Tuple2<Object, Tuple>>>();
> for (RDD<Tuple> rdd : predecessors) {
> JavaRDD<Tuple> jrdd = JavaRDD.fromRDD(rdd, SparkUtil.getManifest(Tuple.class));
> JavaRDD<Tuple2<Object, Tuple>> rddPair = jrdd.map(new ToKeyValueFunction());
> rddPairs.add(rddPair.rdd());
> }
> // Something's wrong with the type parameters of CoGroupedRDD
> // key and value are the same type ???
> CoGroupedRDD<Object> coGroupedRDD = new CoGroupedRDD<Object>(
> (Seq<RDD<? extends Product2<Object, ?>>>) (Object) (JavaConversions
> .asScalaBuffer(rddPairs).toSeq()),
> SparkUtil.getPartitioner(op.getCustomPartitioner(), parallelism)
> );
> RDD<Tuple2<Object, Seq<Seq<Tuple>>>> rdd =
> (RDD<Tuple2<Object, Seq<Seq<Tuple>>>>) (Object) coGroupedRDD;
> return rdd.toJavaRDD().map(new ToGroupKeyValueFunction()).rdd();
> }
> {code}
> Actually, we can also use "cogroup" spark api to implement "secondarysort+groupby" case.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)