You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by zl...@apache.org on 2017/04/27 04:50:28 UTC
svn commit: r1792838 -
/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java
Author: zly
Date: Thu Apr 27 04:50:28 2017
New Revision: 1792838
URL: http://svn.apache.org/viewvc?rev=1792838&view=rev
Log:
PIG-5164: MultiQuery_Union_3 is failing with spark exec type (Adam via Liyun)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java?rev=1792838&r1=1792837&r2=1792838&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java Thu Apr 27 04:50:28 2017
@@ -74,6 +74,7 @@ public class SecondaryKeySortUtil {
return new Iterable<Tuple>() {
Object curKey = null;
ArrayList curValues = new ArrayList();
+ boolean initialized = false;
@Override
public Iterator<Tuple> iterator() {
@@ -92,7 +93,11 @@ public class SecondaryKeySortUtil {
Object tMainKey = null;
try {
tMainKey = ((Tuple) (t._1()).getKey()).get(0);
- if (curKey != null && !curKey.equals(tMainKey)) {
+
+ //If the key has changed and we've seen at least 1 already
+ if (initialized &&
+ ((curKey == null && tMainKey != null) ||
+ (curKey != null && !curKey.equals(tMainKey)))){
Tuple result = restructTuple(curKey, new ArrayList(curValues));
curValues.clear();
curKey = tMainKey;
@@ -102,6 +107,7 @@ public class SecondaryKeySortUtil {
curKey = tMainKey;
//if key does not change, just append the value to the same key
curValues.add(t._2());
+ initialized = true;
} catch (ExecException e) {
throw new RuntimeException("AccumulateByKey throw exception: ", e);