You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/02/24 17:43:42 UTC
git commit: TAJO-628: The second stage of distinct aggregation can be
scheduled to only one node.
Repository: incubator-tajo
Updated Branches:
refs/heads/master 9cce80cfe -> 5eafede2f
TAJO-628: The second stage of distinct aggregation can be scheduled to only one node.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/5eafede2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/5eafede2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/5eafede2
Branch: refs/heads/master
Commit: 5eafede2fd1e9a512aedf51371291d028dbbeca3
Parents: 9cce80c
Author: Hyunsik Choi <hy...@apache.org>
Authored: Tue Feb 25 01:43:26 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Tue Feb 25 01:43:26 2014 +0900
----------------------------------------------------------------------
CHANGES.txt | 3 +++
.../tajo/master/querymaster/Repartitioner.java | 5 +++-
.../tajo/worker/TajoResourceAllocator.java | 2 ++
.../main/java/org/apache/tajo/worker/Task.java | 16 ++++++------
.../tajo/master/TestTajoResourceManager.java | 27 +++++++++++++-------
5 files changed, 35 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5eafede2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c169029..31d8942 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -263,6 +263,9 @@ Release 0.8.0 - unreleased
BUG FIXES
+ TAJO-628: The second stage of distinct aggregation can be scheduled to
+ only one node. (hyunsik)
+
TAJO-619: SELECT count(1) after joins on text keys causes wrong plans.
(hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5eafede2/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 33e46fd..0d3f95e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -453,11 +453,13 @@ public class Repartitioner {
}
}
- GroupbyNode groupby = PlannerUtil.findTopNode(subQuery.getBlock().getPlan(), NodeType.GROUP_BY);
+ GroupbyNode groupby = PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), NodeType.GROUP_BY);
// the number of tasks cannot exceed the number of merged fetch uris.
int determinedTaskNum = Math.min(maxNum, finalFetchURI.size());
+ LOG.info("ScheduleHashShuffledFetches - Max num=" + maxNum + ", finalFetchURI=" + finalFetchURI.size());
if (groupby != null && groupby.getGroupingColumns().length == 0) {
determinedTaskNum = 1;
+ LOG.info("No Grouping Column - determinedTaskNum is set to 1");
}
for (Entry<Integer, List<URI>> entry : finalFetchURI.entrySet()) {
@@ -468,6 +470,7 @@ public class Repartitioner {
}
schedulerContext.setEstimatedTaskNum(determinedTaskNum);
+ LOG.info("DeterminedTaskNum : " + determinedTaskNum);
}
public static Collection<URI> createHashFetchURL(String hostAndPort, ExecutionBlockId ebid,
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5eafede2/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 28386bb..222d355 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -95,6 +95,8 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
TajoMasterProtocol.ClusterResourceSummary clusterResource = workerContext.getClusterResource();
int clusterSlots = clusterResource == null ? 0 : clusterResource.getTotalMemoryMB() / memoryMBPerTask;
clusterSlots = Math.max(1, clusterSlots - 1); // reserve query master slot
+ LOG.info("CalculateNumberRequestContainer - Number of Tasks=" + numTasks +
+ ", Number of Cluster Slots=" + clusterSlots);
return Math.min(numTasks, clusterSlots);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5eafede2/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index 066e11c..4125236 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -139,10 +139,10 @@ public class Task {
final QueryMasterProtocolService.Interface masterProxy,
final QueryUnitRequest request) throws IOException {
this.request = request;
- this.reporter = new Reporter(masterProxy);
+ this.taskId = taskId;
+ this.reporter = new Reporter(taskId, masterProxy);
this.reporter.startCommunicationThread();
- this.taskId = request.getId();
this.systemConf = worker.getConf();
this.queryContext = request.getQueryContext();
this.taskRunnerContext = worker;
@@ -621,8 +621,10 @@ public class Task {
private Thread pingThread;
private AtomicBoolean stop = new AtomicBoolean(false);
private static final int PROGRESS_INTERVAL = 3000;
+ private QueryUnitAttemptId taskId;
- public Reporter(QueryMasterProtocolService.Interface masterStub) {
+ public Reporter(QueryUnitAttemptId taskId, QueryMasterProtocolService.Interface masterStub) {
+ this.taskId = taskId;
this.masterStub = masterStub;
}
@@ -649,14 +651,12 @@ public class Task {
}
} catch (Throwable t) {
-
- LOG.info("Communication exception: " + StringUtils
- .stringifyException(t));
+ LOG.error(t.getMessage(), t);
remainingRetries -=1;
if (remainingRetries == 0) {
ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0);
- LOG.warn("Last retry, killing ");
- System.exit(65);
+ LOG.warn("Last retry, exiting ");
+ throw new RuntimeException(t);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5eafede2/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestTajoResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestTajoResourceManager.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestTajoResourceManager.java
index 428bf46..9504927 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestTajoResourceManager.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestTajoResourceManager.java
@@ -28,7 +28,6 @@ import org.apache.tajo.ipc.TajoMasterProtocol.*;
import org.apache.tajo.master.rm.TajoWorkerResourceManager;
import org.apache.tajo.master.rm.WorkerResource;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
-import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
@@ -42,20 +41,19 @@ public class TestTajoResourceManager {
private final PrimitiveProtos.BoolProto BOOL_FALSE = PrimitiveProtos.BoolProto.newBuilder().setValue(false).build();
TajoConf tajoConf;
- TajoWorkerResourceManager tajoWorkerResourceManager;
long queryIdTime = System.currentTimeMillis();
int numWorkers = 5;
float workerDiskSlots = 5.0f;
int workerMemoryMB = 512 * 10;
WorkerResourceAllocationResponse response;
- private void initResourceManager(boolean queryMasterMode) throws Exception {
+ private TajoWorkerResourceManager initResourceManager(boolean queryMasterMode) throws Exception {
tajoConf = new org.apache.tajo.conf.TajoConf();
tajoConf.setFloatVar(TajoConf.ConfVars.TAJO_QUERYMASTER_DISK_SLOT, 0.0f);
tajoConf.setIntVar(TajoConf.ConfVars.TAJO_QUERYMASTER_MEMORY_MB, 512);
- tajoWorkerResourceManager = new TajoWorkerResourceManager(tajoConf);
+ TajoWorkerResourceManager tajoWorkerResourceManager = new TajoWorkerResourceManager(tajoConf);
for(int i = 0; i < numWorkers; i++) {
ServerStatusProto.System system = ServerStatusProto.System.newBuilder()
@@ -104,22 +102,25 @@ public class TestTajoResourceManager {
tajoWorkerResourceManager.workerHeartbeat(tajoHeartbeat);
}
+
+ return tajoWorkerResourceManager;
}
@Test
public void testHeartbeat() throws Exception {
- initResourceManager(false);
+ TajoWorkerResourceManager tajoWorkerResourceManager = initResourceManager(false);
assertEquals(numWorkers, tajoWorkerResourceManager.getWorkers().size());
for(WorkerResource eachWorker: tajoWorkerResourceManager.getWorkers().values()) {
assertEquals(workerMemoryMB, eachWorker.getAvailableMemoryMB());
assertEquals(workerDiskSlots, eachWorker.getAvailableDiskSlots(), 0);
}
+ tajoWorkerResourceManager.stop();
}
@Test
public void testMemoryResource() throws Exception {
- initResourceManager(false);
+ TajoWorkerResourceManager tajoWorkerResourceManager = initResourceManager(false);
final int minMemory = 256;
final int maxMemory = 512;
@@ -193,11 +194,13 @@ public class TestTajoResourceManager {
assertEquals(workerDiskSlots, eachWorker.getAvailableDiskSlots(), 0);
assertEquals(0.0f, eachWorker.getUsedDiskSlots(), 0);
}
+
+ tajoWorkerResourceManager.stop();
}
@Test
public void testMemoryNotCommensurable() throws Exception {
- initResourceManager(false);
+ TajoWorkerResourceManager tajoWorkerResourceManager = initResourceManager(false);
final int minMemory = 200;
final int maxMemory = 500;
@@ -271,11 +274,13 @@ public class TestTajoResourceManager {
assertEquals(0.0f, eachWorker.getUsedDiskSlots(), 0);
assertEquals(workerDiskSlots, eachWorker.getAvailableDiskSlots(), 0);
}
+
+ tajoWorkerResourceManager.stop();
}
@Test
public void testDiskResource() throws Exception {
- initResourceManager(false);
+ TajoWorkerResourceManager tajoWorkerResourceManager = initResourceManager(false);
final float minDiskSlots = 1.0f;
final float maxDiskSlots = 2.0f;
int memoryMB = 256;
@@ -345,11 +350,13 @@ public class TestTajoResourceManager {
assertEquals(workerDiskSlots, eachWorker.getAvailableDiskSlots(), 0);
assertEquals(0.0f, eachWorker.getUsedDiskSlots(), 0);
}
+
+ tajoWorkerResourceManager.stop();
}
@Test
public void testQueryMasterResource() throws Exception {
- initResourceManager(true);
+ TajoWorkerResourceManager tajoWorkerResourceManager = initResourceManager(true);
int qmDefaultMemoryMB = tajoConf.getIntVar(TajoConf.ConfVars.TAJO_QUERYMASTER_MEMORY_MB);
float qmDefaultDiskSlots = tajoConf.getFloatVar(TajoConf.ConfVars.TAJO_QUERYMASTER_DISK_SLOT);
@@ -386,5 +393,7 @@ public class TestTajoResourceManager {
totalUsedMemory += eachWorker.getUsedMemoryMB();
totalUsedDisks += eachWorker.getUsedDiskSlots();
}
+
+ tajoWorkerResourceManager.stop();
}
}