You are viewing a plain text version of this content. The link to its canonical version was not preserved in this plain-text copy.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/08/01 07:58:59 UTC

[06/14] git commit: TAJO-974: Eliminate unexpected case condition in SubQuery. (Hyoungjun Kim via hyunsik)

TAJO-974: Eliminate unexpected case condition in SubQuery. (Hyoungjun Kim via hyunsik)

Closes #974


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/1f6b5b38
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/1f6b5b38
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/1f6b5b38

Branch: refs/heads/index_support
Commit: 1f6b5b38752f499ee6d70ea1be399df34442b4f3
Parents: 9880f06
Author: Hyunsik Choi <hy...@apache.org>
Authored: Mon Jul 28 11:19:54 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Mon Jul 28 11:19:54 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   7 +-
 .../tajo/master/querymaster/Repartitioner.java  |  11 +-
 .../tajo/master/querymaster/SubQuery.java       |  23 +++--
 .../tajo/engine/query/TestGroupByQuery.java     | 103 ++++++++++++++++++-
 4 files changed, 131 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/1f6b5b38/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 2e530af..3ac13a9 100644
--- a/CHANGES
+++ b/CHANGES
@@ -97,6 +97,9 @@ Release 0.9.0 - unreleased
 
   BUG FIXES
 
+    TAJO-974: Eliminate unexpected case condition in SubQuery. (Hyoungjun Kim 
+    via hyunsik)
+
     TAJO-977: INSERT into a partitioned table as SELECT statement uses a wrong 
     schema. (Hyoungjun Kim via hyunsik)
 
@@ -112,8 +115,8 @@ Release 0.9.0 - unreleased
     TAJO-972: Broadcast join with left outer join returns duplicated rows.
     (Hyoungjun Kim via jaehwa)
 
-    TAJO-666: java.nio.BufferOverflowException occurs when the query includes an order by 
-    clause on a TEXT column. (Mai Hai Thanh via jihoon)
+    TAJO-666: java.nio.BufferOverflowException occurs when the query includes 
+    an order by clause on a TEXT column. (Mai Hai Thanh via jihoon)
 
     TAJO-939: Refactoring the column resolver in LogicalPlan. (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f6b5b38/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 31c520f..6eebbde 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -726,12 +726,21 @@ public class Repartitioner {
       }
     }
 
+    int groupingColumns = 0;
     GroupbyNode groupby = PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), NodeType.GROUP_BY);
+    if (groupby != null) {
+      groupingColumns = groupby.getGroupingColumns().length;
+    } else {
+      DistinctGroupbyNode dGroupby = PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), NodeType.DISTINCT_GROUP_BY);
+      if (dGroupby != null) {
+        groupingColumns = dGroupby.getGroupingColumns().length;
+      }
+    }
     // get a proper number of tasks
     int determinedTaskNum = Math.min(maxNum, finalFetches.size());
     LOG.info(subQuery.getId() + ", ScheduleHashShuffledFetches - Max num=" + maxNum + ", finalFetchURI=" + finalFetches.size());
 
