Posted to dev@pig.apache.org by "liyunzhang_intel (JIRA)" <ji...@apache.org> on 2015/05/15 08:15:00 UTC

[jira] [Created] (PIG-4553) Implement secondary sort using 1 shuffle not twice

liyunzhang_intel created PIG-4553:
-------------------------------------

             Summary: Implement secondary sort using 1 shuffle not twice
                 Key: PIG-4553
                 URL: https://issues.apache.org/jira/browse/PIG-4553
             Project: Pig
          Issue Type: Sub-task
            Reporter: liyunzhang_intel


Currently, secondary-key sort is implemented in GlobalRearrangeConverter#convert using two shuffles: the first in repartitionAndSortWithinPartitions and the second in groupBy.
{code}
public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
                          POGlobalRearrangeSpark physicalOperator) throws IOException {
    ....
    if (predecessors.size() == 1) {
        // GROUP
        JavaPairRDD<Object, Iterable<Tuple>> prdd = null;
        if (physicalOperator.isUseSecondaryKey()) {
            RDD<Tuple> rdd = predecessors.get(0);
            RDD<Tuple2<Tuple, Object>> rddPair = rdd.map(new ToKeyNullValueFunction(),
                    SparkUtil.<Tuple, Object>getTuple2Manifest());

            JavaPairRDD<Tuple, Object> pairRDD = new JavaPairRDD<Tuple, Object>(rddPair,
                    SparkUtil.getManifest(Tuple.class),
                    SparkUtil.getManifest(Object.class));

            // Secondary-key sort is enabled: first sort the tuples by secondary key.
            // Shuffle #1: repartition and sort within each partition.
            JavaPairRDD<Tuple, Object> sorted = pairRDD.repartitionAndSortWithinPartitions(
                    new HashPartitioner(parallelism),
                    new PigSecondaryKeyComparatorSpark(physicalOperator.getSecondarySortOrder()));
            JavaRDD<Tuple> mapped = sorted.mapPartitions(new ToValueFunction());
            // Shuffle #2: group the sorted tuples by key.
            prdd = mapped.groupBy(new GetKeyFunction(physicalOperator), parallelism);
        } else {
            JavaRDD<Tuple> jrdd = predecessors.get(0).toJavaRDD();
            prdd = jrdd.groupBy(new GetKeyFunction(physicalOperator), parallelism);
        }

        JavaRDD<Tuple> jrdd2 = prdd.map(new GroupTupleFunction(physicalOperator));
        return jrdd2.rdd();
    }
    ....
}
{code}
We can optimize this to use a single shuffle by following the approach in https://github.com/tresata/spark-sorted.
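A minimal plain-Java sketch (no Spark dependency) of the single-shuffle idea as we understand it from spark-sorted: partition records by the group key only, sort each partition by (group key, secondary key), then build the groups with one linear scan per partition instead of a second groupBy shuffle. Presumably the current code needs the second shuffle because HashPartitioner hashes the whole composite Tuple key, so records with the same group key can land in different partitions. The class SingleShuffleSecondarySort, the Rec record and its methods are hypothetical illustrations, not Pig or spark-sorted code; a List per partition stands in for an RDD partition and an int stands in for the secondary key.

{code}
import java.util.*;

// Hypothetical simulation of a single-shuffle secondary sort; not Pig or spark-sorted code.
public class SingleShuffleSecondarySort {

    // Stand-in for a Pig Tuple: a group key plus a secondary sort key.
    public static final class Rec {
        final String key;
        final int secondary;
        public Rec(String key, int secondary) { this.key = key; this.secondary = secondary; }
    }

    // The single "shuffle": route each record by a hash of the GROUP key only,
    // then sort every partition by (group key, secondary key).
    static List<List<Rec>> shuffleAndSort(List<Rec> input, int numPartitions) {
        List<List<Rec>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) partitions.add(new ArrayList<>());
        for (Rec r : input) {
            int p = Math.floorMod(r.key.hashCode(), numPartitions);
            partitions.get(p).add(r);
        }
        Comparator<Rec> cmp = Comparator.comparing((Rec r) -> r.key)
                .thenComparingInt(r -> r.secondary);
        for (List<Rec> part : partitions) part.sort(cmp);
        return partitions;
    }

    // Replaces the second shuffle (groupBy): all records of a key sit in one partition
    // and are adjacent after sorting, so a linear scan builds each group with its
    // values already ordered by the secondary key.
    public static Map<String, List<Integer>> groupSorted(List<Rec> input, int numPartitions) {
        Map<String, List<Integer>> groups = new LinkedHashMap<>();
        for (List<Rec> part : shuffleAndSort(input, numPartitions)) {
            String current = null;
            List<Integer> values = null;
            for (Rec r : part) {
                if (!r.key.equals(current)) {   // a new group starts here
                    current = r.key;
                    values = new ArrayList<>();
                    groups.put(current, values);
                }
                values.add(r.secondary);
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Rec> input = Arrays.asList(
                new Rec("b", 3), new Rec("a", 2), new Rec("b", 1),
                new Rec("a", 9), new Rec("c", 5), new Rec("a", 1));
        // prints {a=[1, 2, 9], b=[1, 3], c=[5]}
        System.out.println(new TreeMap<>(groupSorted(input, 2)));
    }
}
{code}

In Spark terms, shuffleAndSort corresponds roughly to JavaPairRDD#repartitionAndSortWithinPartitions with a Partitioner that hashes only the group key and a Comparator over the composite key, and the grouping loop would run inside mapPartitions.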



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)