Posted to dev@pig.apache.org by "liyunzhang_intel (JIRA)" <ji...@apache.org> on 2017/04/05 01:35:41 UTC
[jira] [Updated] (PIG-5205) Duplicate record key info in GlobalRearrangeConverter#ToGroupKeyValueFunction
[ https://issues.apache.org/jira/browse/PIG-5205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
liyunzhang_intel updated PIG-5205:
----------------------------------
Attachment: PIG-5205.patch
All unit tests pass with PIG-5205.patch. Closing this issue now, since the change will be included in the patch for PIG-5197.
> Duplicate record key info in GlobalRearrangeConverter#ToGroupKeyValueFunction
> -----------------------------------------------------------------------------
>
> Key: PIG-5205
> URL: https://issues.apache.org/jira/browse/PIG-5205
> Project: Pig
> Issue Type: Sub-task
> Components: spark
> Reporter: liyunzhang_intel
> Assignee: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-5205.patch
>
>
> In org.apache.pig.backend.hadoop.executionengine.spark.converter.GlobalRearrangeConverter.ToGroupKeyValueFunction, the group key is written into every record tuple and again into the output tuple:
> {code}
> @Override
> public Tuple call(Tuple2<IndexedKey, Seq<Seq<Tuple>>> input) {
>     try {
>         // ....
>         List<Iterator<Tuple>> tupleIterators = new ArrayList<Iterator<Tuple>>();
>         for (int j = 0; j < bags.length; j++) {
>             Seq<Tuple> bag = bags[j];
>             Iterator<Tuple> iterator = JavaConversions
>                     .asJavaCollection(bag).iterator();
>             final int index = i;
>             tupleIterators.add(new IteratorTransform<Tuple, Tuple>(
>                     iterator) {
>                 @Override
>                 protected Tuple transform(Tuple next) {
>                     try {
>                         Tuple tuple = tf.newTuple(3);
>                         tuple.set(0, index);
>                         // We record duplicate key info here for every record,
>                         // although out.set(0, key) below stores the key again.
>                         // Maybe the key info can be removed from each record.
>                         tuple.set(1, key);
>                         tuple.set(2, next);
>                         return tuple;
>                     } catch (ExecException e) {
>                         throw new RuntimeException(e);
>                     }
>                 }
>             });
>             ++i;
>         }
>         Tuple out = tf.newTuple(2);
>         out.set(0, key);
>         out.set(1, new IteratorUnion<Tuple>(tupleIterators.iterator()));
>         if (LOG.isDebugEnabled()) {
>             LOG.debug("ToGroupKeyValueFunction out " + out);
>         }
>         return out;
>     } catch (Exception e) {
>         throw new RuntimeException(e);
>     }
> }
> {code}
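A minimal, self-contained sketch of the two record layouts, not Pig's actual code. Assumptions: a plain `List<Object>` stands in for Pig's `Tuple`, lazy Java streams stand in for the `IteratorTransform`/`IteratorUnion` helpers, and the names `GroupKeyLayoutSketch`, `withDuplicateKey`, `withoutDuplicateKey` and `countKeyCopies` are hypothetical. It counts how many times the group key is stored. The current layout, `(key, union of (index, key, value))`, stores it once in `out` plus once per record. A layout of `(key, union of (index, value))` stores it once. Whether downstream consumers can work without field 1 of each record is not established here.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.IntStream;

class GroupKeyLayoutSketch {

    // Hypothetical model of the current layout: out = (key, union of (index, key, value)).
    // Records are produced lazily, similar in spirit to IteratorTransform/IteratorUnion.
    static List<Object> withDuplicateKey(Object key, List<List<Object>> bags) {
        Iterator<List<Object>> records = IntStream.range(0, bags.size()).boxed()
                .flatMap(index -> bags.get(index).stream()
                        .map(value -> Arrays.<Object>asList(index, key, value)))
                .iterator();
        return Arrays.<Object>asList(key, records);
    }

    // Hypothetical leaner layout: out = (key, union of (index, value)).
    // The key is stored only once, in out.
    static List<Object> withoutDuplicateKey(Object key, List<List<Object>> bags) {
        Iterator<List<Object>> records = IntStream.range(0, bags.size()).boxed()
                .flatMap(index -> bags.get(index).stream()
                        .map(value -> Arrays.<Object>asList(index, value)))
                .iterator();
        return Arrays.<Object>asList(key, records);
    }

    // Drains the record iterator in out[1] (so call it once per out) and counts
    // how many fields equal the key, including out[0] itself.
    @SuppressWarnings("unchecked")
    static int countKeyCopies(List<Object> out) {
        Object key = out.get(0);
        int copies = 1; // the copy stored in out[0]
        Iterator<List<Object>> records = (Iterator<List<Object>>) out.get(1);
        while (records.hasNext()) {
            for (Object field : records.next()) {
                if (key.equals(field)) {
                    copies++;
                }
            }
        }
        return copies;
    }

    public static void main(String[] args) {
        List<List<Object>> bags = Arrays.asList(
                Arrays.<Object>asList("a", "b"), // records from input 0
                Arrays.<Object>asList("c"));     // records from input 1
        System.out.println(countKeyCopies(withDuplicateKey("k", bags)));    // 4
        System.out.println(countKeyCopies(withoutDuplicateKey("k", bags))); // 1
    }
}
```

With three grouped records, the current layout holds four copies of the key and the leaner one holds a single copy. The saving grows linearly with the number of records per group.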
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)