You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/02/24 17:43:42 UTC

git commit: TAJO-628: The second stage of distinct aggregation can be scheduled to only one node.

Repository: incubator-tajo
Updated Branches:
  refs/heads/master 9cce80cfe -> 5eafede2f


TAJO-628: The second stage of distinct aggregation can be scheduled to only one node.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/5eafede2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/5eafede2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/5eafede2

Branch: refs/heads/master
Commit: 5eafede2fd1e9a512aedf51371291d028dbbeca3
Parents: 9cce80c
Author: Hyunsik Choi <hy...@apache.org>
Authored: Tue Feb 25 01:43:26 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Tue Feb 25 01:43:26 2014 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +++
 .../tajo/master/querymaster/Repartitioner.java  |  5 +++-
 .../tajo/worker/TajoResourceAllocator.java      |  2 ++
 .../main/java/org/apache/tajo/worker/Task.java  | 16 ++++++------
 .../tajo/master/TestTajoResourceManager.java    | 27 +++++++++++++-------
 5 files changed, 35 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5eafede2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c169029..31d8942 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -263,6 +263,9 @@ Release 0.8.0 - unreleased
 
   BUG FIXES
 
+    TAJO-628: The second stage of distinct aggregation can be scheduled to
+    only one node. (hyunsik)
+
     TAJO-619: SELECT count(1) after joins on text keys causes wrong plans.
     (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5eafede2/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 33e46fd..0d3f95e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -453,11 +453,13 @@ public class Repartitioner {
       }
     }
 
-    GroupbyNode groupby = PlannerUtil.findTopNode(subQuery.getBlock().getPlan(), NodeType.GROUP_BY);
+    GroupbyNode groupby = PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), NodeType.GROUP_BY);
     // the number of tasks cannot exceed the number of merged fetch uris.
     int determinedTaskNum = Math.min(maxNum, finalFetchURI.size());
