Posted to dev@pig.apache.org by "liyunzhang_intel (JIRA)" <ji...@apache.org> on 2017/03/31 01:52:42 UTC
[jira] [Created] (PIG-5205) Duplicate record key info in GlobalRearrangeConverter#ToGroupKeyValueFunction
liyunzhang_intel created PIG-5205:
-------------------------------------
Summary: Duplicate record key info in GlobalRearrangeConverter#ToGroupKeyValueFunction
Key: PIG-5205
URL: https://issues.apache.org/jira/browse/PIG-5205
Project: Pig
Issue Type: Sub-task
Reporter: liyunzhang_intel
Assignee: liyunzhang_intel
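In org.apache.pig.backend.hadoop.executionengine.spark.converter.GlobalRearrangeConverter.ToGroupKeyValueFunction, the group key is written into every per-record tuple, even though it is also stored once in the output tuple: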
{code}
@Override
public Tuple call(Tuple2<IndexedKey, Seq<Seq<Tuple>>> input) {
    try {
        ....
        List<Iterator<Tuple>> tupleIterators = new ArrayList<Iterator<Tuple>>();
        for (int j = 0; j < bags.length; j++) {
            Seq<Tuple> bag = bags[j];
            Iterator<Tuple> iterator = JavaConversions
                    .asJavaCollection(bag).iterator();
            final int index = i;
            tupleIterators.add(new IteratorTransform<Tuple, Tuple>(
                    iterator) {
                @Override
                protected Tuple transform(Tuple next) {
                    try {
                        Tuple tuple = tf.newTuple(3);
                        tuple.set(0, index);
                        // The key is recorded here for every record, duplicating
                        // out.set(0, key) below; this field may be removable.
                        tuple.set(1, key);
                        tuple.set(2, next);
                        return tuple;
                    } catch (ExecException e) {
                        throw new RuntimeException(e);
                    }
                }
            });
            ++i;
        }
        Tuple out = tf.newTuple(2);
        out.set(0, key);
        out.set(1, new IteratorUnion<Tuple>(tupleIterators.iterator()));
        if (LOG.isDebugEnabled()) {
            LOG.debug("ToGroupKeyValueFunction out " + out);
        }
        return out;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
{code}
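To make the redundancy concrete, here is a minimal, dependency-free sketch, not Pig code. Object[] stands in for Pig's Tuple, a List<List<Object>> stands in for the per-input bags of one group, and the class and method names (GroupKeyLayoutSketch, withKeyPerRecord, keyOncePerGroup, keyReferences) are hypothetical. It builds lists eagerly instead of using lazy wrappers such as IteratorTransform/IteratorUnion. It contrasts the current (index, key, value) record layout with a possible (index, value) layout. That layout assumes every downstream consumer can read the key from out[0], which this issue does not confirm.

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of ToGroupKeyValueFunction's output shape.
// Object[] stands in for Pig's Tuple; each inner List is one input's bag.
class GroupKeyLayoutSketch {

    // Current layout: each record is (index, key, value), so the key is
    // repeated once per record in addition to out[0].
    static Object[] withKeyPerRecord(Object key, List<List<Object>> bags) {
        List<Object[]> records = new ArrayList<>();
        for (int i = 0; i < bags.size(); i++) {
            for (Object value : bags.get(i)) {
                records.add(new Object[] {i, key, value});
            }
        }
        return new Object[] {key, records};
    }

    // Possible layout (assumption): each record is (index, value) and the
    // key lives only in out[0].
    static Object[] keyOncePerGroup(Object key, List<List<Object>> bags) {
        List<Object[]> records = new ArrayList<>();
        for (int i = 0; i < bags.size(); i++) {
            for (Object value : bags.get(i)) {
                records.add(new Object[] {i, value});
            }
        }
        return new Object[] {key, records};
    }

    // Counts record fields equal to the group key stored in out[0].
    static int keyReferences(Object[] out) {
        @SuppressWarnings("unchecked")
        List<Object[]> records = (List<Object[]>) out[1];
        int count = 0;
        for (Object[] record : records) {
            for (Object field : record) {
                if (out[0].equals(field)) {
                    count++;
                }
            }
        }
        return count;
    }

    public static void main(String[] args) {
        List<List<Object>> bags = Arrays.asList(
                Arrays.<Object>asList("a", "b"), // records from input 0
                Arrays.<Object>asList("c"));     // records from input 1
        System.out.println(keyReferences(withKeyPerRecord("k1", bags)));
        System.out.println(keyReferences(keyOncePerGroup("k1", bags)));
    }
}
{code}

Running main prints 3 and then 0. With the current layout every record carries its own copy of the key reference. With the alternative layout the key is stored only once per group.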
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)