Posted to dev@pig.apache.org by "liyunzhang_intel (JIRA)" <ji...@apache.org> on 2015/06/01 03:22:17 UTC

[jira] [Updated] (PIG-4577) Use "cogroup" spark api to implement "groupby+secondarysort" case in GlobalRearrangeConverter.java

     [ https://issues.apache.org/jira/browse/PIG-4577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated PIG-4577:
----------------------------------
    Attachment: PIG-4577.patch

[~mohitsabharwal]:
in PIG-4577.patch, the changes are:
1. Use the "cogroup" Spark API to implement the "groupby+secondarysort" case in GlobalRearrangeConverter.java (see the sketch below).

> Use "cogroup" spark api to implement "groupby+secondarysort" case in GlobalRearrangeConverter.java
> --------------------------------------------------------------------------------------------------
>
>                 Key: PIG-4577
>                 URL: https://issues.apache.org/jira/browse/PIG-4577
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>
>         Attachments: PIG-4577.patch
>
>
> In PIG-4565 (Support custom MR partitioners for Spark engine), we refined the code of GlobalRearrangeConverter to use the "cogroup" Spark API for the "groupby" and "join" cases, but not yet for the "groupby+secondarysort" case.
> In PIG-4565_2.patch:
> GlobalRearrangeConverter.java
> {code}
>  @Override
>     public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
>                               POGlobalRearrangeSpark op) throws IOException {
>         SparkUtil.assertPredecessorSizeGreaterThan(predecessors,
>                 op, 0);
>         int parallelism = SparkUtil.getParallelism(predecessors,
>                 op);
> //         TODO: Figure out the tradeoff of using CoGroupRDD (even for 1 input),
> //         vs using groupBy (like we do in this commented code), vs using
> //         reduceByKey(). This is a pending task in Pig on Spark Milestone 1
> //         Once we figure that out, we can allow custom partitioning for
> //         secondary sort case as well.
> //        if (predecessors.size() == 1) {
> //            // GROUP BY
> //            JavaPairRDD<Object, Iterable<Tuple>> prdd;
> //            if (op.isUseSecondaryKey()) {
> //                prdd = handleSecondarySort(predecessors.get(0), op, parallelism);
> //            } else {
> //                JavaRDD<Tuple> jrdd = predecessors.get(0).toJavaRDD();
> //                prdd = jrdd.groupBy(new GetKeyFunction(op), parallelism);
> //                prdd.groupByKey(new CustomPartitioner(op.getCustomPartitioner(),
> //                        parallelism));
> //            }
> //            JavaRDD<Tuple> jrdd2 = prdd.map(new GroupTupleFunction(op));
> //            return jrdd2.rdd();
> //
> //        if (predecessors.size() == 1 && op.isUseSecondaryKey()) {
> //            return handleSecondarySort(predecessors.get(0), op, parallelism);
> //        }
>         if (predecessors.size() == 1 && op.isUseSecondaryKey()) {
>             return handleSecondarySort(predecessors.get(0), op, parallelism);
>         }
>         List<RDD<Tuple2<Object, Tuple>>> rddPairs = new ArrayList<RDD<Tuple2<Object, Tuple>>>();
>         for (RDD<Tuple> rdd : predecessors) {
>             JavaRDD<Tuple> jrdd = JavaRDD.fromRDD(rdd, SparkUtil.getManifest(Tuple.class));
>             JavaRDD<Tuple2<Object, Tuple>> rddPair = jrdd.map(new ToKeyValueFunction());
>             rddPairs.add(rddPair.rdd());
>         }
>         // Something's wrong with the type parameters of CoGroupedRDD
>         // key and value are the same type ???
>         CoGroupedRDD<Object> coGroupedRDD = new CoGroupedRDD<Object>(
>                 (Seq<RDD<? extends Product2<Object, ?>>>) (Object) (JavaConversions
>                         .asScalaBuffer(rddPairs).toSeq()),
>                 SparkUtil.getPartitioner(op.getCustomPartitioner(), parallelism)
>         );
>         RDD<Tuple2<Object, Seq<Seq<Tuple>>>> rdd =
>             (RDD<Tuple2<Object, Seq<Seq<Tuple>>>>) (Object) coGroupedRDD;
>         return rdd.toJavaRDD().map(new ToGroupKeyValueFunction()).rdd();
>     }
> {code}
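> For comparison, a hypothetical sketch of the reduceByKey option from the TODO above: build each bag incrementally by merging per-key lists, which enables map-side combining. Types are simplified and all names outside the Spark API are illustrative, not code from the patch.
> {code}
> import java.util.ArrayList;
> import java.util.List;
>
> import org.apache.spark.api.java.JavaPairRDD;
>
> public class ReduceByKeyGroupSketch {
>     public static JavaPairRDD<String, List<String>> groupViaReduce(
>             JavaPairRDD<String, String> keyed, int parallelism) {
>         // Wrap each value in a singleton bag...
>         JavaPairRDD<String, List<String>> bags = keyed.mapValues(v -> {
>             List<String> bag = new ArrayList<>();
>             bag.add(v);
>             return bag;
>         });
>         // ...then merge bags pairwise; map-side combining can shrink the
>         // shuffle when keys repeat within a partition.
>         return bags.reduceByKey((a, b) -> { a.addAll(b); return a; }, parallelism);
>     }
> }
> {code}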
> Actually, we can also use the "cogroup" Spark API to implement the "groupby+secondarysort" case.
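> For reference, the other standard way to get a secondary sort in Spark without cogroup is the composite-key pattern: pack (group key, secondary key) into the shuffle key, partition by the group key only, and let the shuffle sort within partitions. A hypothetical, simplified sketch (illustrative names, not code from the patch):
> {code}
> import java.io.Serializable;
> import java.util.Comparator;
>
> import org.apache.spark.HashPartitioner;
> import org.apache.spark.api.java.JavaPairRDD;
> import scala.Tuple2;
>
> public class CompositeKeySortSketch {
>     // Route records by the group key only, so all secondary keys of a
>     // group land in the same partition.
>     public static class GroupKeyPartitioner extends HashPartitioner {
>         public GroupKeyPartitioner(int partitions) { super(partitions); }
>         @Override
>         public int getPartition(Object key) {
>             return super.getPartition(((Tuple2<String, Integer>) key)._1());
>         }
>     }
>
>     // Order within each partition by (group key, secondary key); must be
>     // Serializable so it can ship to executors.
>     public static class CompositeKeyComparator
>             implements Comparator<Tuple2<String, Integer>>, Serializable {
>         @Override
>         public int compare(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
>             int c = a._1().compareTo(b._1());
>             return c != 0 ? c : a._2().compareTo(b._2());
>         }
>     }
>
>     public static JavaPairRDD<Tuple2<String, Integer>, String> secondarySort(
>             JavaPairRDD<Tuple2<String, Integer>, String> input, int parallelism) {
>         // One shuffle: partition by group key, sort by composite key.
>         return input.repartitionAndSortWithinPartitions(
>                 new GroupKeyPartitioner(parallelism),
>                 new CompositeKeyComparator());
>     }
> }
> {code}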