+    LOG.info("ScheduleHashShuffledFetches - Max num=" + maxNum + ", finalFetchURI=" + finalFetchURI.size());
     if (groupby != null && groupby.getGroupingColumns().length == 0) {
       determinedTaskNum = 1;
+      LOG.info("No Grouping Column - determinedTaskNum is set to 1");
     }
 
     for (Entry<Integer, List<URI>> entry : finalFetchURI.entrySet()) {
@@ -468,6 +470,7 @@ public class Repartitioner {
     }
 
     schedulerContext.setEstimatedTaskNum(determinedTaskNum);
+    LOG.info("DeterminedTaskNum : " + determinedTaskNum);
   }
 
   public static Collection<URI> createHashFetchURL(String hostAndPort, ExecutionBlockId ebid,

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5eafede2/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 28386bb..222d355 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -95,6 +95,8 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
     TajoMasterProtocol.ClusterResourceSummary clusterResource = workerContext.getClusterResource();
     int clusterSlots = clusterResource == null ? 0 : clusterResource.getTotalMemoryMB() / memoryMBPerTask;
     clusterSlots =  Math.max(1, clusterSlots - 1); // reserve query master slot
+    LOG.info("CalculateNumberRequestContainer - Number of Tasks=" + numTasks +
+        ", Number of Cluster Slots=" + clusterSlots);
     return  Math.min(numTasks, clusterSlots);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5eafede2/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index 066e11c..4125236 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -139,10 +139,10 @@ public class Task {
               final QueryMasterProtocolService.Interface masterProxy,
               final QueryUnitRequest request) throws IOException {
     this.request = request;
-    this.reporter = new Reporter(masterProxy);
+    this.taskId = taskId;
+    this.reporter = new Reporter(taskId, masterProxy);
     this.reporter.startCommunicationThread();
 
-    this.taskId = request.getId();
     this.systemConf = worker.getConf();
     this.queryContext = request.getQueryContext();
     this.taskRunnerContext = worker;
@@ -621,8 +621,10 @@ public class Task {
     private Thread pingThread;
     private AtomicBoolean stop = new AtomicBoolean(false);
     private static final int PROGRESS_INTERVAL = 3000;
+    private QueryUnitAttemptId taskId;
 
-    public Reporter(QueryMasterProtocolService.Interface masterStub) {
+    public Reporter(QueryUnitAttemptId taskId, QueryMasterProtocolService.Interface masterStub) {
+      this.taskId = taskId;
       this.masterStub = masterStub;
     }
 
@@ -649,14 +651,12 @@ public class Task {
               }
 
             } catch (Throwable t) {
-
-              LOG.info("Communication exception: " + StringUtils
-                  .stringifyException(t));
+              LOG.error(t.getMessage(), t);
               remainingRetries -=1;
               if (remainingRetries == 0) {
                 ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0);
-                LOG.warn("Last retry, killing ");
-                System.exit(65);
+                LOG.warn("Last retry, exiting ");
+                throw new RuntimeException(t);
               }
             }
           }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5eafede2/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestTajoResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestTajoResourceManager.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestTajoResourceManager.java
index 428bf46..9504927 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestTajoResourceManager.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestTajoResourceManager.java
@@ -28,7 +28,6 @@ import org.apache.tajo.ipc.TajoMasterProtocol.*;
 import org.apache.tajo.master.rm.TajoWorkerResourceManager;
 import org.apache.tajo.master.rm.WorkerResource;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
-import org.junit.Before;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -42,20 +41,19 @@ public class TestTajoResourceManager {
   private final PrimitiveProtos.BoolProto BOOL_FALSE = PrimitiveProtos.BoolProto.newBuilder().setValue(false).build();
 
   TajoConf tajoConf;
-  TajoWorkerResourceManager tajoWorkerResourceManager;
   long queryIdTime = System.currentTimeMillis();
   int numWorkers = 5;
   float workerDiskSlots = 5.0f;
   int workerMemoryMB = 512 * 10;
   WorkerResourceAllocationResponse response;
 
-  private void initResourceManager(boolean queryMasterMode) throws Exception {
+  private TajoWorkerResourceManager initResourceManager(boolean queryMasterMode) throws Exception {
     tajoConf = new org.apache.tajo.conf.TajoConf();
 
     tajoConf.setFloatVar(TajoConf.ConfVars.TAJO_QUERYMASTER_DISK_SLOT, 0.0f);
     tajoConf.setIntVar(TajoConf.ConfVars.TAJO_QUERYMASTER_MEMORY_MB, 512);
 
-    tajoWorkerResourceManager = new TajoWorkerResourceManager(tajoConf);
+    TajoWorkerResourceManager tajoWorkerResourceManager = new TajoWorkerResourceManager(tajoConf);
 
     for(int i = 0; i < numWorkers; i++) {
       ServerStatusProto.System system = ServerStatusProto.System.newBuilder()
@@ -104,22 +102,25 @@ public class TestTajoResourceManager {
 
       tajoWorkerResourceManager.workerHeartbeat(tajoHeartbeat);
     }
+
+    return tajoWorkerResourceManager;
   }
 
 
   @Test
   public void testHeartbeat() throws Exception {
-    initResourceManager(false);
+    TajoWorkerResourceManager tajoWorkerResourceManager = initResourceManager(false);
     assertEquals(numWorkers, tajoWorkerResourceManager.getWorkers().size());
     for(WorkerResource eachWorker: tajoWorkerResourceManager.getWorkers().values()) {
       assertEquals(workerMemoryMB, eachWorker.getAvailableMemoryMB());
       assertEquals(workerDiskSlots, eachWorker.getAvailableDiskSlots(), 0);
     }
+    tajoWorkerResourceManager.stop();
   }
 
   @Test
   public void testMemoryResource() throws Exception {
-    initResourceManager(false);
+    TajoWorkerResourceManager tajoWorkerResourceManager = initResourceManager(false);
 
     final int minMemory = 256;
     final int maxMemory = 512;
@@ -193,11 +194,13 @@ public class TestTajoResourceManager {
       assertEquals(workerDiskSlots, eachWorker.getAvailableDiskSlots(), 0);
       assertEquals(0.0f, eachWorker.getUsedDiskSlots(), 0);
     }
+
+    tajoWorkerResourceManager.stop();
   }
 
   @Test
   public void testMemoryNotCommensurable() throws Exception {
-    initResourceManager(false);
+    TajoWorkerResourceManager tajoWorkerResourceManager = initResourceManager(false);
 
     final int minMemory = 200;
     final int maxMemory = 500;
@@ -271,11 +274,13 @@ public class TestTajoResourceManager {
       assertEquals(0.0f, eachWorker.getUsedDiskSlots(), 0);
       assertEquals(workerDiskSlots, eachWorker.getAvailableDiskSlots(), 0);
     }
+
+    tajoWorkerResourceManager.stop();
   }
 
   @Test
   public void testDiskResource() throws Exception {
-    initResourceManager(false);
+    TajoWorkerResourceManager tajoWorkerResourceManager = initResourceManager(false);
     final float minDiskSlots = 1.0f;
     final float maxDiskSlots = 2.0f;
     int memoryMB = 256;
@@ -345,11 +350,13 @@ public class TestTajoResourceManager {
       assertEquals(workerDiskSlots, eachWorker.getAvailableDiskSlots(), 0);
       assertEquals(0.0f, eachWorker.getUsedDiskSlots(), 0);
     }
+
+    tajoWorkerResourceManager.stop();
   }
 
   @Test
   public void testQueryMasterResource() throws Exception {
-    initResourceManager(true);
+    TajoWorkerResourceManager tajoWorkerResourceManager = initResourceManager(true);
 
     int qmDefaultMemoryMB = tajoConf.getIntVar(TajoConf.ConfVars.TAJO_QUERYMASTER_MEMORY_MB);
     float qmDefaultDiskSlots = tajoConf.getFloatVar(TajoConf.ConfVars.TAJO_QUERYMASTER_DISK_SLOT);
@@ -386,5 +393,7 @@ public class TestTajoResourceManager {
       totalUsedMemory += eachWorker.getUsedMemoryMB();
       totalUsedDisks += eachWorker.getUsedDiskSlots();
     }
+
+    tajoWorkerResourceManager.stop();
   }
 }