You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/05/28 07:50:15 UTC
tajo git commit: TAJO-1130: Concurrent execution of independent execution blocks.
Repository: tajo
Updated Branches:
refs/heads/master f124b87e6 -> c2725a779
TAJO-1130: Concurrent execution of independent execution blocks.
Closes #423
Signed-off-by: Jihoon Son <ji...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/c2725a77
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/c2725a77
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/c2725a77
Branch: refs/heads/master
Commit: c2725a779c9f80d60a906657aaa95aff1a48edd0
Parents: f124b87
Author: navis.ryu <na...@apache.org>
Authored: Thu May 28 14:48:41 2015 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Thu May 28 14:49:57 2015 +0900
----------------------------------------------------------------------
CHANGES | 7 +-
.../main/java/org/apache/tajo/SessionVars.java | 3 +
.../java/org/apache/tajo/conf/TajoConf.java | 5 +-
.../tajo/engine/planner/global/DataChannel.java | 4 +
.../planner/global/ExecutionBlockCursor.java | 101 ++++++++-------
.../engine/planner/global/ExecutionQueue.java | 43 +++++++
.../tajo/engine/planner/global/MasterPlan.java | 6 +-
.../planner/global/ParallelExecutionQueue.java | 126 +++++++++++++++++++
.../rules/GlobalPlanEqualityTester.java | 3 +-
.../ExplainGlobalPlanPreprocessorForTest.java | 3 +-
.../NonForwardQueryResultSystemScanner.java | 3 +-
.../apache/tajo/master/rm/WorkerResource.java | 16 +--
.../java/org/apache/tajo/querymaster/Query.java | 89 +++++++------
.../org/apache/tajo/TajoTestingCluster.java | 9 +-
.../tajo/master/TestExecutionBlockCursor.java | 4 +-
.../TestTajoCli/testHelpSessionVars.result | 1 +
16 files changed, 313 insertions(+), 110 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 0d4ae04..a2af206 100644
--- a/CHANGES
+++ b/CHANGES
@@ -27,15 +27,18 @@ Release 0.11.0 - unreleased
IMPROVEMENT
+ TAJO-1130: Concurrent execution of independent execution blocks.
+ (Contributed by navis, Committed by jihoon)
+
TAJO-1618: [Rest API] queries/{queryId} should set default print type.
- (Contributed by DaeMyoung Kang, Committed by jihoon)
+ (Contributed by DaeMyung Kang, Committed by jihoon)
TAJO-1553: Improve broadcast join planning. (jihoon)
TAJO-1577: Add test cases to verify join plans. (jihoon)
TAJO-1607: Tajo Rest Cache-Id should be bigger than zero. (Contributed by
- DaeMyoung Kang, Committed by hyunsik)
+ DaeMyung Kang, Committed by hyunsik)
TAJO-1603: Refactor StorageManager. (hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
index 0d2319e..031387c 100644
--- a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
+++ b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
@@ -107,6 +107,9 @@ public enum SessionVars implements ConfigKey {
GROUPBY_MULTI_LEVEL_ENABLED(ConfVars.$GROUPBY_MULTI_LEVEL_ENABLED, "Multiple level groupby enabled", DEFAULT,
Boolean.class, Validators.bool()),
+ QUERY_EXECUTE_PARALLEL(ConfVars.$QUERY_EXECUTE_PARALLEL_MAX, "Maximum parallel running of execution blocks for a query",
+ DEFAULT, Integer.class, Validators.min("0")),
+
// for physical Executors
EXTSORT_BUFFER_SIZE(ConfVars.$EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE, "sort buffer size for external sort (mb)", DEFAULT,
Long.class, Validators.min("0")),
http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index e20658b..3f350c3 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -305,7 +305,7 @@ public class TajoConf extends Configuration {
/////////////////////////////////////////////////////////////////////////////////
// User Session Configuration
//
- // All session variables begin with dollor($) sign. They are default configs
+ // All session variables begin with dollar($) sign. They are default configs
// for session variables. Do not directly use the following configs. Instead,
// please use QueryContext in order to access session variables.
//
@@ -330,6 +330,9 @@ public class TajoConf extends Configuration {
$GROUPBY_MULTI_LEVEL_ENABLED("tajo.dist-query.groupby.multi-level-aggr", true),
+ // WARN "tajo.yarn-rm.parallel-task-runner-launcher-num" should be set enough to avoid deadlock
+ $QUERY_EXECUTE_PARALLEL_MAX("tajo.query.execute.parallel.max", 1),
+
// for physical Executors
$EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE("tajo.executor.external-sort.buffer-mb", 200L),
$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.common.in-memory-hash-threshold-bytes",
http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
index dc6cd4c..3adc0a3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
@@ -99,6 +99,10 @@ public class DataChannel {
return shuffleType;
}
+ public boolean needShuffle() {
+ return shuffleType != ShuffleType.NONE_SHUFFLE;
+ }
+
public TransmitType getTransmitType() {
return this.transmitType;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java
index 9f82672..c6864b9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java
@@ -15,6 +15,7 @@
package org.apache.tajo.engine.planner.global;
import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.SessionVars;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
@@ -23,10 +24,9 @@ import java.util.concurrent.atomic.AtomicInteger;
* A distributed execution plan (DEP) is a direct acyclic graph (DAG) of ExecutionBlocks.
* This class is a pointer to an ExecutionBlock that the query engine should execute.
*/
-public class ExecutionBlockCursor {
+public class ExecutionBlockCursor implements Iterable<ExecutionBlock> {
private MasterPlan masterPlan;
private ArrayList<ExecutionBlock> orderedBlocks = new ArrayList<ExecutionBlock>();
- private int cursor = 0;
private List<BuildOrderItem> executionOrderedBlocks = new ArrayList<BuildOrderItem>();
private List<BuildOrderItem> notOrderedSiblingBlocks = new ArrayList<BuildOrderItem>();
@@ -45,29 +45,71 @@ public class ExecutionBlockCursor {
}
}
+ @Override
+ public Iterator<ExecutionBlock> iterator() {
+ return orderedBlocks.iterator();
+ }
+
public int size() {
return orderedBlocks.size();
}
+ public ExecutionQueue newCursor() {
+ int parallel = masterPlan.getContext().getInt(SessionVars.QUERY_EXECUTE_PARALLEL);
+ if (parallel > 1) {
+ return new ParallelExecutionQueue(masterPlan, parallel);
+ }
+ return new SimpleExecutionQueue();
+ }
+
+ public class SimpleExecutionQueue implements ExecutionQueue {
+
+ private final Iterator<ExecutionBlock> iterator = iterator();
+ private ExecutionBlock last;
+
+ @Override
+ public int size() {
+ return ExecutionBlockCursor.this.size();
+ }
+
+ @Override
+ public ExecutionBlock[] first() {
+ return iterator.hasNext() ? next(null) : null;
+ }
+
+ @Override
+ public ExecutionBlock[] next(ExecutionBlockId blockId) {
+ return iterator.hasNext() ? new ExecutionBlock[]{last = iterator.next()} : null;
+ }
+
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ for (ExecutionBlock block : ExecutionBlockCursor.this) {
+ if (sb.length() > 0) {
+ sb.append(',');
+ }
+ if (block == last) {
+ sb.append('(');
+ }
+ sb.append(block.getId().getId());
+ if (block == last) {
+ sb.append(')');
+ }
+ }
+ return sb.toString();
+ }
+ }
+
// Add all execution blocks in a depth first and postfix order
private void buildDepthFirstOrder(ExecutionBlock current) {
- Stack<ExecutionBlock> stack = new Stack<ExecutionBlock>();
if (!masterPlan.isLeaf(current.getId())) {
for (ExecutionBlock execBlock : masterPlan.getChilds(current)) {
- if (!masterPlan.isLeaf(execBlock)) {
- buildDepthFirstOrder(execBlock);
- } else {
- stack.push(execBlock);
- }
- }
- for (ExecutionBlock execBlock : stack) {
buildDepthFirstOrder(execBlock);
}
}
orderedBlocks.add(current);
}
-
private void buildSiblingFirstOrder(ExecutionBlock current) {
/*
|-eb_1404887024677_0004_000007
@@ -178,41 +220,4 @@ public class ExecutionBlockCursor {
return result;
}
}
-
- public boolean hasNext() {
- return cursor < orderedBlocks.size();
- }
-
- public ExecutionBlock nextBlock() {
- return orderedBlocks.get(cursor++);
- }
-
- public ExecutionBlock peek() {
- return orderedBlocks.get(cursor);
- }
-
- public ExecutionBlock peek(int skip) {
- return orderedBlocks.get(cursor + skip);
- }
-
- public void reset() {
- cursor = 0;
- }
-
- public String toString() {
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < orderedBlocks.size(); i++) {
- if (i == (cursor == 0 ? 0 : cursor - 1)) {
- sb.append("(").append(orderedBlocks.get(i).getId().getId()).append(")");
- } else {
- sb.append(orderedBlocks.get(i).getId().getId());
- }
-
- if (i < orderedBlocks.size() - 1) {
- sb.append(",");
- }
- }
-
- return sb.toString();
- }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionQueue.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionQueue.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionQueue.java
new file mode 100644
index 0000000..fa7ca15
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionQueue.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global;
+
+import org.apache.tajo.ExecutionBlockId;
+
+// Retrieves execution blocks to run real works
+public interface ExecutionQueue {
+
+ /**
+ * remaining blocks in queue
+ *
+ * @return number of blocks
+ */
+ int size();
+
+ /**
+ * return initial blocks to be run
+ *
+ * @return blocks to be run
+ */
+ ExecutionBlock[] first();
+
+ /**
+ * get next execution blocks to be run
+ *
+ * @param blockId currently finished id of execution block
+ * @return null for finished, can return empty array
+ */
+ ExecutionBlock[] next(ExecutionBlockId blockId);
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
index 6e9b74f..22c3751 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
@@ -237,15 +237,13 @@ public class MasterPlan {
sb.append("Order of Execution\n");
sb.append("-------------------------------------------------------------------------------");
int order = 1;
- while (executionOrderCursor.hasNext()) {
- ExecutionBlock currentEB = executionOrderCursor.nextBlock();
+ for (ExecutionBlock currentEB : cursor) {
sb.append("\n").append(order).append(": ").append(currentEB.getId());
order++;
}
sb.append("\n-------------------------------------------------------------------------------\n");
- while(cursor.hasNext()) {
- ExecutionBlock block = cursor.nextBlock();
+ for (ExecutionBlock block : cursor) {
boolean terminal = false;
sb.append("\n");
http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ParallelExecutionQueue.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ParallelExecutionQueue.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ParallelExecutionQueue.java
new file mode 100644
index 0000000..1e823be
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ParallelExecutionQueue.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global;
+
+import com.google.common.collect.Iterables;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.ExecutionBlockId;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.Stack;
+
+public class ParallelExecutionQueue implements ExecutionQueue, Iterable<ExecutionBlock> {
+
+ private static final Log LOG = LogFactory.getLog(ParallelExecutionQueue.class);
+
+ private final int maximum;
+ private final MasterPlan masterPlan;
+ private final List<Deque<ExecutionBlock>> executable;
+ private final Set<ExecutionBlockId> executed = new HashSet<ExecutionBlockId>();
+
+ public ParallelExecutionQueue(MasterPlan masterPlan, int maximum) {
+ this.masterPlan = masterPlan;
+ this.maximum = maximum;
+ this.executable = toStacks(masterPlan.getRoot());
+ }
+
+ private List<Deque<ExecutionBlock>> toStacks(ExecutionBlock root) {
+ List<Deque<ExecutionBlock>> stacks = new ArrayList<Deque<ExecutionBlock>>();
+ toStacks(root, stacks, new ArrayList<ExecutionBlock>());
+ return stacks;
+ }
+
+ // currently, diamond shaped DAG is not supported in tajo
+ private void toStacks(ExecutionBlock current, List<Deque<ExecutionBlock>> queues,
+ List<ExecutionBlock> stack) {
+ stack.add(current);
+ if (masterPlan.isLeaf(current.getId())) {
+ queues.add(new ArrayDeque<ExecutionBlock>(stack));
+ } else {
+ List<ExecutionBlock> children = masterPlan.getChilds(current);
+ for (int i = 0; i < children.size(); i++) {
+ toStacks(children.get(i), queues, i == 0 ? stack : new Stack<ExecutionBlock>());
+ }
+ }
+ }
+
+ @Override
+ public synchronized int size() {
+ int size = 0;
+ for (Deque<ExecutionBlock> queue : executable) {
+ size += queue.size();
+ }
+ return size;
+ }
+
+ @Override
+ public synchronized ExecutionBlock[] first() {
+ int max = Math.min(maximum, executable.size());
+ List<ExecutionBlock> result = new ArrayList<ExecutionBlock>();
+ for (Deque<ExecutionBlock> queue : executable) {
+ if (result.size() < max && isExecutableNow(queue.peekLast())) {
+ result.add(queue.removeLast());
+ }
+ }
+ LOG.info("Initial executable blocks " + result);
+ return result.toArray(new ExecutionBlock[result.size()]);
+ }
+
+ @Override
+ public synchronized ExecutionBlock[] next(ExecutionBlockId doneNow) {
+ executed.add(doneNow);
+
+ int remaining = 0;
+ for (Deque<ExecutionBlock> queue : executable) {
+ if (!queue.isEmpty() && isExecutableNow(queue.peekLast())) {
+ LOG.info("Next executable block " + queue.peekLast());
+ return new ExecutionBlock[]{queue.removeLast()};
+ }
+ remaining += queue.size();
+ }
+ return remaining > 0 ? new ExecutionBlock[0] : null;
+ }
+
+ private boolean isExecutableNow(ExecutionBlock current) {
+ ExecutionBlock parent = masterPlan.getParent(current);
+
+ List<ExecutionBlock> dependents = masterPlan.getChilds(current);
+ if (parent != null && masterPlan.getChannel(current.getId(), parent.getId()).needShuffle()) {
+ // add all children of sibling for partitioning
+ dependents = new ArrayList<ExecutionBlock>();
+ for (ExecutionBlock sibling : masterPlan.getChilds(parent)) {
+ dependents.addAll(masterPlan.getChilds(sibling));
+ }
+ }
+ for (ExecutionBlock child : dependents) {
+ if (!executed.contains(child.getId())) {
+ return false; // there's something should be done before this
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public Iterator<ExecutionBlock> iterator() {
+ return Iterables.concat(executable).iterator();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java
index 9148382..9f27eed 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java
@@ -47,8 +47,7 @@ public class GlobalPlanEqualityTester implements GlobalPlanRewriteRule {
public MasterPlan rewrite(MasterPlan plan) {
try {
ExecutionBlockCursor cursor = new ExecutionBlockCursor(plan);
- while (cursor.hasNext()) {
- ExecutionBlock eb = cursor.nextBlock();
+ for (ExecutionBlock eb : cursor) {
LogicalNode node = eb.getPlan();
if (node != null) {
PlanProto.LogicalNodeTree tree = LogicalNodeSerializer.serialize(node);
http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainGlobalPlanPreprocessorForTest.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainGlobalPlanPreprocessorForTest.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainGlobalPlanPreprocessorForTest.java
index c26e12c..78cd015 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainGlobalPlanPreprocessorForTest.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainGlobalPlanPreprocessorForTest.java
@@ -44,8 +44,7 @@ public class ExplainGlobalPlanPreprocessorForTest {
public void prepareTest(MasterPlan plan) {
ExecutionBlockCursor cursor = new ExecutionBlockCursor(plan);
- while (cursor.hasNext()) {
- ExecutionBlock block = cursor.nextBlock();
+ for (ExecutionBlock block : cursor) {
List<DataChannel> outgoingChannels = plan.getOutgoingChannels(block.getId());
if (outgoingChannels != null) {
for (DataChannel channel : outgoingChannels) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
index 45b23f8..562dbc3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
@@ -105,8 +105,7 @@ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResult
ExecutionBlockCursor cursor = new ExecutionBlockCursor(masterPlan);
ExecutionBlock leafBlock = null;
- while (cursor.hasNext()) {
- ExecutionBlock block = cursor.nextBlock();
+ for (ExecutionBlock block : cursor) {
if (masterPlan.isLeaf(block)) {
leafBlock = block;
break;
http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResource.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
index 5f2d33c..c2c0f88 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
@@ -76,8 +76,8 @@ public class WorkerResource {
}
public int getMemoryMB() {
+ rlock.lock();
try {
- rlock.lock();
return memoryMB;
} finally {
rlock.unlock();
@@ -85,8 +85,8 @@ public class WorkerResource {
}
public void setMemoryMB(int memoryMB) {
+ wlock.lock();
try {
- wlock.lock();
this.memoryMB = memoryMB;
} finally {
wlock.unlock();
@@ -112,8 +112,8 @@ public class WorkerResource {
}
public int getUsedMemoryMB() {
+ rlock.lock();
try {
- rlock.lock();
return usedMemoryMB;
} finally {
rlock.unlock();
@@ -121,8 +121,8 @@ public class WorkerResource {
}
public void setUsedMemoryMB(int usedMemoryMB) {
+ wlock.lock();
try {
- wlock.lock();
this.usedMemoryMB = usedMemoryMB;
} finally {
wlock.unlock();
@@ -142,9 +142,10 @@ public class WorkerResource {
}
public void releaseResource(float diskSlots, int memoryMB) {
+ LOG.info("Disk " + diskSlots + " slot(s), Memory " + memoryMB + " MB");
+ wlock.lock();
try {
- wlock.lock();
- usedMemoryMB = usedMemoryMB - memoryMB;
+ usedMemoryMB -= memoryMB;
usedDiskSlots -= diskSlots;
if(usedMemoryMB < 0) {
LOG.warn("Used memory can't be a minus: " + usedMemoryMB);
@@ -160,8 +161,9 @@ public class WorkerResource {
}
public void allocateResource(float diskSlots, int memoryMB) {
+ LOG.info("Disk " + diskSlots + " slot(s), Memory " + memoryMB + " MB");
+ wlock.lock();
try {
- wlock.lock();
usedMemoryMB += memoryMB;
usedDiskSlots += diskSlots;
http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
index 362dfa6..23808b5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
@@ -40,6 +40,7 @@ import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.global.ExecutionBlock;
import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
+import org.apache.tajo.engine.planner.global.ExecutionQueue;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.plan.logical.*;
import org.apache.tajo.engine.query.QueryContext;
@@ -63,12 +64,13 @@ public class Query implements EventHandler<QueryEvent> {
// Facilities for Query
private final TajoConf systemConf;
private final Clock clock;
- private String queryStr;
- private Map<ExecutionBlockId, Stage> stages;
+ private final String queryStr;
+ private final Map<ExecutionBlockId, Stage> stages;
private final EventHandler eventHandler;
private final MasterPlan plan;
QueryMasterTask.QueryMasterTaskContext context;
private ExecutionBlockCursor cursor;
+ private ExecutionQueue execution;
// Query Status
private final QueryId id;
@@ -77,7 +79,7 @@ public class Query implements EventHandler<QueryEvent> {
private long finishTime;
private TableDesc resultDesc;
private int completedStagesCount = 0;
- private int successedStagesCount = 0;
+ private int succeededStagesCount = 0;
private int killedStagesCount = 0;
private int failedStagesCount = 0;
private int erroredStagesCount = 0;
@@ -93,7 +95,7 @@ public class Query implements EventHandler<QueryEvent> {
private QueryState queryState;
// Transition Handler
- private static final SingleArcTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
+ private static final InternalErrorTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
private static final StageCompletedTransition STAGE_COMPLETED_TRANSITION = new StageCompletedTransition();
private static final QueryCompletedTransition QUERY_COMPLETED_TRANSITION = new QueryCompletedTransition();
@@ -213,14 +215,12 @@ public class Query implements EventHandler<QueryEvent> {
StringBuilder sb = new StringBuilder("\n=======================================================");
sb.append("\nThe order of execution: \n");
int order = 1;
- while (cursor.hasNext()) {
- ExecutionBlock currentEB = cursor.nextBlock();
+ for (ExecutionBlock currentEB : cursor) {
sb.append("\n").append(order).append(": ").append(currentEB.getId());
order++;
}
sb.append("\n=======================================================");
LOG.info(sb);
- cursor.reset();
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
this.readLock = readWriteLock.readLock();
@@ -381,6 +381,14 @@ public class Query implements EventHandler<QueryEvent> {
return cursor;
}
+ public ExecutionQueue newExecutionQueue() {
+ return execution = cursor.newCursor();
+ }
+
+ public ExecutionQueue getExecutionQueue() {
+ return execution;
+ }
+
public static class StartTransition
implements SingleArcTransition<Query, QueryEvent> {
@@ -388,12 +396,15 @@ public class Query implements EventHandler<QueryEvent> {
public void transition(Query query, QueryEvent queryEvent) {
query.setStartTime();
- Stage stage = new Stage(query.context, query.getPlan(),
- query.getExecutionBlockCursor().nextBlock());
- stage.setPriority(query.priority--);
- query.addStage(stage);
- stage.getEventHandler().handle(new StageEvent(stage.getId(), StageEventType.SQ_INIT));
- LOG.debug("Schedule unit plan: \n" + stage.getBlock().getPlan());
+ ExecutionQueue executionQueue = query.newExecutionQueue();
+ for (ExecutionBlock executionBlock : executionQueue.first()) {
+ Stage stage = new Stage(query.context, query.getPlan(), executionBlock);
+ stage.setPriority(query.priority--);
+ query.addStage(stage);
+
+ stage.getEventHandler().handle(new StageEvent(stage.getId(), StageEventType.SQ_INIT));
+ LOG.debug("Schedule unit plan: \n" + stage.getBlock().getPlan());
+ }
}
}
@@ -616,25 +627,31 @@ public class Query implements EventHandler<QueryEvent> {
public static class StageCompletedTransition implements SingleArcTransition<Query, QueryEvent> {
- private boolean hasNext(Query query) {
- ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
- ExecutionBlock nextBlock = cursor.peek();
- return !query.getPlan().isTerminal(nextBlock);
- }
-
- private void executeNextBlock(Query query) {
- ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
- ExecutionBlock nextBlock = cursor.nextBlock();
- Stage nextStage = new Stage(query.context, query.getPlan(), nextBlock);
- nextStage.setPriority(query.priority--);
- query.addStage(nextStage);
- nextStage.getEventHandler().handle(new StageEvent(nextStage.getId(), StageEventType.SQ_INIT));
-
- LOG.info("Scheduling Stage:" + nextStage.getId());
- if(LOG.isDebugEnabled()) {
- LOG.debug("Scheduling Stage's Priority: " + nextStage.getPriority());
- LOG.debug("Scheduling Stage's Plan: \n" + nextStage.getBlock().getPlan());
+ // return true for terminal
+ private synchronized boolean executeNextBlock(Query query, ExecutionBlockId blockId) {
+ ExecutionQueue cursor = query.getExecutionQueue();
+ ExecutionBlock[] nextBlocks = cursor.next(blockId);
+ if (nextBlocks == null || nextBlocks.length == 0) {
+ return nextBlocks == null;
+ }
+ boolean terminal = true;
+ for (ExecutionBlock nextBlock : nextBlocks) {
+ if (query.getPlan().isTerminal(nextBlock)) {
+ continue;
+ }
+ Stage nextStage = new Stage(query.context, query.getPlan(), nextBlock);
+ nextStage.setPriority(query.priority--);
+ query.addStage(nextStage);
+ nextStage.getEventHandler().handle(new StageEvent(nextStage.getId(), StageEventType.SQ_INIT));
+
+ LOG.info("Scheduling Stage:" + nextStage.getId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Scheduling Stage's Priority: " + nextStage.getPriority());
+ LOG.debug("Scheduling Stage's Plan: \n" + nextStage.getBlock().getPlan());
+ }
+ terminal = false;
}
+ return terminal;
}
@Override
@@ -647,7 +664,7 @@ public class Query implements EventHandler<QueryEvent> {
StageCompletedEvent castEvent = (StageCompletedEvent) event;
if (castEvent.getState() == StageState.SUCCEEDED) {
- query.successedStagesCount++;
+ query.succeededStagesCount++;
} else if (castEvent.getState() == StageState.KILLED) {
query.killedStagesCount++;
} else if (castEvent.getState() == StageState.FAILED) {
@@ -663,11 +680,11 @@ public class Query implements EventHandler<QueryEvent> {
// if a stage is succeeded and a query is running
if (castEvent.getState() == StageState.SUCCEEDED && // latest stage succeeded
query.getSynchronizedState() == QueryState.QUERY_RUNNING && // current state is not in KILL_WAIT, FAILED, or ERROR.
- hasNext(query)) { // there remains at least one stage.
- executeNextBlock(query);
- } else { // if a query is completed due to finished, kill, failure, or error
- query.eventHandler.handle(new QueryCompletedEvent(castEvent.getExecutionBlockId(), castEvent.getState()));
+ !executeNextBlock(query, castEvent.getExecutionBlockId())) {
+ return;
}
+ // if a query is completed due to finished, kill, failure, or error
+ query.eventHandler.handle(new QueryCompletedEvent(castEvent.getExecutionBlockId(), castEvent.getState()));
} catch (Throwable t) {
LOG.error(t.getMessage(), t);
query.eventHandler.handle(new QueryEvent(event.getQueryId(), QueryEventType.INTERNAL_ERROR));
http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index f4818f6..9b5980b 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -133,7 +133,7 @@ public class TajoTestingCluster {
Preconditions.checkState(testResourceManager.equals(TajoWorkerResourceManager.class.getCanonicalName()));
conf.set(ConfVars.RESOURCE_MANAGER_CLASS.varname, System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname));
}
- conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB.varname, 1024);
+ conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB.varname, 2048);
conf.setFloat(ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS.varname, 2.0f);
@@ -156,14 +156,15 @@ public class TajoTestingCluster {
conf.setIntVar(ConfVars.SHUFFLE_RPC_SERVER_WORKER_THREAD_NUM, 2);
// Resource allocator
- conf.setIntVar(ConfVars.YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM, 2);
+ conf.setIntVar(ConfVars.$QUERY_EXECUTE_PARALLEL_MAX, 3);
+ conf.setIntVar(ConfVars.YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM, 6); // make twice of parallel_max
// Memory cache termination
conf.setIntVar(ConfVars.WORKER_HISTORY_EXPIRE_PERIOD, 1);
conf.setStrings(ConfVars.PYTHON_CODE_DIR.varname, getClass().getResource("/python").toString());
- /* Since Travi CI limits the size of standard output log up to 4MB */
+ /* Since Travis CI limits the size of standard output log up to 4MB */
if (!StringUtils.isEmpty(LOG_LEVEL)) {
Level defaultLevel = Logger.getRootLogger().getLevel();
Logger.getLogger("org.apache.tajo").setLevel(Level.toLevel(LOG_LEVEL.toUpperCase(), defaultLevel));
@@ -254,7 +255,7 @@ public class TajoTestingCluster {
builder.waitSafeMode(true);
this.dfsCluster = builder.build();
- // Set this just-started cluser as our filesystem.
+ // Set this just-started cluster as our filesystem.
this.defaultFS = this.dfsCluster.getFileSystem();
this.conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultFS.getUri().toString());
this.conf.setVar(TajoConf.ConfVars.ROOT_DIR, defaultFS.getUri() + "/tajo");
http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java b/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
index c82637d..0f90722 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
@@ -26,6 +26,7 @@ import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
import org.apache.tajo.plan.LogicalOptimizer;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.LogicalPlanner;
@@ -115,8 +116,7 @@ public class TestExecutionBlockCursor {
ExecutionBlockCursor cursor = new ExecutionBlockCursor(plan);
int count = 0;
- while(cursor.hasNext()) {
- cursor.nextBlock();
+ for (ExecutionBlock eb : cursor) {
count++;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
index bcd8970..7e741a9 100644
--- a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
+++ b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
@@ -26,6 +26,7 @@ Available Session Variables:
\set GROUPBY_PER_SHUFFLE_SIZE [int value] - shuffle output size for sort (mb)
\set TABLE_PARTITION_PER_SHUFFLE_SIZE [int value] - shuffle output size for partition table write (mb)
\set GROUPBY_MULTI_LEVEL_ENABLED [true or false] - Multiple level groupby enabled
+\set QUERY_EXECUTE_PARALLEL [int value] - Maximum parallel running of execution blocks for a query
\set EXTSORT_BUFFER_SIZE [long value] - sort buffer size for external sort (mb)
\set HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash join (mb)
\set INNER_HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash inner join (mb)