You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2016/02/22 03:03:42 UTC
tajo git commit: TAJO-2080: ArrayIndexOutOfBoundsException when
performing aggregation on a union block.
Repository: tajo
Updated Branches:
refs/heads/master 7a9a3ec47 -> 1138784fb
TAJO-2080: ArrayIndexOutOfBoundsException when performing aggregation on a union block.
Closes #967
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/1138784f
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/1138784f
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/1138784f
Branch: refs/heads/master
Commit: 1138784fbd1e3e7b34ce616bd308250ef59ab859
Parents: 7a9a3ec
Author: Jihoon Son <ji...@apache.org>
Authored: Mon Feb 22 11:03:10 2016 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Mon Feb 22 11:03:29 2016 +0900
----------------------------------------------------------------------
CHANGES | 3 ++
.../tajo/catalog/statistics/StatisticsUtil.java | 31 +++++++++++++++++++-
.../tajo/engine/query/TestTableSubQuery.java | 16 +++++++++-
.../TestTableSubQuery/testGroupbyOnUnion.result | 6 ++++
.../physical/HashShuffleFileWriteExec.java | 4 +--
.../java/org/apache/tajo/querymaster/Query.java | 6 ++--
.../java/org/apache/tajo/querymaster/Stage.java | 16 +++++-----
7 files changed, 67 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/1138784f/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index b7b9e1d..21fd2b0 100644
--- a/CHANGES
+++ b/CHANGES
@@ -106,6 +106,9 @@ Release 0.12.0 - unreleased
BUG FIXES
+ TAJO-2080: ArrayIndexOutOfBoundsException when performing aggregation on a
+ union block. (jihoon)
+
TAJO-2078: TestTajoCli.testSelectResultWithNullTrueDeprecated occasionally
fails. (Dongkyu Hwangbo via jaehwa)
http://git-wip-us.apache.org/repos/asf/tajo/blob/1138784f/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/StatisticsUtil.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/StatisticsUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/StatisticsUtil.java
index c481276..b3c18c5 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/StatisticsUtil.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/StatisticsUtil.java
@@ -18,10 +18,14 @@
package org.apache.tajo.catalog.statistics;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import java.util.ArrayList;
import java.util.List;
public class StatisticsUtil {
@@ -153,4 +157,29 @@ public class StatisticsUtil {
return aggregated;
}
-}
\ No newline at end of file
+
+  /**
+   * Merges two positionally aligned lists of column statistics into a new list.
+   *
+   * <p>For each index i the result holds a new {@link ColumnStats} for the column of
+   * {@code stats1.get(i)}. Its null count is the sum of both inputs, its maximum is the
+   * larger of the two maxima, and its minimum is the smaller of the two minima. A bound
+   * that is {@code null} on one side is treated as absent, and the other side's bound is
+   * used. When both are {@code null} the bound is left unset. This lets stats built by
+   * {@link #emptyColumnStats(Schema)} (which sets no bounds itself) serve as the initial
+   * accumulator when folding several inputs together.
+   * NOTE(review): assumes an unset bound is reported as {@code null} by
+   * {@code getMinValue()}/{@code getMaxValue()} — confirm against ColumnStats.
+   * Distinct-value counts are not merged yet.
+   *
+   * @param stats1 first list of column stats; its columns label the result entries
+   * @param stats2 second list of column stats, aligned index-by-index with {@code stats1}
+   * @return a new list containing one merged entry per index
+   * @throws IllegalStateException if the lists differ in size, or if the columns at some
+   *         index have different type descriptors
+   */
+  public static List<ColumnStats> aggregateColumnStats(List<ColumnStats> stats1, List<ColumnStats> stats2) {
+    Preconditions.checkState(stats1.size() == stats2.size(),
+        "Column stats size mismatch: %s vs %s", stats1.size(), stats2.size());
+    List<ColumnStats> result = new ArrayList<>(stats1.size());
+    // Bound the loop by the input size: 'result' is still empty here, so looping up to
+    // result.size() would skip every column and always return an empty list.
+    for (int i = 0; i < stats1.size(); i++) {
+      ColumnStats left = stats1.get(i);
+      ColumnStats right = stats2.get(i);
+      Preconditions.checkState(left.getColumn().getTypeDesc().equals(right.getColumn().getTypeDesc()),
+          "Column type mismatch at index %s", i);
+      ColumnStats merged = new ColumnStats(left.getColumn());
+      // TODO: merged.setNumDistValues();
+      merged.setNumNulls(left.getNumNulls() + right.getNumNulls());
+      // Only touch a bound when at least one side actually has one.
+      if (left.getMaxValue() != null || right.getMaxValue() != null) {
+        merged.setMaxValue(greaterOf(left.getMaxValue(), right.getMaxValue()));
+      }
+      if (left.getMinValue() != null || right.getMinValue() != null) {
+        merged.setMinValue(lesserOf(left.getMinValue(), right.getMinValue()));
+      }
+      result.add(merged);
+    }
+    return result;
+  }
+
+  /** Returns the larger of two values; a {@code null} argument is absent, so the other wins (ties pick {@code b}). */
+  private static <T extends Comparable<? super T>> T greaterOf(T a, T b) {
+    if (a == null) {
+      return b;
+    }
+    if (b == null) {
+      return a;
+    }
+    return a.compareTo(b) > 0 ? a : b;
+  }
+
+  /** Returns the smaller of two values; a {@code null} argument is absent, so the other wins (ties pick {@code b}). */
+  private static <T extends Comparable<? super T>> T lesserOf(T a, T b) {
+    if (a == null) {
+      return b;
+    }
+    if (b == null) {
+      return a;
+    }
+    return a.compareTo(b) < 0 ? a : b;
+  }
+
+  /**
+   * Builds one {@link ColumnStats} per root column of {@code schema}, in the order the
+   * schema returns them. Each entry is created from its column alone; no statistic
+   * values are assigned here.
+   *
+   * @param schema schema whose root columns the stats are created for
+   * @return a new, mutable list of column stats
+   */
+  public static List<ColumnStats> emptyColumnStats(Schema schema) {
+    final List<ColumnStats> emptyStats = new ArrayList<>(schema.size());
+    for (Column rootColumn : schema.getRootColumns()) {
+      emptyStats.add(new ColumnStats(rootColumn));
+    }
+    return emptyStats;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1138784f/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTableSubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTableSubQuery.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTableSubQuery.java
index f59bac7..8703834 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTableSubQuery.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTableSubQuery.java
@@ -109,4 +109,18 @@ public class TestTableSubQuery extends QueryTestCaseBase {
public void testMultipleSubqueriesWithAggregation() throws Exception {
runSimpleTests();
}
-}
\ No newline at end of file
+
+  /**
+   * Regression test for TAJO-2080: a GROUP BY over a UNION ALL whose first branch is
+   * itself an aggregation of lineitem and whose second branch is a plain lineitem scan.
+   */
+  @Test
+  @Option(sort = true)
+  @SimpleTest(
+      queries = @QuerySpec(
+          "select sum(t.cnt) as cnt, l_orderkey, l_partkey, 'my view' " +
+          "from (" +
+          "select l_orderkey, l_partkey, CAST(COUNT(1) AS INT4) as cnt " +
+          "from lineitem group by l_orderkey, l_partkey " +
+          "union all " +
+          "select l_orderkey, l_partkey, l_linenumber as cnt " +
+          "from lineitem) as t " +
+          "group by l_orderkey, l_partkey")
+  )
+  public void testGroupbyOnUnion() throws Exception {
+    runSimpleTests();
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1138784f/tajo-core-tests/src/test/resources/results/TestTableSubQuery/testGroupbyOnUnion.result
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/results/TestTableSubQuery/testGroupbyOnUnion.result b/tajo-core-tests/src/test/resources/results/TestTableSubQuery/testGroupbyOnUnion.result
new file mode 100644
index 0000000..a462d7e
--- /dev/null
+++ b/tajo-core-tests/src/test/resources/results/TestTableSubQuery/testGroupbyOnUnion.result
@@ -0,0 +1,6 @@
+cnt,l_orderkey,l_partkey,?text
+-------------------------------
+2,3,2,my view
+2,2,2,my view
+5,1,1,my view
+3,3,3,my view
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/1138784f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
index 49b0e11..5563ab9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
@@ -170,7 +170,7 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec {
writtenBytes += usedBufferSize;
usedBufferSize = totalBufferCapacity = 0;
- TableStats aggregated = (TableStats) child.getInputStats().clone();
+ TableStats aggregated = new TableStats();
aggregated.setNumBytes(writtenBytes);
aggregated.setNumRows(numRows);
context.setResultStats(aggregated);
@@ -243,4 +243,4 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec {
progress = 1.0f;
super.close();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1138784f/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
index 0cd178f..e370a29 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
@@ -504,7 +504,7 @@ public class Query implements EventHandler<QueryEvent> {
query.context.getQueryContext(),
lastStage.getId(),
lastStage.getMasterPlan().getLogicalPlan(),
- lastStage.getSchema(),
+ lastStage.getOutSchema(),
tableDesc);
QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext());
@@ -653,7 +653,7 @@ public class Query implements EventHandler<QueryEvent> {
TableDesc resultTableDesc =
new TableDesc(
query.getId().toString(),
- lastStage.getSchema(),
+ lastStage.getOutSchema(),
meta,
finalOutputDir.toUri());
resultTableDesc.setExternal(true);
@@ -730,7 +730,7 @@ public class Query implements EventHandler<QueryEvent> {
finalTable = catalog.getTableDesc(tableName);
} else {
String tableName = query.getId().toString();
- finalTable = new TableDesc(tableName, lastStage.getSchema(), meta, finalOutputDir.toUri());
+ finalTable = new TableDesc(tableName, lastStage.getOutSchema(), meta, finalOutputDir.toUri());
}
long volume = getTableVolume(query.systemConf, finalOutputDir);
http://git-wip-us.apache.org/repos/asf/tajo/blob/1138784f/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index 85086e6..f1813c9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -45,7 +45,6 @@ import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.error.Errors.SerializedException;
import org.apache.tajo.exception.ErrorUtil;
import org.apache.tajo.exception.TajoException;
-import org.apache.tajo.exception.TajoInternalError;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.TaskState;
import org.apache.tajo.master.event.*;
@@ -96,7 +95,7 @@ public class Stage implements EventHandler<StageEvent> {
private MasterPlan masterPlan;
private ExecutionBlock block;
private int priority;
- private Schema schema;
+ private Schema outSchema;
private TableMeta meta;
private TableStats resultStatistics;
private TableStats inputStatistics;
@@ -592,8 +591,8 @@ public class Stage implements EventHandler<StageEvent> {
return tasks.get(qid);
}
- public Schema getSchema() {
- return schema;
+ public Schema getOutSchema() {
+ return outSchema;
}
public TableMeta getTableMeta() {
@@ -667,8 +666,7 @@ public class Stage implements EventHandler<StageEvent> {
long[] numRows = new long[]{0, 0};
int[] numBlocks = new int[]{0, 0};
int[] numOutputs = new int[]{0, 0};
-
- List<ColumnStats> columnStatses = Lists.newArrayList();
+ List<ColumnStats> columnStatses = StatisticsUtil.emptyColumnStats(stage.getDataChannel().getSchema());
MasterPlan masterPlan = stage.getMasterPlan();
for (ExecutionBlock block : masterPlan.getChilds(stage.getBlock())) {
@@ -687,7 +685,9 @@ public class Stage implements EventHandler<StageEvent> {
numOutputs[i] += childStatArray[i].getNumShuffleOutputs();
numRows[i] += childStatArray[i].getNumRows();
}
- columnStatses.addAll(childStatArray[1].getColumnStats());
+ if (childStatArray[1].getColumnStats() != null && childStatArray[1].getColumnStats().size() > 0) {
+ columnStatses = StatisticsUtil.aggregateColumnStats(columnStatses, childStatArray[1].getColumnStats());
+ }
}
for (int i = 0; i < 2; i++) {
@@ -794,7 +794,7 @@ public class Stage implements EventHandler<StageEvent> {
dataFormat = channel.getDataFormat();
}
- schema = channel.getSchema();
+ outSchema = channel.getSchema();
meta = CatalogUtil.newTableMeta(dataFormat, new KeyValueSet());
inputStatistics = statsArray[0];
resultStatistics = statsArray[1];