You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2015/01/09 12:08:46 UTC

tajo git commit: TAJO-1251: Query is hanging occasionally by shuffle report. (jinho)

Repository: tajo
Updated Branches:
  refs/heads/master 533601eac -> 50a8a663c


TAJO-1251: Query is hanging occasionally by shuffle report. (jinho)

Closes #339


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/50a8a663
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/50a8a663
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/50a8a663

Branch: refs/heads/master
Commit: 50a8a663c2c95f14ca59f3b01ffd79b2578f7f09
Parents: 533601e
Author: jhkim <jh...@apache.org>
Authored: Fri Jan 9 20:07:36 2015 +0900
Committer: jhkim <jh...@apache.org>
Committed: Fri Jan 9 20:07:36 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../java/org/apache/tajo/conf/TajoConf.java     |   3 +-
 .../tajo/master/event/StageEventType.java       |   3 +-
 .../master/event/StageShuffleReportEvent.java   |  38 ++++
 .../java/org/apache/tajo/querymaster/Query.java |   2 +-
 .../querymaster/QueryMasterManagerService.java  |   2 +-
 .../java/org/apache/tajo/querymaster/Stage.java | 206 +++++++++++++------
 .../org/apache/tajo/querymaster/StageState.java |   1 +
 tajo-dist/pom.xml                               |  28 +++
 9 files changed, 220 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/50a8a663/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 0df49b1..4e38f78 100644
--- a/CHANGES
+++ b/CHANGES
@@ -156,6 +156,8 @@ Release 0.9.1 - unreleased
 
   BUG FIXES
 
+    TAJO-1251: Query is hanging occasionally by shuffle report. (jinho)
+
     TAJO-1287: Repeated using of the same order by key in multiple 
     window clauses should be supported. (Keuntae Park)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/50a8a663/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index ab11ddd..74a9271 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -188,7 +188,8 @@ public class TajoConf extends Configuration {
     /** how many launching TaskRunners in parallel */
     YARN_RM_QUERY_MASTER_MEMORY_MB("tajo.querymaster.memory-mb", 512, Validators.min("64")),
     YARN_RM_QUERY_MASTER_DISKS("tajo.yarn-rm.querymaster.disks", 1),
-    YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM("tajo.yarn-rm.parallel-task-runner-launcher-num", 16),
+    YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM("tajo.yarn-rm.parallel-task-runner-launcher-num",
+        Runtime.getRuntime().availableProcessors() * 2),
     YARN_RM_WORKER_NUMBER_PER_NODE("tajo.yarn-rm.max-worker-num-per-node", 8),
 
     // Query Configuration

http://git-wip-us.apache.org/repos/asf/tajo/blob/50a8a663/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java
index fa808d4..763d426 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java
@@ -34,7 +34,8 @@ public enum StageEventType {
   SQ_TASK_COMPLETED,
   SQ_FAILED,
 
-  // Producer: Completed
+  // Producer: Stage
+  SQ_SHUFFLE_REPORT,
   SQ_STAGE_COMPLETED,
 
   // Producer: Any component

http://git-wip-us.apache.org/repos/asf/tajo/blob/50a8a663/tajo-core/src/main/java/org/apache/tajo/master/event/StageShuffleReportEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/StageShuffleReportEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageShuffleReportEvent.java
new file mode 100644
index 0000000..924fb59
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageShuffleReportEvent.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+
+/**
+ * Event class: carries an execution block's shuffle report from {@link org.apache.tajo.querymaster.QueryMasterManagerService} to a Stage.
+ */
+public class StageShuffleReportEvent extends StageEvent {
+  private TajoWorkerProtocol.ExecutionBlockReport report;
+
+  public StageShuffleReportEvent(ExecutionBlockId executionBlockId, TajoWorkerProtocol.ExecutionBlockReport report) {
+    super(executionBlockId, StageEventType.SQ_SHUFFLE_REPORT);
+    this.report = report;
+  }
+
+  public TajoWorkerProtocol.ExecutionBlockReport getReport() {
+    return report;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/50a8a663/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
index 2932694..060e620 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
@@ -660,7 +660,6 @@ public class Query implements EventHandler<QueryEvent> {
         if (castEvent.getState() == StageState.SUCCEEDED &&  // latest stage succeeded
             query.getSynchronizedState() == QueryState.QUERY_RUNNING &&     // current state is not in KILL_WAIT, FAILED, or ERROR.
             hasNext(query)) {                                   // there remains at least one stage.
-          query.getStage(castEvent.getExecutionBlockId()).waitingIntermediateReport();
           executeNextBlock(query);
         } else { // if a query is completed due to finished, kill, failure, or error
           query.eventHandler.handle(new QueryCompletedEvent(castEvent.getExecutionBlockId(), castEvent.getState()));
@@ -692,6 +691,7 @@ public class Query implements EventHandler<QueryEvent> {
     public void transition(Query query, QueryEvent event) {
       synchronized (query.stages) {
         for (Stage stage : query.stages.values()) {
+          stage.stopFinalization();
           query.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_KILL));
         }
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/50a8a663/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
index 4a91326..85cc553 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
@@ -220,7 +220,7 @@ public class QueryMasterManagerService extends CompositeService
     QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(new QueryId(request.getEbId().getQueryId()));
     if (queryMasterTask != null) {
       ExecutionBlockId ebId = new ExecutionBlockId(request.getEbId());
-      queryMasterTask.getQuery().getStage(ebId).receiveExecutionBlockReport(request);
+      queryMasterTask.getEventHandler().handle(new StageShuffleReportEvent(ebId, request));
     }
     done.run(TajoWorker.TRUE_PROTO);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/50a8a663/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index 34c58d4..1ea7051 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -50,27 +50,30 @@ import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
 import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
 import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto;
-import org.apache.tajo.master.*;
+import org.apache.tajo.master.LaunchTaskRunnersEvent;
+import org.apache.tajo.master.TaskRunnerGroupEvent;
 import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
+import org.apache.tajo.master.TaskState;
+import org.apache.tajo.master.container.TajoContainer;
+import org.apache.tajo.master.container.TajoContainerId;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext;
+import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.querymaster.Task.IntermediateEntry;
-import org.apache.tajo.master.container.TajoContainer;
-import org.apache.tajo.master.container.TajoContainerId;
 import org.apache.tajo.storage.FileStorageManager;
-import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.KeyValueSet;
-import org.apache.tajo.util.history.TaskHistory;
 import org.apache.tajo.util.history.StageHistory;
+import org.apache.tajo.util.history.TaskHistory;
 import org.apache.tajo.worker.FetchImpl;
 
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -102,6 +105,8 @@ public class Stage implements EventHandler<StageEvent> {
 
   private long startTime;
   private long finishTime;
+  private volatile long lastContactTime;
+  private Thread timeoutChecker;
 
   volatile Map<TaskId, Task> tasks = new ConcurrentHashMap<TaskId, Task>();
   volatile Map<TajoContainerId, TajoContainer> containers = new ConcurrentHashMap<TajoContainerId,
@@ -114,12 +119,13 @@ public class Stage implements EventHandler<StageEvent> {
   private static final AllocatedContainersCancelTransition CONTAINERS_CANCEL_TRANSITION =
       new AllocatedContainersCancelTransition();
   private static final StageCompleteTransition STAGE_COMPLETED_TRANSITION = new StageCompleteTransition();
+  private static final StageFinalizeTransition STAGE_FINALIZE_TRANSITION = new StageFinalizeTransition();
   private StateMachine<StageState, StageEventType, StageEvent> stateMachine;
 
   protected static final StateMachineFactory<Stage, StageState,
       StageEventType, StageEvent> stateMachineFactory =
-      new StateMachineFactory <Stage, StageState,
-          StageEventType, StageEvent> (StageState.NEW)
+      new StateMachineFactory<Stage, StageState,
+          StageEventType, StageEvent>(StageState.NEW)
 
           // Transitions from NEW state
           .addTransition(StageState.NEW,
@@ -155,6 +161,9 @@ public class Stage implements EventHandler<StageEvent> {
           .addTransition(StageState.RUNNING, StageState.RUNNING,
               StageEventType.SQ_TASK_COMPLETED,
               TASK_COMPLETED_TRANSITION)
+          .addTransition(StageState.RUNNING, StageState.FINALIZING,
+              StageEventType.SQ_SHUFFLE_REPORT,
+              STAGE_FINALIZE_TRANSITION)
           .addTransition(StageState.RUNNING,
               EnumSet.of(StageState.SUCCEEDED, StageState.FAILED),
               StageEventType.SQ_STAGE_COMPLETED,
@@ -198,6 +207,24 @@ public class Stage implements EventHandler<StageEvent> {
               StageEventType.SQ_INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
 
+              // Transitions from FINALIZING state
+          .addTransition(StageState.FINALIZING, StageState.FINALIZING,
+              StageEventType.SQ_SHUFFLE_REPORT,
+              STAGE_FINALIZE_TRANSITION)
+          .addTransition(StageState.FINALIZING,
+              EnumSet.of(StageState.SUCCEEDED, StageState.FAILED),
+              StageEventType.SQ_STAGE_COMPLETED,
+              STAGE_COMPLETED_TRANSITION)
+          .addTransition(StageState.FINALIZING, StageState.FINALIZING,
+              StageEventType.SQ_DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(StageState.FINALIZING, StageState.ERROR,
+              StageEventType.SQ_INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+              // Ignore-able Transition
+          .addTransition(StageState.FINALIZING, StageState.KILLED,
+              StageEventType.SQ_KILL)
+
               // Transitions from SUCCEEDED state
           .addTransition(StageState.SUCCEEDED, StageState.SUCCEEDED,
               StageEventType.SQ_CONTAINER_ALLOCATED,
@@ -273,14 +300,14 @@ public class Stage implements EventHandler<StageEvent> {
   private final Lock writeLock;
 
   private int totalScheduledObjectsCount;
-  private int succeededObjectCount = 0;
   private int completedTaskCount = 0;
-  private int succeededTaskCount = 0;
+  private int succeededObjectCount = 0;
   private int killedObjectCount = 0;
   private int failedObjectCount = 0;
   private TaskSchedulerContext schedulerContext;
-  private List<IntermediateEntry> hashShuffleIntermediateEntries = new ArrayList<IntermediateEntry>();
-  private AtomicInteger completeReportReceived = new AtomicInteger(0);
+  private List<IntermediateEntry> hashShuffleIntermediateEntries = Lists.newArrayList();
+  private AtomicInteger completedShuffleTasks = new AtomicInteger(0);
+  private AtomicBoolean stopShuffleReceiver = new AtomicBoolean();
   private StageHistory finalStageHistory;
 
   public Stage(QueryMasterTask.QueryMasterTaskContext context, MasterPlan masterPlan, ExecutionBlock block) {
@@ -465,10 +492,16 @@ public class Stage implements EventHandler<StageEvent> {
   }
 
   /**
-   * It finalizes this stage. It is only invoked when the stage is succeeded.
+   * It finalizes this stage. It is only invoked when the stage is finalizing.
    */
-  public void complete() {
+  public void finalizeStage() {
     cleanup();
+  }
+
+  /**
+   * It completes this stage. It is only invoked when the stage has succeeded.
+   */
+  public void complete() {
     finalizeStats();
     setFinishTime();
     eventHandler.handle(new StageCompletedEvent(getId(), StageState.SUCCEEDED));
@@ -652,7 +685,7 @@ public class Stage implements EventHandler<StageEvent> {
   }
 
   private void releaseContainers() {
-    // If there are still live TaskRunners, try to kill the containers.
+    // If there are still live TaskRunners, try to kill the containers and send the shuffle report request.
     eventHandler.handle(new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP, getId(), containers.values()));
   }
 
@@ -684,6 +717,7 @@ public class Stage implements EventHandler<StageEvent> {
 
   @Override
   public void handle(StageEvent event) {
+    lastContactTime = System.currentTimeMillis();
     if (LOG.isDebugEnabled()) {
       LOG.debug("Processing " + event.getStageId() + " of type " + event.getType() + ", preState="
           + getSynchronizedState());
@@ -751,6 +785,7 @@ public class Stage implements EventHandler<StageEvent> {
                             LOG.info(stage.totalScheduledObjectsCount + " objects are scheduled");
 
                             if (stage.getTaskScheduler().remainingScheduledObjectNum() == 0) { // if there is no tasks
+                              stage.finalizeStage();
                               stage.complete();
                             } else {
                               if(stage.getSynchronizedState() == StageState.INITED) {
@@ -1192,16 +1227,19 @@ public class Stage implements EventHandler<StageEvent> {
           stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_KILL));
         }
 
-        LOG.info(String.format("[%s] Task Completion Event (Total: %d, Success: %d, Killed: %d, Failed: %d)",
-            stage.getId(),
-            stage.getTotalScheduledObjectsCount(),
-            stage.succeededObjectCount,
-            stage.killedObjectCount,
-            stage.failedObjectCount));
-
-        if (stage.totalScheduledObjectsCount ==
-            stage.succeededObjectCount + stage.killedObjectCount + stage.failedObjectCount) {
-          stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED));
+        if (stage.totalScheduledObjectsCount == stage.completedTaskCount) {
+          if (stage.succeededObjectCount == stage.completedTaskCount) {
+            stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_SHUFFLE_REPORT));
+          } else {
+            stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED));
+          }
+        } else {
+          LOG.info(String.format("[%s] Task Completion Event (Total: %d, Success: %d, Killed: %d, Failed: %d)",
+              stage.getId(),
+              stage.totalScheduledObjectsCount,
+              stage.succeededObjectCount,
+              stage.killedObjectCount,
+              stage.failedObjectCount));
         }
       }
     }
@@ -1244,48 +1282,94 @@ public class Stage implements EventHandler<StageEvent> {
     return hashShuffleIntermediateEntries;
   }
 
-  protected void waitingIntermediateReport() {
-    LOG.info(getId() + ", waiting IntermediateReport: expectedTaskNum=" + completeReportReceived.get());
-    synchronized(completeReportReceived) {
-      long startTime = System.currentTimeMillis();
-      while (true) {
-        if (completeReportReceived.get() >= tasks.size()) {
-          LOG.info(getId() + ", completed waiting IntermediateReport");
-          return;
-        } else {
-          try {
-            completeReportReceived.wait(10 * 1000);
-          } catch (InterruptedException e) {
+  protected void stopFinalization() {
+    stopShuffleReceiver.set(true);
+  }
+
+  private static class StageFinalizeTransition implements SingleArcTransition<Stage, StageEvent> {
+
+    @Override
+    public void transition(final Stage stage, StageEvent event) {
+      // If a shuffle report has failed, the remaining reports are ignored.
+      if (stage.stopShuffleReceiver.get()) {
+        return;
+      }
+
+      stage.lastContactTime = System.currentTimeMillis();
+      try {
+        if (event instanceof StageShuffleReportEvent) {
+
+          StageShuffleReportEvent finalizeEvent = (StageShuffleReportEvent) event;
+          TajoWorkerProtocol.ExecutionBlockReport report = finalizeEvent.getReport();
+
+          if (!report.getReportSuccess()) {
+            stage.stopFinalization();
+            LOG.error(stage.getId() + ", Shuffle report are failed. Caused by:" + report.getReportErrorMessage());
+            stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_FAILED));
           }
-          long elapsedTime = System.currentTimeMillis() - startTime;
-          if (elapsedTime >= 120 * 1000) {
-            LOG.error(getId() + ", Timeout while receiving intermediate reports: " + elapsedTime + " ms");
-            abort(StageState.FAILED);
-            return;
+
+          stage.completedShuffleTasks.addAndGet(finalizeEvent.getReport().getSucceededTasks());
+          if (report.getIntermediateEntriesCount() > 0) {
+            for (IntermediateEntryProto eachInterm : report.getIntermediateEntriesList()) {
+              stage.hashShuffleIntermediateEntries.add(new IntermediateEntry(eachInterm));
+            }
           }
-        }
-      }
-    }
-  }
 
-  public void receiveExecutionBlockReport(TajoWorkerProtocol.ExecutionBlockReport report) {
-    LOG.info(getId() + ", receiveExecutionBlockReport:" +  report.getSucceededTasks());
-    if (!report.getReportSuccess()) {
-      LOG.error(getId() + ", ExecutionBlock final report fail cause:" + report.getReportErrorMessage());
-      abort(StageState.FAILED);
-      return;
-    }
-    if (report.getIntermediateEntriesCount() > 0) {
-      synchronized (hashShuffleIntermediateEntries) {
-        for (IntermediateEntryProto eachInterm: report.getIntermediateEntriesList()) {
-          hashShuffleIntermediateEntries.add(new IntermediateEntry(eachInterm));
+          if (stage.completedShuffleTasks.get() >= stage.succeededObjectCount) {
+            LOG.info(stage.getId() + ", Finalized shuffle reports: " + stage.completedShuffleTasks.get());
+            stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED));
+            if (stage.timeoutChecker != null) {
+              stage.stopFinalization();
+              synchronized (stage.timeoutChecker){
+                stage.timeoutChecker.notifyAll();
+              }
+            }
+          } else {
+            LOG.info(stage.getId() + ", Received shuffle report: " +
+                stage.completedShuffleTasks.get() + "/" + stage.succeededObjectCount);
+          }
+
+        } else {
+          LOG.info(String.format("Stage finalize - %s (total=%d, success=%d, killed=%d)",
+              stage.getId().toString(),
+              stage.totalScheduledObjectsCount,
+              stage.succeededObjectCount,
+              stage.killedObjectCount));
+          stage.finalizeStage();
+          LOG.info(stage.getId() + ", waiting for shuffle reports. expected Tasks:" + stage.succeededObjectCount);
+
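+          /* FIXME implement timeout handler of stage and task. NOTE(review): timeoutChecker does not appear to be assigned anywhere before this check, so the guard below likely should be "== null" -- confirm. */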
+          if (stage.timeoutChecker != null) {
+            stage.timeoutChecker = new Thread(new Runnable() {
+              @Override
+              public void run() {
+                while (stage.getSynchronizedState() == StageState.FINALIZING && !Thread.interrupted()) {
+                  long elapsedTime = System.currentTimeMillis() - stage.lastContactTime;
+                  if (elapsedTime > 120 * 1000) {
+                    stage.stopFinalization();
+                    LOG.error(stage.getId() + ": Timed out while receiving intermediate reports: " + elapsedTime
+                        + " ms, report:" + stage.completedShuffleTasks.get() + "/" + stage.succeededObjectCount);
+                    stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_FAILED));
+                  }
+                  synchronized (this) {
+                    try {
+                      this.wait(1 * 1000);
+                    } catch (InterruptedException e) {
+                    }
+                  }
+                }
+              }
+            });
+            stage.timeoutChecker.start();
+          }
         }
+      } catch (Throwable t) {
+        LOG.error(t.getMessage(), t);
+        stage.stopFinalization();
+        stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(), t.getMessage()));
+        stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_INTERNAL_ERROR));
       }
     }
-    synchronized(completeReportReceived) {
-      completeReportReceived.addAndGet(report.getSucceededTasks());
-      completeReportReceived.notifyAll();
-    }
   }
 
   private static class StageCompleteTransition implements MultipleArcTransition<Stage, StageEvent, StageState> {

http://git-wip-us.apache.org/repos/asf/tajo/blob/50a8a663/tajo-core/src/main/java/org/apache/tajo/querymaster/StageState.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/StageState.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/StageState.java
index 2fd62be..2d68332 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/StageState.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/StageState.java
@@ -22,6 +22,7 @@ public enum StageState {
   NEW,
   INITED,
   RUNNING,
+  FINALIZING,
   SUCCEEDED,
   FAILED,
   KILL_WAIT,

http://git-wip-us.apache.org/repos/asf/tajo/blob/50a8a663/tajo-dist/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-dist/pom.xml b/tajo-dist/pom.xml
index d469ba9..3df2681 100644
--- a/tajo-dist/pom.xml
+++ b/tajo-dist/pom.xml
@@ -207,6 +207,34 @@
       <activation>
         <activeByDefault>false</activeByDefault>
       </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-common</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+          <exclusions>
+            <exclusion>
+              <groupId>commons-el</groupId>
+              <artifactId>commons-el</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>tomcat</groupId>
+              <artifactId>jasper-runtime</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>tomcat</groupId>
+              <artifactId>jasper-compiler</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.mortbay.jetty</groupId>
+              <artifactId>jsp-2.1-jetty</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+      </dependencies>
       <build>
         <plugins>
           <plugin>