Posted to dev@pig.apache.org by "liyunzhang_intel (JIRA)" <ji...@apache.org> on 2017/03/31 01:52:42 UTC

[jira] [Created] (PIG-5205) Duplicate record key info in GlobalRearrangeConverter#ToGroupKeyValueFunction

liyunzhang_intel created PIG-5205:
-------------------------------------

             Summary: Duplicate record key info in GlobalRearrangeConverter#ToGroupKeyValueFunction
                 Key: PIG-5205
                 URL: https://issues.apache.org/jira/browse/PIG-5205
             Project: Pig
          Issue Type: Sub-task
            Reporter: liyunzhang_intel
            Assignee: liyunzhang_intel


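In org.apache.pig.backend.hadoop.executionengine.spark.converter.GlobalRearrangeConverter.ToGroupKeyValueFunction, every record emitted for a group is a 3-field tuple (index, key, value). The group key is therefore stored once per record, even though the output tuple already stores it once per group: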
{code}

        @Override
        public Tuple call(Tuple2<IndexedKey, Seq<Seq<Tuple>>> input) {
            try {
           ....
                List<Iterator<Tuple>> tupleIterators = new ArrayList<Iterator<Tuple>>();
                for (int j = 0; j < bags.length; j ++) {
                    Seq<Tuple> bag = bags[j];
                    Iterator<Tuple> iterator = JavaConversions
                            .asJavaCollection(bag).iterator();
                    final int index = i;
                    tupleIterators.add(new IteratorTransform<Tuple, Tuple>(
                            iterator) {
                        @Override
                        protected Tuple transform(Tuple next) {
                            try {
                                Tuple tuple = tf.newTuple(3);
                                tuple.set(0, index);
                                // The key is recorded again here for every record, although
                                // out.set(0, key) below already stores it once per group.
                                // This per-record copy may be removable.
                                tuple.set(1, key);
                                tuple.set(2, next);
                                return tuple;
                            } catch (ExecException e) {
                                throw new RuntimeException(e);
                            }
                        }
                    });
                    ++ i;
                }

                Tuple out = tf.newTuple(2);
                out.set(0, key);
                out.set(1, new IteratorUnion<Tuple>(tupleIterators.iterator()));
                if (LOG.isDebugEnabled()) {
                    LOG.debug("ToGroupKeyValueFunction out " + out);
                }

                return out;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

{code}
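Below is a hypothetical, dependency-free sketch of the suggested change; it is not Pig code. Object[] stands in for Pig's Tuple, and the helpers transform and union only mimic what IteratorTransform and IteratorUnion do in the excerpt above. It assumes the consumer of this output reads the group key from field 0 of the outer tuple, so each per-record tuple can shrink from (index, key, value) to (index, value). Whether the downstream operators can actually work without field 1 has not been checked here.

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

// Hypothetical model of ToGroupKeyValueFunction's output; Object[] replaces Pig's Tuple.
public class GroupKeyValueSketch {

    // Lazily maps each element, similar in spirit to Pig's IteratorTransform.
    static <A, B> Iterator<B> transform(Iterator<A> it, Function<A, B> f) {
        return new Iterator<B>() {
            public boolean hasNext() { return it.hasNext(); }
            public B next() { return f.apply(it.next()); }
        };
    }

    // Chains iterators one after another, similar in spirit to Pig's IteratorUnion.
    static <T> Iterator<T> union(List<Iterator<T>> its) {
        return new Iterator<T>() {
            int cur = 0;
            public boolean hasNext() {
                // Skip exhausted (or empty) iterators.
                while (cur < its.size() && !its.get(cur).hasNext()) cur++;
                return cur < its.size();
            }
            public T next() {
                if (!hasNext()) throw new NoSuchElementException();
                return its.get(cur).next();
            }
        };
    }

    // Returns {key, iterator over records}. With withKeyPerRecord = true each record is
    // {index, key, value} as in the current code; otherwise it is {index, value} and the
    // key is kept only once, in out[0].
    static Object[] toGroupKeyValue(Object key, List<List<Object>> bags, boolean withKeyPerRecord) {
        List<Iterator<Object[]>> iterators = new ArrayList<>();
        for (int j = 0; j < bags.size(); j++) {
            final int index = j;
            iterators.add(transform(bags.get(j).iterator(), value ->
                    withKeyPerRecord ? new Object[] {index, key, value}
                                     : new Object[] {index, value}));
        }
        return new Object[] {key, union(iterators)};
    }

    public static void main(String[] args) {
        List<List<Object>> bags = Arrays.asList(
                Arrays.<Object>asList("a1", "a2"),
                Arrays.<Object>asList("b1"));
        Object[] out = toGroupKeyValue("k", bags, false);
        @SuppressWarnings("unchecked")
        Iterator<Object[]> records = (Iterator<Object[]>) out[1];
        System.out.println("key=" + out[0]);
        while (records.hasNext()) {
            System.out.println(Arrays.toString(records.next()));
        }
    }
}
{code}

With withKeyPerRecord = false, main prints key=k once, followed by [0, a1], [0, a2] and [1, b1]; with true, every record would also carry k. Records are still produced lazily, so dropping field 1 only changes the per-record tuple width, not how the bags are iterated.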




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)