You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/08/01 07:58:59 UTC
[06/14] git commit: TAJO-974: Eliminate unexpected case condition in SubQuery. (Hyoungjun Kim via hyunsik)
TAJO-974: Eliminate unexpected case condition in SubQuery. (Hyoungjun Kim via hyunsik)
Closes #974
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/1f6b5b38
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/1f6b5b38
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/1f6b5b38
Branch: refs/heads/index_support
Commit: 1f6b5b38752f499ee6d70ea1be399df34442b4f3
Parents: 9880f06
Author: Hyunsik Choi <hy...@apache.org>
Authored: Mon Jul 28 11:19:54 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Mon Jul 28 11:19:54 2014 +0900
----------------------------------------------------------------------
CHANGES | 7 +-
.../tajo/master/querymaster/Repartitioner.java | 11 +-
.../tajo/master/querymaster/SubQuery.java | 23 +++--
.../tajo/engine/query/TestGroupByQuery.java | 103 ++++++++++++++++++-
4 files changed, 131 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f6b5b38/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 2e530af..3ac13a9 100644
--- a/CHANGES
+++ b/CHANGES
@@ -97,6 +97,9 @@ Release 0.9.0 - unreleased
BUG FIXES
+ TAJO-974: Eliminate unexpected case condition in SubQuery. (Hyoungjun Kim
+ via hyunsik)
+
TAJO-977: INSERT into a partitioned table as SELECT statement uses a wrong
schema. (Hyoungjun Kim via hyunsik)
@@ -112,8 +115,8 @@ Release 0.9.0 - unreleased
TAJO-972: Broadcast join with left outer join returns duplicated rows.
(Hyoungjun Kim via jaehwa)
- TAJO-666: java.nio.BufferOverflowException occurs when the query includes an order by
- clause on a TEXT column. (Mai Hai Thanh via jihoon)
+ TAJO-666: java.nio.BufferOverflowException occurs when the query includes
+ an order by clause on a TEXT column. (Mai Hai Thanh via jihoon)
TAJO-939: Refactoring the column resolver in LogicalPlan. (hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f6b5b38/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 31c520f..6eebbde 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -726,12 +726,21 @@ public class Repartitioner {
}
}
+ int groupingColumns = 0;
GroupbyNode groupby = PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), NodeType.GROUP_BY);
+ if (groupby != null) {
+ groupingColumns = groupby.getGroupingColumns().length;
+ } else {
+ DistinctGroupbyNode dGroupby = PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), NodeType.DISTINCT_GROUP_BY);
+ if (dGroupby != null) {
+ groupingColumns = dGroupby.getGroupingColumns().length;
+ }
+ }
// get a proper number of tasks
int determinedTaskNum = Math.min(maxNum, finalFetches.size());
LOG.info(subQuery.getId() + ", ScheduleHashShuffledFetches - Max num=" + maxNum + ", finalFetchURI=" + finalFetches.size());
- if (groupby != null && groupby.getGroupingColumns().length == 0) {
+ if (groupingColumns == 0) {
determinedTaskNum = 1;
LOG.info(subQuery.getId() + ", No Grouping Column - determinedTaskNum is set to 1");
} else {
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f6b5b38/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 94f8b32..f2e9dd5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -48,10 +48,7 @@ import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.engine.planner.global.ExecutionBlock;
import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.engine.planner.logical.GroupbyNode;
-import org.apache.tajo.engine.planner.logical.NodeType;
-import org.apache.tajo.engine.planner.logical.ScanNode;
-import org.apache.tajo.engine.planner.logical.StoreTableNode;
+import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.*;
import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
@@ -716,9 +713,12 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
MasterPlan masterPlan = subQuery.getMasterPlan();
ExecutionBlock parent = masterPlan.getParent(subQuery.getBlock());
- GroupbyNode grpNode = null;
+ LogicalNode grpNode = null;
if (parent != null) {
grpNode = PlannerUtil.findMostBottomNode(parent.getPlan(), NodeType.GROUP_BY);
+ if (grpNode == null) {
+ grpNode = PlannerUtil.findMostBottomNode(parent.getPlan(), NodeType.DISTINCT_GROUP_BY);
+ }
}
// We assume this execution block the first stage of join if two or more tables are included in this block,
@@ -779,8 +779,13 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
return taskNum;
// Is this subquery the first step of group-by?
} else if (grpNode != null) {
-
- if (grpNode.getGroupingColumns().length == 0) {
+ boolean hasGroupColumns = true;
+ if (grpNode.getType() == NodeType.GROUP_BY) {
+ hasGroupColumns = ((GroupbyNode)grpNode).getGroupingColumns().length > 0;
+ } else if (grpNode.getType() == NodeType.DISTINCT_GROUP_BY) {
+ hasGroupColumns = ((DistinctGroupbyNode)grpNode).getGroupingColumns().length > 0;
+ }
+ if (!hasGroupColumns) {
return 1;
} else {
long volume = getInputVolume(subQuery.masterPlan, subQuery.context, subQuery.block);
@@ -836,10 +841,10 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
long volume = getInputVolume(subQuery.getMasterPlan(), subQuery.context, subQuery.getBlock());
int mb = (int) Math.ceil((double)volume / 1048576);
- LOG.info("Table's volume is approximately " + mb + " MB");
+ LOG.info(subQuery.getId() + ", Table's volume is approximately " + mb + " MB");
// determine the number of task per 64MB
int maxTaskNum = Math.max(1, (int) Math.ceil((double)mb / 64));
- LOG.info("The determined number of non-leaf tasks is " + maxTaskNum);
+ LOG.info(subQuery.getId() + ", The determined number of non-leaf tasks is " + maxTaskNum);
return maxTaskNum;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f6b5b38/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
index 41ffa06..0ffcf11 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
@@ -18,23 +18,34 @@
package org.apache.tajo.engine.query;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.tajo.IntegrationTest;
import org.apache.tajo.QueryTestCaseBase;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.TajoTestingCluster;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleFileOutput;
+import org.apache.tajo.master.querymaster.Query;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.master.querymaster.QueryUnit;
+import org.apache.tajo.master.querymaster.SubQuery;
import org.apache.tajo.storage.StorageConstants;
import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.worker.TajoWorker;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.sql.ResultSet;
+import java.util.*;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
@Category(IntegrationTest.class)
public class TestGroupByQuery extends QueryTestCaseBase {
+ private static final Log LOG = LogFactory.getLog(TestGroupByQuery.class);
public TestGroupByQuery() throws Exception {
super(TajoConstants.DEFAULT_DATABASE_NAME);
@@ -529,4 +540,94 @@ public class TestGroupByQuery extends QueryTestCaseBase {
assertResultSet(res);
cleanupQuery(res);
}
+
+ @Test
+ public final void testNumShufflePartition() throws Exception {
+ KeyValueSet tableOptions = new KeyValueSet();
+ tableOptions.put(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+ tableOptions.put(StorageConstants.CSVFILE_NULL, "\\\\N");
+
+ Schema schema = new Schema();
+ schema.addColumn("col1", Type.TEXT);
+ schema.addColumn("col2", Type.TEXT);
+
+ List<String> data = new ArrayList<String>();
+ int totalBytes = 0;
+ Random rand = new Random(System.currentTimeMillis());
+ String col1Prefix = "Column-1Column-1Column-1Column-1Column-1Column-1Column-1Column-1Column-1Column-1Column-1" +
+ "Column-1Column-1Column-1Column-1Column-1Column-1Column-1Column-1Column-1Column-1Column-1" +
+ "Column-1Column-1Column-1Column-1Column-1Column-1Column-1Column-1Column-1Column-1Column-1";
+
+ Set<Integer> uniqKeys = new HashSet<Integer>();
+ while(true) {
+ int col1RandomValue = rand.nextInt(1000000);
+ uniqKeys.add(col1RandomValue);
+ String str = (col1Prefix + "-" + col1RandomValue) + "|col2-" + rand.nextInt(1000000);
+ data.add(str);
+
+ totalBytes += str.getBytes().length;
+
+ if (totalBytes > 3 * 1024 * 1024) {
+ break;
+ }
+ }
+ TajoTestingCluster.createTable("testnumshufflepartition", schema, tableOptions, data.toArray(new String[]{}), 3);
+
+ try {
+ testingCluster.setAllTajoDaemonConfValue(ConfVars.DIST_QUERY_GROUPBY_PARTITION_VOLUME.varname, "2");
+ ResultSet res = executeString(
+ "select col1 \n" +
+ ",count(distinct col2) as cnt1\n" +
+ "from testnumshufflepartition \n" +
+ "group by col1"
+ );
+
+ int numRows = 0;
+ while (res.next()) {
+ numRows++;
+ }
+ assertEquals(uniqKeys.size(), numRows);
+
+ // find last QueryMasterTask
+ List<QueryMasterTask> qmTasks = new ArrayList<QueryMasterTask>();
+
+ for(TajoWorker worker: testingCluster.getTajoWorkers()) {
+ qmTasks.addAll(worker.getWorkerContext().getQueryMaster().getFinishedQueryMasterTasks());
+ }
+
+ assertTrue(!qmTasks.isEmpty());
+
+ Collections.sort(qmTasks, new Comparator<QueryMasterTask>() {
+ @Override
+ public int compare(QueryMasterTask o1, QueryMasterTask o2) {
+ return Long.compare(o1.getQuerySubmitTime(), o2.getQuerySubmitTime());
+ }
+ });
+
+ // Getting the number of partitions. It should be 2.
+ Set<Integer> partitionIds = new HashSet<Integer>();
+
+ Query query = qmTasks.get(qmTasks.size() - 1).getQuery();
+ Collection<SubQuery> subQueries = query.getSubQueries();
+ assertNotNull(subQueries);
+ assertTrue(!subQueries.isEmpty());
+ for (SubQuery subQuery: subQueries) {
+ if (subQuery.getId().toStringNoPrefix().endsWith("_000001")) {
+ QueryUnit[] queryUnits = subQuery.getQueryUnits();
+ assertNotNull(queryUnits);
+ for (QueryUnit eachQueryUnit: queryUnits) {
+ for (ShuffleFileOutput output: eachQueryUnit.getShuffleFileOutputs()) {
+ partitionIds.add(output.getPartId());
+ }
+ }
+ }
+ }
+
+ assertEquals(2, partitionIds.size());
+ executeString("DROP TABLE testnumshufflepartition PURGE").close();
+ } finally {
+ testingCluster.setAllTajoDaemonConfValue(ConfVars.DIST_QUERY_GROUPBY_PARTITION_VOLUME.varname,
+ ConfVars.DIST_QUERY_GROUPBY_PARTITION_VOLUME.defaultVal);
+ }
+ }
}