Posted to dev@pig.apache.org by "liyunzhang_intel (JIRA)" <ji...@apache.org> on 2015/05/29 03:17:17 UTC

[jira] [Created] (PIG-4577) Use "cogroup" spark api to implement "groupby+secondarysort" case in GlobalRearrangeConverter.java

liyunzhang_intel created PIG-4577:
-------------------------------------

             Summary: Use "cogroup" spark api to implement "groupby+secondarysort" case in GlobalRearrangeConverter.java
                 Key: PIG-4577
                 URL: https://issues.apache.org/jira/browse/PIG-4577
             Project: Pig
          Issue Type: Sub-task
            Reporter: liyunzhang_intel
            Assignee: liyunzhang_intel


In PIG-4565 (Support custom MR partitioners for Spark engine), we refined GlobalRearrangeConverter to use the Spark "cogroup" API for the "groupby" and "join" cases, but not for the "groupby+secondarysort" case, which still goes through handleSecondarySort.
From PIG-4565_2.patch:
GlobalRearrangeConverter.java
{code}
    @Override
    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
                              POGlobalRearrangeSpark op) throws IOException {
        SparkUtil.assertPredecessorSizeGreaterThan(predecessors,
                op, 0);
        int parallelism = SparkUtil.getParallelism(predecessors,
                op);

//         TODO: Figure out the tradeoff of using CoGroupRDD (even for 1 input),
//         vs using groupBy (like we do in this commented code), vs using
//         reduceByKey(). This is a pending task in Pig on Spark Milestone 1
//         Once we figure that out, we can allow custom partitioning for
//         secondary sort case as well.
//        if (predecessors.size() == 1) {
//            // GROUP BY
//            JavaPairRDD<Object, Iterable<Tuple>> prdd;
//            if (op.isUseSecondaryKey()) {
//                prdd = handleSecondarySort(predecessors.get(0), op, parallelism);
//            } else {
//                JavaRDD<Tuple> jrdd = predecessors.get(0).toJavaRDD();
//                prdd = jrdd.groupBy(new GetKeyFunction(op), parallelism);
//                prdd.groupByKey(new CustomPartitioner(op.getCustomPartitioner(),
//                        parallelism));
//            }
//            JavaRDD<Tuple> jrdd2 = prdd.map(new GroupTupleFunction(op));
//            return jrdd2.rdd();
//
//        if (predecessors.size() == 1 && op.isUseSecondaryKey()) {
//            return handleSecondarySort(predecessors.get(0), op, parallelism);
//        }

        if (predecessors.size() == 1 && op.isUseSecondaryKey()) {
            return handleSecondarySort(predecessors.get(0), op, parallelism);
        }

        List<RDD<Tuple2<Object, Tuple>>> rddPairs = new ArrayList<RDD<Tuple2<Object, Tuple>>>();
        for (RDD<Tuple> rdd : predecessors) {
            JavaRDD<Tuple> jrdd = JavaRDD.fromRDD(rdd, SparkUtil.getManifest(Tuple.class));
            JavaRDD<Tuple2<Object, Tuple>> rddPair = jrdd.map(new ToKeyValueFunction());
            rddPairs.add(rddPair.rdd());
        }

        // Something's wrong with the type parameters of CoGroupedRDD
        // key and value are the same type ???
        CoGroupedRDD<Object> coGroupedRDD = new CoGroupedRDD<Object>(
                (Seq<RDD<? extends Product2<Object, ?>>>) (Object) (JavaConversions
                        .asScalaBuffer(rddPairs).toSeq()),
                SparkUtil.getPartitioner(op.getCustomPartitioner(), parallelism)
        );

        RDD<Tuple2<Object, Seq<Seq<Tuple>>>> rdd =
            (RDD<Tuple2<Object, Seq<Seq<Tuple>>>>) (Object) coGroupedRDD;
        return rdd.toJavaRDD().map(new ToGroupKeyValueFunction()).rdd();
    }
{code}

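Actually, the "groupby+secondarysort" case can also be implemented with the Spark "cogroup" API. As the TODO in the code notes, this would also allow custom partitioning for the secondary sort case.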
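
To make the idea concrete, here is a minimal sketch in plain Java, with no Spark dependency so that it runs on its own. The names SecondarySortSketch, Row, cogroup and groupBySecondarySort are hypothetical and are not part of Pig or Spark. The cogroup helper only models the grouping semantics of a cogroup over in-memory lists, and sorting each bag after grouping is an assumption about how the secondary sort could be layered on top, not the approach taken by any actual patch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SecondarySortSketch {

    /** Hypothetical record: a main (grouping) key, a secondary (sort) key and a payload. */
    static final class Row {
        final String mainKey;
        final int secondaryKey;
        final String value;

        Row(String mainKey, int secondaryKey, String value) {
            this.mainKey = mainKey;
            this.secondaryKey = secondaryKey;
            this.value = value;
        }

        @Override
        public String toString() {
            return mainKey + ":" + secondaryKey + ":" + value;
        }
    }

    /**
     * Models a cogroup over in-memory lists: for every main key, returns one
     * bucket per input, holding that input's rows for the key.
     */
    static Map<String, List<List<Row>>> cogroup(List<List<Row>> inputs) {
        Map<String, List<List<Row>>> grouped = new TreeMap<>();
        for (int i = 0; i < inputs.size(); i++) {
            for (Row row : inputs.get(i)) {
                List<List<Row>> buckets = grouped.get(row.mainKey);
                if (buckets == null) {
                    // First time we see this key: create one empty bucket per input.
                    buckets = new ArrayList<>();
                    for (int j = 0; j < inputs.size(); j++) {
                        buckets.add(new ArrayList<Row>());
                    }
                    grouped.put(row.mainKey, buckets);
                }
                buckets.get(i).add(row);
            }
        }
        return grouped;
    }

    /**
     * "groupby+secondarysort" expressed as a cogroup with a single input,
     * followed by a per-key sort of the bag on the secondary key.
     */
    static Map<String, List<Row>> groupBySecondarySort(List<Row> input) {
        Map<String, List<List<Row>>> cogrouped = cogroup(Collections.singletonList(input));
        Map<String, List<Row>> result = new TreeMap<>();
        for (Map.Entry<String, List<List<Row>>> entry : cogrouped.entrySet()) {
            // With one input there is exactly one bucket per key.
            List<Row> bag = new ArrayList<>(entry.getValue().get(0));
            bag.sort(Comparator.comparingInt((Row r) -> r.secondaryKey));
            result.put(entry.getKey(), bag);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Row> input = new ArrayList<>();
        input.add(new Row("a", 2, "y"));
        input.add(new Row("b", 0, "z"));
        input.add(new Row("a", 1, "x"));
        // prints {a=[a:1:x, a:2:y], b=[b:0:z]}
        System.out.println(groupBySecondarySort(input));
    }
}
```

In a real implementation the grouping step would be the existing CoGroupedRDD path with a single predecessor, so the same partitioner (including a custom one) would apply. Whether the per-key sort is best done after grouping, as sketched here, or during the shuffle is left open.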



