You are viewing a plain text version of this content. The canonical (HTML) version is available in the Apache mailing-list archive.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/05/28 07:50:15 UTC

tajo git commit: TAJO-1130: Concurrent execution of independent execution blocks.

Repository: tajo
Updated Branches:
  refs/heads/master f124b87e6 -> c2725a779


TAJO-1130: Concurrent execution of independent execution blocks.

Closes #423

Signed-off-by: Jihoon Son <ji...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/c2725a77
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/c2725a77
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/c2725a77

Branch: refs/heads/master
Commit: c2725a779c9f80d60a906657aaa95aff1a48edd0
Parents: f124b87
Author: navis.ryu <na...@apache.org>
Authored: Thu May 28 14:48:41 2015 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Thu May 28 14:49:57 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   7 +-
 .../main/java/org/apache/tajo/SessionVars.java  |   3 +
 .../java/org/apache/tajo/conf/TajoConf.java     |   5 +-
 .../tajo/engine/planner/global/DataChannel.java |   4 +
 .../planner/global/ExecutionBlockCursor.java    | 101 ++++++++-------
 .../engine/planner/global/ExecutionQueue.java   |  43 +++++++
 .../tajo/engine/planner/global/MasterPlan.java  |   6 +-
 .../planner/global/ParallelExecutionQueue.java  | 126 +++++++++++++++++++
 .../rules/GlobalPlanEqualityTester.java         |   3 +-
 .../ExplainGlobalPlanPreprocessorForTest.java   |   3 +-
 .../NonForwardQueryResultSystemScanner.java     |   3 +-
 .../apache/tajo/master/rm/WorkerResource.java   |  16 +--
 .../java/org/apache/tajo/querymaster/Query.java |  89 +++++++------
 .../org/apache/tajo/TajoTestingCluster.java     |   9 +-
 .../tajo/master/TestExecutionBlockCursor.java   |   4 +-
 .../TestTajoCli/testHelpSessionVars.result      |   1 +
 16 files changed, 313 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 0d4ae04..a2af206 100644
--- a/CHANGES
+++ b/CHANGES
@@ -27,15 +27,18 @@ Release 0.11.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1130: Concurrent execution of independent execution blocks.
+    (Contributed by navis, Committed by jihoon)
+
     TAJO-1618: [Rest API] queries/{queryId} should set default print type.