-    if (groupby != null && groupby.getGroupingColumns().length == 0) {
+    if (groupingColumns == 0) {
       determinedTaskNum = 1;
       LOG.info(subQuery.getId() + ", No Grouping Column - determinedTaskNum is set to 1");
     } else {

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f6b5b38/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 94f8b32..f2e9dd5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -48,10 +48,7 @@ import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.engine.planner.logical.GroupbyNode;
-import org.apache.tajo.engine.planner.logical.NodeType;
-import org.apache.tajo.engine.planner.logical.ScanNode;
-import org.apache.tajo.engine.planner.logical.StoreTableNode;
+import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.*;
 import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
@@ -716,9 +713,12 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       MasterPlan masterPlan = subQuery.getMasterPlan();
       ExecutionBlock parent = masterPlan.getParent(subQuery.getBlock());
 
-      GroupbyNode grpNode = null;
+      LogicalNode grpNode = null;
       if (parent != null) {
         grpNode = PlannerUtil.findMostBottomNode(parent.getPlan(), NodeType.GROUP_BY);
+        if (grpNode == null) {
+          grpNode = PlannerUtil.findMostBottomNode(parent.getPlan(), NodeType.DISTINCT_GROUP_BY);
+        }
       }
 
       // We assume this execution block the first stage of join if two or more tables are included in this block,
@@ -779,8 +779,13 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
         return taskNum;
         // Is this subquery the first step of group-by?
       } else if (grpNode != null) {
-
-        if (grpNode.getGroupingColumns().length == 0) {
+        boolean hasGroupColumns = true;
+        if (grpNode.getType() == NodeType.GROUP_BY) {
+          hasGroupColumns = ((GroupbyNode)grpNode).getGroupingColumns().length > 0;
+        } else if (grpNode.getType() == NodeType.DISTINCT_GROUP_BY) {
+          hasGroupColumns = ((DistinctGroupbyNode)grpNode).getGroupingColumns().length > 0;
+        }
+        if (!hasGroupColumns) {
           return 1;
         } else {
           long volume = getInputVolume(subQuery.masterPlan, subQuery.context, subQuery.block);
@@ -836,10 +841,10 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       long volume = getInputVolume(subQuery.getMasterPlan(), subQuery.context, subQuery.getBlock());
 
       int mb = (int) Math.ceil((double)volume / 1048576);
-      LOG.info("Table's volume is approximately " + mb + " MB");
+      LOG.info(subQuery.getId() + ", Table's volume is approximately " + mb + " MB");
       // determine the number of task per 64MB
       int maxTaskNum = Math.max(1, (int) Math.ceil((double)mb / 64));
-      LOG.info("The determined number of non-leaf tasks is " + maxTaskNum);
+      LOG.info(subQuery.getId() + ", The determined number of non-leaf tasks is " + maxTaskNum);
       return maxTaskNum;
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f6b5b38/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
index 41ffa06..0ffcf11 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
@@ -18,23 +18,34 @@
 
 package org.apache.tajo.engine.query;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.IntegrationTest;
 import org.apache.tajo.QueryTestCaseBase;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleFileOutput;
+import org.apache.tajo.master.querymaster.Query;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.master.querymaster.QueryUnit;
+import org.apache.tajo.master.querymaster.SubQuery;
 import org.apache.tajo.storage.StorageConstants;
 import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.worker.TajoWorker;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import java.sql.ResultSet;
+import java.util.*;
 
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
 
 @Category(IntegrationTest.class)
 public class TestGroupByQuery extends QueryTestCaseBase {
+  private static final Log LOG = LogFactory.getLog(TestGroupByQuery.class);
 
   public TestGroupByQuery() throws Exception {
     super(TajoConstants.DEFAULT_DATABASE_NAME);
@@ -529,4 +540,94 @@ public class TestGroupByQuery extends QueryTestCaseBase {
     assertResultSet(res);
     cleanupQuery(res);
   }
+
+  @Test
+  public final void testNumShufflePartition() throws Exception {
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.put(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.put(StorageConstants.CSVFILE_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.TEXT);
+    schema.addColumn("col2", Type.TEXT);
+
+    List<String> data = new ArrayList<String>();
+    int totalBytes = 0;
+    Random rand = new Random(System.currentTimeMillis());
+    String col1Prefix = "Column-1Column-1Column-1Column-1Column-1Column-1Column-1Column-1Column-1Column-1Column-1" +
+        "Column-1Column-1Column-1Column-1Column-1Column-1Column-1Column-1Column-1Column-1Column-1" +
+        "Column-1Column-1Column-1Column-1Column-1Column-1Column-1Column-1Column-1Column-1Column-1";
+
+    Set<Integer> uniqKeys = new HashSet<Integer>();
+    while(true) {
+      int col1RandomValue = rand.nextInt(1000000);
+      uniqKeys.add(col1RandomValue);
+      String str = (col1Prefix + "-" + col1RandomValue) + "|col2-" + rand.nextInt(1000000);
+      data.add(str);
+
+      totalBytes += str.getBytes().length;
+
+      if (totalBytes > 3 * 1024 * 1024) {
+        break;
+      }
+    }
+    TajoTestingCluster.createTable("testnumshufflepartition", schema, tableOptions, data.toArray(new String[]{}), 3);
+
+    try {
+      testingCluster.setAllTajoDaemonConfValue(ConfVars.DIST_QUERY_GROUPBY_PARTITION_VOLUME.varname, "2");
+      ResultSet res = executeString(
+          "select col1 \n" +
+              ",count(distinct col2) as cnt1\n" +
+              "from testnumshufflepartition \n" +
+              "group by col1"
+      );
+
+      int numRows = 0;
+      while (res.next()) {
+        numRows++;
+      }
+      assertEquals(uniqKeys.size(), numRows);
+
+      // find last QueryMasterTask
+      List<QueryMasterTask> qmTasks = new ArrayList<QueryMasterTask>();
+
+      for(TajoWorker worker: testingCluster.getTajoWorkers()) {
+        qmTasks.addAll(worker.getWorkerContext().getQueryMaster().getFinishedQueryMasterTasks());
+      }
+
+      assertTrue(!qmTasks.isEmpty());
+
+      Collections.sort(qmTasks, new Comparator<QueryMasterTask>() {
+        @Override
+        public int compare(QueryMasterTask o1, QueryMasterTask o2) {
+          return Long.compare(o1.getQuerySubmitTime(), o2.getQuerySubmitTime());
+        }
+      });
+
+      // Getting the number of partitions. It should be 2.
+      Set<Integer> partitionIds = new HashSet<Integer>();
+
+      Query query = qmTasks.get(qmTasks.size() - 1).getQuery();
+      Collection<SubQuery> subQueries = query.getSubQueries();
+      assertNotNull(subQueries);
+      assertTrue(!subQueries.isEmpty());
+      for (SubQuery subQuery: subQueries) {
+        if (subQuery.getId().toStringNoPrefix().endsWith("_000001")) {
+          QueryUnit[] queryUnits = subQuery.getQueryUnits();
+          assertNotNull(queryUnits);
+          for (QueryUnit eachQueryUnit: queryUnits) {
+            for (ShuffleFileOutput output: eachQueryUnit.getShuffleFileOutputs()) {
+              partitionIds.add(output.getPartId());
+            }
+          }
+        }
+      }
+
+      assertEquals(2, partitionIds.size());
+      executeString("DROP TABLE testnumshufflepartition PURGE").close();
+    } finally {
+      testingCluster.setAllTajoDaemonConfValue(ConfVars.DIST_QUERY_GROUPBY_PARTITION_VOLUME.varname,
+          ConfVars.DIST_QUERY_GROUPBY_PARTITION_VOLUME.defaultVal);
+    }
+  }
 }