You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pig.apache.org by "liyunzhang_intel (JIRA)" <ji...@apache.org> on 2017/04/05 01:35:41 UTC

[jira] [Updated] (PIG-5205) Duplicate record key info in GlobalRearrangeConverter#ToGroupKeyValueFunction

     [ https://issues.apache.org/jira/browse/PIG-5205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated PIG-5205:
----------------------------------
    Attachment: PIG-5205.patch

all unit tests pass with PIG-5205.patch, now close it as it will be included with the patch of PIG-5197

> Duplicate record key info in GlobalRearrangeConverter#ToGroupKeyValueFunction
> -----------------------------------------------------------------------------
>
>                 Key: PIG-5205
>                 URL: https://issues.apache.org/jira/browse/PIG-5205
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>
>         Attachments: PIG-5205.patch
>
>
> in org.apache.pig.backend.hadoop.executionengine.spark.converter.GlobalRearrangeConverter.ToGroupKeyValueFunction
> {code}
>    @Override
>         public Tuple call(Tuple2<IndexedKey, Seq<Seq<Tuple>>> input) {
>             try {
>            ....
>                 List<Iterator<Tuple>> tupleIterators = new ArrayList<Iterator<Tuple>>();
>                 for (int j = 0; j < bags.length; j ++) {
>                     Seq<Tuple> bag = bags[j];
>                     Iterator<Tuple> iterator = JavaConversions
>                             .asJavaCollection(bag).iterator();
>                     final int index = i;
>                     tupleIterators.add(new IteratorTransform<Tuple, Tuple>(
>                             iterator) {
>                         @Override
>                         protected Tuple transform(Tuple next) {
>                             try {
>                                 Tuple tuple = tf.newTuple(3);
>                                 tuple.set(0, index);
>                                # we record duplicate key info here
>                                 #for every records, we will use   out.set(0, key) later. may be the key info can be removed 
>                              tuple.set(1, key);   
>                                 tuple.set(2, next);
>                                 return tuple;
>                             } catch (ExecException e) {
>                                 throw new RuntimeException(e);
>                             }
>                         }
>                     });
>                     ++ i;
>                 }
>                 Tuple out = tf.newTuple(2);
>                 out.set(0, key);
>                 out.set(1, new IteratorUnion<Tuple>(tupleIterators.iterator()));
>                 if (LOG.isDebugEnabled()) {
>                     LOG.debug("ToGroupKeyValueFunction out " + out);
>                 }
>                 return out;
>             } catch (Exception e) {
>                 throw new RuntimeException(e);
>             }
>         }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)