-    (Contributed by DaeMyoung Kang, Committed by jihoon)
+    (Contributed by DaeMyung Kang, Committed by jihoon)
 
     TAJO-1553: Improve broadcast join planning. (jihoon)
 
     TAJO-1577: Add test cases to verify join plans. (jihoon)
 
     TAJO-1607: Tajo Rest Cache-Id should be bigger than zero. (Contributed by 
-    DaeMyoung Kang, Committed by hyunsik)
+    DaeMyung Kang, Committed by hyunsik)
 
     TAJO-1603: Refactor StorageManager. (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
index 0d2319e..031387c 100644
--- a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
+++ b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
@@ -107,6 +107,9 @@ public enum SessionVars implements ConfigKey {
   GROUPBY_MULTI_LEVEL_ENABLED(ConfVars.$GROUPBY_MULTI_LEVEL_ENABLED, "Multiple level groupby enabled", DEFAULT,
       Boolean.class, Validators.bool()),
 
+  QUERY_EXECUTE_PARALLEL(ConfVars.$QUERY_EXECUTE_PARALLEL_MAX, "Maximum parallel running of execution blocks for a query",
+      DEFAULT, Integer.class, Validators.min("0")),
+
   // for physical Executors
   EXTSORT_BUFFER_SIZE(ConfVars.$EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE, "sort buffer size for external sort (mb)", DEFAULT,
       Long.class, Validators.min("0")),

http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index e20658b..3f350c3 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -305,7 +305,7 @@ public class TajoConf extends Configuration {
     /////////////////////////////////////////////////////////////////////////////////
     // User Session Configuration
     //
-    // All session variables begin with dollor($) sign. They are default configs
+    // All session variables begin with dollar($) sign. They are default configs
     // for session variables. Do not directly use the following configs. Instead,
     // please use QueryContext in order to access session variables.
     //
@@ -330,6 +330,9 @@ public class TajoConf extends Configuration {
 
     $GROUPBY_MULTI_LEVEL_ENABLED("tajo.dist-query.groupby.multi-level-aggr", true),
 
+    // WARN: "tajo.yarn-rm.parallel-task-runner-launcher-num" should be set high enough to avoid deadlock
+    $QUERY_EXECUTE_PARALLEL_MAX("tajo.query.execute.parallel.max", 1),
+
     // for physical Executors
     $EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE("tajo.executor.external-sort.buffer-mb", 200L),
     $EXECUTOR_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.common.in-memory-hash-threshold-bytes",

http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
index dc6cd4c..3adc0a3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
@@ -99,6 +99,10 @@ public class DataChannel {
     return shuffleType;
   }
 
+  public boolean needShuffle() {
+    return shuffleType != ShuffleType.NONE_SHUFFLE;
+  }
+
   public TransmitType getTransmitType() {
     return this.transmitType;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java
index 9f82672..c6864b9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java
@@ -15,6 +15,7 @@
 package org.apache.tajo.engine.planner.global;
 
 import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.SessionVars;
 
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -23,10 +24,9 @@ import java.util.concurrent.atomic.AtomicInteger;
  * A distributed execution plan (DEP) is a direct acyclic graph (DAG) of ExecutionBlocks.
  * This class is a pointer to an ExecutionBlock that the query engine should execute.
  */
-public class ExecutionBlockCursor {
+public class ExecutionBlockCursor implements Iterable<ExecutionBlock> {
   private MasterPlan masterPlan;
   private ArrayList<ExecutionBlock> orderedBlocks = new ArrayList<ExecutionBlock>();
-  private int cursor = 0;
 
   private List<BuildOrderItem> executionOrderedBlocks = new ArrayList<BuildOrderItem>();
   private List<BuildOrderItem> notOrderedSiblingBlocks = new ArrayList<BuildOrderItem>();
@@ -45,29 +45,71 @@ public class ExecutionBlockCursor {
     }
   }
 
+  @Override
+  public Iterator<ExecutionBlock> iterator() {
+    return orderedBlocks.iterator();
+  }
+
   public int size() {
     return orderedBlocks.size();
   }
 
+  public ExecutionQueue newCursor() {
+    int parallel = masterPlan.getContext().getInt(SessionVars.QUERY_EXECUTE_PARALLEL);
+    if (parallel > 1) {
+      return new ParallelExecutionQueue(masterPlan, parallel);
+    }
+    return new SimpleExecutionQueue();
+  }
+
+  public class SimpleExecutionQueue implements ExecutionQueue {
+
+    private final Iterator<ExecutionBlock> iterator = iterator();
+    private ExecutionBlock last;
+
+    @Override
+    public int size() {
+      return ExecutionBlockCursor.this.size();
+    }
+
+    @Override
+    public ExecutionBlock[] first() {
+      return iterator.hasNext() ? next(null) : null;
+    }
+
+    @Override
+    public ExecutionBlock[] next(ExecutionBlockId blockId) {
+      return iterator.hasNext() ? new ExecutionBlock[]{last = iterator.next()} : null;
+    }
+
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      for (ExecutionBlock block : ExecutionBlockCursor.this) {
+        if (sb.length() > 0) {
+          sb.append(',');
+        }
+        if (block == last) {
+          sb.append('(');
+        }
+        sb.append(block.getId().getId());
+        if (block == last) {
+          sb.append(')');
+        }
+      }
+      return sb.toString();
+    }
+  }
+
   // Add all execution blocks in a depth first and postfix order
   private void buildDepthFirstOrder(ExecutionBlock current) {
-    Stack<ExecutionBlock> stack = new Stack<ExecutionBlock>();
     if (!masterPlan.isLeaf(current.getId())) {
       for (ExecutionBlock execBlock : masterPlan.getChilds(current)) {
-        if (!masterPlan.isLeaf(execBlock)) {
-          buildDepthFirstOrder(execBlock);
-        } else {
-          stack.push(execBlock);
-        }
-      }
-      for (ExecutionBlock execBlock : stack) {
         buildDepthFirstOrder(execBlock);
       }
     }
     orderedBlocks.add(current);
   }
 
-
   private void buildSiblingFirstOrder(ExecutionBlock current) {
     /*
      |-eb_1404887024677_0004_000007
@@ -178,41 +220,4 @@ public class ExecutionBlockCursor {
       return result;
     }
   }
-
-  public boolean hasNext() {
-    return cursor < orderedBlocks.size();
-  }
-
-  public ExecutionBlock nextBlock() {
-    return orderedBlocks.get(cursor++);
-  }
-
-  public ExecutionBlock peek() {
-    return orderedBlocks.get(cursor);
-  }
-
-  public ExecutionBlock peek(int skip) {
-    return  orderedBlocks.get(cursor + skip);
-  }
-
-  public void reset() {
-    cursor = 0;
-  }
-
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    for (int i = 0; i < orderedBlocks.size(); i++) {
-      if (i == (cursor == 0 ? 0 : cursor - 1)) {
-        sb.append("(").append(orderedBlocks.get(i).getId().getId()).append(")");
-      } else {
-        sb.append(orderedBlocks.get(i).getId().getId());
-      }
-
-      if (i < orderedBlocks.size() - 1) {
-        sb.append(",");
-      }
-    }
-
-    return sb.toString();
-  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionQueue.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionQueue.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionQueue.java
new file mode 100644
index 0000000..fa7ca15
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionQueue.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global;
+
+import org.apache.tajo.ExecutionBlockId;
+
+// Hands out the execution blocks of a query in an order that respects their dependencies.
+public interface ExecutionQueue {
+
+  /**
+   * Returns the number of blocks remaining in this queue.
+   *
+   * @return number of blocks (NOTE(review): SimpleExecutionQueue returns the total block count)
+   */
+  int size();
+
+  /**
+   * Returns the blocks that can be started when the query begins.
+   *
+   * @return blocks to run first (SimpleExecutionQueue returns null when it holds no blocks)
+   */
+  ExecutionBlock[] first();
+
+  /**
+   * Reports a finished block and returns the blocks to run next.
+   *
+   * @param blockId id of the execution block that has just finished
+   * @return blocks to run next; an empty array if none is runnable yet, or null once no blocks remain
+   */
+  ExecutionBlock[] next(ExecutionBlockId blockId);
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
index 6e9b74f..22c3751 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
@@ -237,15 +237,13 @@ public class MasterPlan {
     sb.append("Order of Execution\n");
     sb.append("-------------------------------------------------------------------------------");
     int order = 1;
-    while (executionOrderCursor.hasNext()) {
-      ExecutionBlock currentEB = executionOrderCursor.nextBlock();
+    for (ExecutionBlock currentEB : cursor) {
       sb.append("\n").append(order).append(": ").append(currentEB.getId());
       order++;
     }
     sb.append("\n-------------------------------------------------------------------------------\n");
 
-    while(cursor.hasNext()) {
-      ExecutionBlock block = cursor.nextBlock();
+    for (ExecutionBlock block : cursor) {
 
       boolean terminal = false;
       sb.append("\n");

http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ParallelExecutionQueue.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ParallelExecutionQueue.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ParallelExecutionQueue.java
new file mode 100644
index 0000000..1e823be
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ParallelExecutionQueue.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global;
+
+import com.google.common.collect.Iterables;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.ExecutionBlockId;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * An {@link ExecutionQueue} that hands out the execution blocks of a query so that up to
+ * {@code maximum} independent blocks can run at the same time.
+ *
+ * <p>The plan is split into chains that run from a block down to a leaf: the first child of a
+ * block continues its parent's chain and every other child starts a new chain. Blocks are taken
+ * from the leaf end of a chain once every block they depend on has been reported as finished.
+ *
+ * <p>{@link #size()}, {@link #first()} and {@link #next(ExecutionBlockId)} are synchronized.
+ */
+public class ParallelExecutionQueue implements ExecutionQueue, Iterable<ExecutionBlock> {
+
+  private static final Log LOG = LogFactory.getLog(ParallelExecutionQueue.class);
+
+  private final int maximum;
+  private final MasterPlan masterPlan;
+  private final List<Deque<ExecutionBlock>> executable;
+  private final Set<ExecutionBlockId> executed = new HashSet<ExecutionBlockId>();
+  // blocks handed out by first()/next() whose completion has not been reported via next() yet
+  private int running = 0;
+
+  /**
+   * @param masterPlan plan whose execution blocks are scheduled
+   * @param maximum    maximum number of handed-out blocks that may be unfinished at once
+   */
+  public ParallelExecutionQueue(MasterPlan masterPlan, int maximum) {
+    this.masterPlan = masterPlan;
+    this.maximum = maximum;
+    this.executable = toStacks(masterPlan.getRoot());
+  }
+
+  private List<Deque<ExecutionBlock>> toStacks(ExecutionBlock root) {
+    List<Deque<ExecutionBlock>> stacks = new ArrayList<Deque<ExecutionBlock>>();
+    toStacks(root, stacks, new ArrayList<ExecutionBlock>());
+    return stacks;
+  }
+
+  // Depth-first walk: the first child extends the current chain and every other child starts a
+  // new one; a chain is copied into `queues` when it reaches a leaf, so each chain ends in a leaf.
+  // Diamond-shaped DAGs are not supported in Tajo currently (a shared block would be visited twice).
+  private void toStacks(ExecutionBlock current, List<Deque<ExecutionBlock>> queues,
+                        List<ExecutionBlock> stack) {
+    stack.add(current);
+    if (masterPlan.isLeaf(current.getId())) {
+      queues.add(new ArrayDeque<ExecutionBlock>(stack));
+    } else {
+      List<ExecutionBlock> children = masterPlan.getChilds(current);
+      for (int i = 0; i < children.size(); i++) {
+        toStacks(children.get(i), queues, i == 0 ? stack : new ArrayList<ExecutionBlock>());
+      }
+    }
+  }
+
+  @Override
+  public synchronized int size() {
+    int size = 0;
+    for (Deque<ExecutionBlock> queue : executable) {
+      size += queue.size();
+    }
+    return size;
+  }
+
+  @Override
+  public synchronized ExecutionBlock[] first() {
+    List<ExecutionBlock> result = pollExecutable(maximum);
+    running = result.size();
+    LOG.info("Initial executable blocks " + result);
+    return result.toArray(new ExecutionBlock[result.size()]);
+  }
+
+  /**
+   * Records {@code doneNow} as finished and hands out every block that became runnable, as long
+   * as fewer than {@code maximum} handed-out blocks are still unfinished. Handing out all of them
+   * (rather than one) matters when a single completion unblocks several blocks, e.g. siblings
+   * that shuffle into the same parent and therefore wait for each other's children.
+   */
+  @Override
+  public synchronized ExecutionBlock[] next(ExecutionBlockId doneNow) {
+    executed.add(doneNow);
+    if (running > 0) {
+      running--;
+    }
+    List<ExecutionBlock> result = pollExecutable(maximum - running);
+    if (!result.isEmpty()) {
+      running += result.size();
+      LOG.info("Next executable blocks " + result);
+      return result.toArray(new ExecutionBlock[result.size()]);
+    }
+    return size() > 0 ? new ExecutionBlock[0] : null;
+  }
+
+  // Removes and returns at most `limit` runnable blocks, at most one from the leaf end of each
+  // chain: a block's parent is never runnable before the block itself has finished.
+  private List<ExecutionBlock> pollExecutable(int limit) {
+    List<ExecutionBlock> result = new ArrayList<ExecutionBlock>();
+    for (Deque<ExecutionBlock> queue : executable) {
+      if (result.size() >= limit) {
+        break;
+      }
+      if (!queue.isEmpty() && isExecutableNow(queue.peekLast())) {
+        result.add(queue.removeLast());
+      }
+    }
+    return result;
+  }
+
+  // A block is runnable once all of its children have finished. If its output is shuffled into
+  // its parent, it instead waits for the children of every sibling (all grandchildren of the parent).
+  private boolean isExecutableNow(ExecutionBlock current) {
+    ExecutionBlock parent = masterPlan.getParent(current);
+
+    List<ExecutionBlock> dependents = masterPlan.getChilds(current);
+    if (parent != null && masterPlan.getChannel(current.getId(), parent.getId()).needShuffle()) {
+      // shuffled output: wait for the children of every sibling as well (for partitioning)
+      dependents = new ArrayList<ExecutionBlock>();
+      for (ExecutionBlock sibling : masterPlan.getChilds(parent)) {
+        dependents.addAll(masterPlan.getChilds(sibling));
+      }
+    }
+    for (ExecutionBlock child : dependents) {
+      if (!executed.contains(child.getId())) {
+        return false;   // a block this one depends on has not finished yet
+      }
+    }
+    return true;
+  }
+
+  // Iterates over the blocks that have not been handed out yet. Not synchronized: the iterator is
+  // a live view over the internal queues.
+  @Override
+  public Iterator<ExecutionBlock> iterator() {
+    return Iterables.concat(executable).iterator();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java
index 9148382..9f27eed 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java
@@ -47,8 +47,7 @@ public class GlobalPlanEqualityTester implements GlobalPlanRewriteRule {
   public MasterPlan rewrite(MasterPlan plan) {
     try {
       ExecutionBlockCursor cursor = new ExecutionBlockCursor(plan);
-      while (cursor.hasNext()) {
-        ExecutionBlock eb = cursor.nextBlock();
+      for (ExecutionBlock eb : cursor) {
         LogicalNode node = eb.getPlan();
         if (node != null) {
           PlanProto.LogicalNodeTree tree = LogicalNodeSerializer.serialize(node);

http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainGlobalPlanPreprocessorForTest.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainGlobalPlanPreprocessorForTest.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainGlobalPlanPreprocessorForTest.java
index c26e12c..78cd015 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainGlobalPlanPreprocessorForTest.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainGlobalPlanPreprocessorForTest.java
@@ -44,8 +44,7 @@ public class ExplainGlobalPlanPreprocessorForTest {
   public void prepareTest(MasterPlan plan) {
     ExecutionBlockCursor cursor = new ExecutionBlockCursor(plan);
 
-    while (cursor.hasNext()) {
-      ExecutionBlock block = cursor.nextBlock();
+    for (ExecutionBlock block : cursor) {
       List<DataChannel> outgoingChannels = plan.getOutgoingChannels(block.getId());
       if (outgoingChannels != null) {
         for (DataChannel channel : outgoingChannels) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
index 45b23f8..562dbc3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
@@ -105,8 +105,7 @@ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResult
     
     ExecutionBlockCursor cursor = new ExecutionBlockCursor(masterPlan);
     ExecutionBlock leafBlock = null;
-    while (cursor.hasNext()) {
-      ExecutionBlock block = cursor.nextBlock();
+    for (ExecutionBlock block : cursor) {
       if (masterPlan.isLeaf(block)) {
         leafBlock = block;
         break;

http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResource.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
index 5f2d33c..c2c0f88 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
@@ -76,8 +76,8 @@ public class WorkerResource {
   }
 
   public int getMemoryMB() {
+    rlock.lock();
     try {
-      rlock.lock();
       return memoryMB;
     } finally {
       rlock.unlock();
@@ -85,8 +85,8 @@ public class WorkerResource {
   }
 
   public void setMemoryMB(int memoryMB) {
+    wlock.lock();
     try {
-      wlock.lock();
       this.memoryMB = memoryMB;
     } finally {
       wlock.unlock();
@@ -112,8 +112,8 @@ public class WorkerResource {
   }
 
   public int getUsedMemoryMB() {
+    rlock.lock();
     try {
-      rlock.lock();
       return usedMemoryMB;
     } finally {
       rlock.unlock();
@@ -121,8 +121,8 @@ public class WorkerResource {
   }
 
   public void setUsedMemoryMB(int usedMemoryMB) {
+    wlock.lock();
     try {
-      wlock.lock();
       this.usedMemoryMB = usedMemoryMB;
     } finally {
       wlock.unlock();
@@ -142,9 +142,10 @@ public class WorkerResource {
   }
 
   public void releaseResource(float diskSlots, int memoryMB) {
+    LOG.info("Disk " + diskSlots + " slot(s), Memory " + memoryMB + " MB");
+    wlock.lock();
     try {
-      wlock.lock();
-      usedMemoryMB = usedMemoryMB - memoryMB;
+      usedMemoryMB -= memoryMB;
       usedDiskSlots -= diskSlots;
       if(usedMemoryMB < 0) {
         LOG.warn("Used memory can't be a minus: " + usedMemoryMB);
@@ -160,8 +161,9 @@ public class WorkerResource {
   }
 
   public void allocateResource(float diskSlots, int memoryMB) {
+    LOG.info("Disk " + diskSlots + " slot(s), Memory " + memoryMB + " MB");
+    wlock.lock();
     try {
-      wlock.lock();
       usedMemoryMB += memoryMB;
       usedDiskSlots += diskSlots;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
index 362dfa6..23808b5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
@@ -40,6 +40,7 @@ import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
+import org.apache.tajo.engine.planner.global.ExecutionQueue;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.engine.query.QueryContext;
@@ -63,12 +64,13 @@ public class Query implements EventHandler<QueryEvent> {
   // Facilities for Query
   private final TajoConf systemConf;
   private final Clock clock;
-  private String queryStr;
-  private Map<ExecutionBlockId, Stage> stages;
+  private final String queryStr;
+  private final Map<ExecutionBlockId, Stage> stages;
   private final EventHandler eventHandler;
   private final MasterPlan plan;
   QueryMasterTask.QueryMasterTaskContext context;
   private ExecutionBlockCursor cursor;
+  private ExecutionQueue execution;
 
   // Query Status
   private final QueryId id;
@@ -77,7 +79,7 @@ public class Query implements EventHandler<QueryEvent> {
   private long finishTime;
   private TableDesc resultDesc;
   private int completedStagesCount = 0;
-  private int successedStagesCount = 0;
+  private int succeededStagesCount = 0;
   private int killedStagesCount = 0;
   private int failedStagesCount = 0;
   private int erroredStagesCount = 0;
@@ -93,7 +95,7 @@ public class Query implements EventHandler<QueryEvent> {
   private QueryState queryState;
 
   // Transition Handler
-  private static final SingleArcTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
+  private static final InternalErrorTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
   private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
   private static final StageCompletedTransition STAGE_COMPLETED_TRANSITION = new StageCompletedTransition();
   private static final QueryCompletedTransition QUERY_COMPLETED_TRANSITION = new QueryCompletedTransition();
@@ -213,14 +215,12 @@ public class Query implements EventHandler<QueryEvent> {
     StringBuilder sb = new StringBuilder("\n=======================================================");
     sb.append("\nThe order of execution: \n");
     int order = 1;
-    while (cursor.hasNext()) {
-      ExecutionBlock currentEB = cursor.nextBlock();
+    for (ExecutionBlock currentEB : cursor) {
       sb.append("\n").append(order).append(": ").append(currentEB.getId());
       order++;
     }
     sb.append("\n=======================================================");
     LOG.info(sb);
-    cursor.reset();
 
     ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
     this.readLock = readWriteLock.readLock();
@@ -381,6 +381,14 @@ public class Query implements EventHandler<QueryEvent> {
     return cursor;
   }
 
+  public ExecutionQueue newExecutionQueue() {
+    return execution = cursor.newCursor();
+  }
+
+  public ExecutionQueue getExecutionQueue() {
+    return execution;
+  }
+
   public static class StartTransition
       implements SingleArcTransition<Query, QueryEvent> {
 
@@ -388,12 +396,15 @@ public class Query implements EventHandler<QueryEvent> {
     public void transition(Query query, QueryEvent queryEvent) {
 
       query.setStartTime();
-      Stage stage = new Stage(query.context, query.getPlan(),
-          query.getExecutionBlockCursor().nextBlock());
-      stage.setPriority(query.priority--);
-      query.addStage(stage);
-      stage.getEventHandler().handle(new StageEvent(stage.getId(), StageEventType.SQ_INIT));
-      LOG.debug("Schedule unit plan: \n" + stage.getBlock().getPlan());
+      ExecutionQueue executionQueue = query.newExecutionQueue();
+      for (ExecutionBlock executionBlock : executionQueue.first()) {
+        Stage stage = new Stage(query.context, query.getPlan(), executionBlock);
+        stage.setPriority(query.priority--);
+        query.addStage(stage);
+
+        stage.getEventHandler().handle(new StageEvent(stage.getId(), StageEventType.SQ_INIT));
+        LOG.debug("Schedule unit plan: \n" + stage.getBlock().getPlan());
+      }
     }
   }
 
@@ -616,25 +627,31 @@ public class Query implements EventHandler<QueryEvent> {
 
   public static class StageCompletedTransition implements SingleArcTransition<Query, QueryEvent> {
 
-    private boolean hasNext(Query query) {
-      ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
-      ExecutionBlock nextBlock = cursor.peek();
-      return !query.getPlan().isTerminal(nextBlock);
-    }
-
-    private void executeNextBlock(Query query) {
-      ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
-      ExecutionBlock nextBlock = cursor.nextBlock();
-      Stage nextStage = new Stage(query.context, query.getPlan(), nextBlock);
-      nextStage.setPriority(query.priority--);
-      query.addStage(nextStage);
-      nextStage.getEventHandler().handle(new StageEvent(nextStage.getId(), StageEventType.SQ_INIT));
-
-      LOG.info("Scheduling Stage:" + nextStage.getId());
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Scheduling Stage's Priority: " + nextStage.getPriority());
-        LOG.debug("Scheduling Stage's Plan: \n" + nextStage.getBlock().getPlan());
+    // return true for terminal
+    private synchronized boolean executeNextBlock(Query query, ExecutionBlockId blockId) {
+      ExecutionQueue cursor = query.getExecutionQueue();
+      ExecutionBlock[] nextBlocks = cursor.next(blockId);
+      if (nextBlocks == null || nextBlocks.length == 0) {
+        return nextBlocks == null;
+      }
+      boolean terminal = true;
+      for (ExecutionBlock nextBlock : nextBlocks) {
+        if (query.getPlan().isTerminal(nextBlock)) {
+          continue;
+        }
+        Stage nextStage = new Stage(query.context, query.getPlan(), nextBlock);
+        nextStage.setPriority(query.priority--);
+        query.addStage(nextStage);
+        nextStage.getEventHandler().handle(new StageEvent(nextStage.getId(), StageEventType.SQ_INIT));
+
+        LOG.info("Scheduling Stage:" + nextStage.getId());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Scheduling Stage's Priority: " + nextStage.getPriority());
+          LOG.debug("Scheduling Stage's Plan: \n" + nextStage.getBlock().getPlan());
+        }
+        terminal = false;
       }
+      return terminal;
     }
 
     @Override
@@ -647,7 +664,7 @@ public class Query implements EventHandler<QueryEvent> {
         StageCompletedEvent castEvent = (StageCompletedEvent) event;
 
         if (castEvent.getState() == StageState.SUCCEEDED) {
-          query.successedStagesCount++;
+          query.succeededStagesCount++;
         } else if (castEvent.getState() == StageState.KILLED) {
           query.killedStagesCount++;
         } else if (castEvent.getState() == StageState.FAILED) {
@@ -663,11 +680,11 @@ public class Query implements EventHandler<QueryEvent> {
         // if a stage is succeeded and a query is running
         if (castEvent.getState() == StageState.SUCCEEDED &&  // latest stage succeeded
             query.getSynchronizedState() == QueryState.QUERY_RUNNING &&     // current state is not in KILL_WAIT, FAILED, or ERROR.
-            hasNext(query)) {                                   // there remains at least one stage.
-          executeNextBlock(query);
-        } else { // if a query is completed due to finished, kill, failure, or error
-          query.eventHandler.handle(new QueryCompletedEvent(castEvent.getExecutionBlockId(), castEvent.getState()));
+            !executeNextBlock(query, castEvent.getExecutionBlockId())) {
+          return;
         }
+        // Reached when the stage did not succeed, the query is not QUERY_RUNNING, or executeNextBlock() reported terminal: complete the query.
+        query.eventHandler.handle(new QueryCompletedEvent(castEvent.getExecutionBlockId(), castEvent.getState()));
       } catch (Throwable t) {
         LOG.error(t.getMessage(), t);
         query.eventHandler.handle(new QueryEvent(event.getQueryId(), QueryEventType.INTERNAL_ERROR));

http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index f4818f6..9b5980b 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -133,7 +133,7 @@ public class TajoTestingCluster {
       Preconditions.checkState(testResourceManager.equals(TajoWorkerResourceManager.class.getCanonicalName()));
       conf.set(ConfVars.RESOURCE_MANAGER_CLASS.varname, System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname));
     }
-    conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB.varname, 1024);
+    conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB.varname, 2048);
     conf.setFloat(ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS.varname, 2.0f);
 
 
@@ -156,14 +156,15 @@ public class TajoTestingCluster {
     conf.setIntVar(ConfVars.SHUFFLE_RPC_SERVER_WORKER_THREAD_NUM, 2);
 
     // Resource allocator
-    conf.setIntVar(ConfVars.YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM, 2);
+    conf.setIntVar(ConfVars.$QUERY_EXECUTE_PARALLEL_MAX, 3);
+    conf.setIntVar(ConfVars.YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM, 6);   // make twice of parallel_max
 
     // Memory cache termination
     conf.setIntVar(ConfVars.WORKER_HISTORY_EXPIRE_PERIOD, 1);
 
     conf.setStrings(ConfVars.PYTHON_CODE_DIR.varname, getClass().getResource("/python").toString());
 
-    /* Since Travi CI limits the size of standard output log up to 4MB */
+    /* Since Travis CI limits the size of standard output log up to 4MB */
     if (!StringUtils.isEmpty(LOG_LEVEL)) {
       Level defaultLevel = Logger.getRootLogger().getLevel();
       Logger.getLogger("org.apache.tajo").setLevel(Level.toLevel(LOG_LEVEL.toUpperCase(), defaultLevel));
@@ -254,7 +255,7 @@ public class TajoTestingCluster {
     builder.waitSafeMode(true);
     this.dfsCluster = builder.build();
 
-    // Set this just-started cluser as our filesystem.
+    // Set this just-started cluster as our filesystem.
     this.defaultFS = this.dfsCluster.getFileSystem();
     this.conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultFS.getUri().toString());
     this.conf.setVar(TajoConf.ConfVars.ROOT_DIR, defaultFS.getUri() + "/tajo");

http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java b/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
index c82637d..0f90722 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
@@ -26,6 +26,7 @@ import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.plan.LogicalOptimizer;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.LogicalPlanner;
@@ -115,8 +116,7 @@ public class TestExecutionBlockCursor {
     ExecutionBlockCursor cursor = new ExecutionBlockCursor(plan);
 
     int count = 0;
-    while(cursor.hasNext()) {
-      cursor.nextBlock();
+    for (ExecutionBlock eb : cursor) {
       count++;
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
index bcd8970..7e741a9 100644
--- a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
+++ b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
@@ -26,6 +26,7 @@ Available Session Variables:
 \set GROUPBY_PER_SHUFFLE_SIZE [int value] - shuffle output size for sort (mb)
 \set TABLE_PARTITION_PER_SHUFFLE_SIZE [int value] - shuffle output size for partition table write (mb)
 \set GROUPBY_MULTI_LEVEL_ENABLED [true or false] - Multiple level groupby enabled
+\set QUERY_EXECUTE_PARALLEL [int value] - Maximum parallel running of execution blocks for a query
 \set EXTSORT_BUFFER_SIZE [long value] - sort buffer size for external sort (mb)
 \set HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash join (mb)
 \set INNER_HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash inner join (mb)