You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2015/01/09 12:08:46 UTC
tajo git commit: TAJO-1251: Query is hanging occasionally by shuffle
report. (jinho)
Repository: tajo
Updated Branches:
refs/heads/master 533601eac -> 50a8a663c
TAJO-1251: Query is hanging occasionally by shuffle report. (jinho)
Closes #339
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/50a8a663
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/50a8a663
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/50a8a663
Branch: refs/heads/master
Commit: 50a8a663c2c95f14ca59f3b01ffd79b2578f7f09
Parents: 533601e
Author: jhkim <jh...@apache.org>
Authored: Fri Jan 9 20:07:36 2015 +0900
Committer: jhkim <jh...@apache.org>
Committed: Fri Jan 9 20:07:36 2015 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../java/org/apache/tajo/conf/TajoConf.java | 3 +-
.../tajo/master/event/StageEventType.java | 3 +-
.../master/event/StageShuffleReportEvent.java | 38 ++++
.../java/org/apache/tajo/querymaster/Query.java | 2 +-
.../querymaster/QueryMasterManagerService.java | 2 +-
.../java/org/apache/tajo/querymaster/Stage.java | 206 +++++++++++++------
.../org/apache/tajo/querymaster/StageState.java | 1 +
tajo-dist/pom.xml | 28 +++
9 files changed, 220 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/50a8a663/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 0df49b1..4e38f78 100644
--- a/CHANGES
+++ b/CHANGES
@@ -156,6 +156,8 @@ Release 0.9.1 - unreleased
BUG FIXES
+ TAJO-1251: Query is hanging occasionally by shuffle report. (jinho)
+
TAJO-1287: Repeated using of the same order by key in multiple
window clauses should be supported. (Keuntae Park)
http://git-wip-us.apache.org/repos/asf/tajo/blob/50a8a663/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index ab11ddd..74a9271 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -188,7 +188,8 @@ public class TajoConf extends Configuration {
/** how many launching TaskRunners in parallel */
YARN_RM_QUERY_MASTER_MEMORY_MB("tajo.querymaster.memory-mb", 512, Validators.min("64")),
YARN_RM_QUERY_MASTER_DISKS("tajo.yarn-rm.querymaster.disks", 1),
- YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM("tajo.yarn-rm.parallel-task-runner-launcher-num", 16),
+ YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM("tajo.yarn-rm.parallel-task-runner-launcher-num",
+ Runtime.getRuntime().availableProcessors() * 2),
YARN_RM_WORKER_NUMBER_PER_NODE("tajo.yarn-rm.max-worker-num-per-node", 8),
// Query Configuration
http://git-wip-us.apache.org/repos/asf/tajo/blob/50a8a663/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java
index fa808d4..763d426 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java
@@ -34,7 +34,8 @@ public enum StageEventType {
SQ_TASK_COMPLETED,
SQ_FAILED,
- // Producer: Completed
+ // Producer: Stage
+ SQ_SHUFFLE_REPORT,
SQ_STAGE_COMPLETED,
// Producer: Any component
http://git-wip-us.apache.org/repos/asf/tajo/blob/50a8a663/tajo-core/src/main/java/org/apache/tajo/master/event/StageShuffleReportEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/StageShuffleReportEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageShuffleReportEvent.java
new file mode 100644
index 0000000..924fb59
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageShuffleReportEvent.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+
+/**
+ * Stage event of type {@code SQ_SHUFFLE_REPORT} that carries an ExecutionBlockReport; sent from {@link org.apache.tajo.querymaster.QueryMasterManagerService} to Stage
+ */
+public class StageShuffleReportEvent extends StageEvent {
+ private TajoWorkerProtocol.ExecutionBlockReport report;
+
+ public StageShuffleReportEvent(ExecutionBlockId executionBlockId, TajoWorkerProtocol.ExecutionBlockReport report) {
+ super(executionBlockId, StageEventType.SQ_SHUFFLE_REPORT);
+ this.report = report;
+ }
+
+ public TajoWorkerProtocol.ExecutionBlockReport getReport() {
+ return report;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/50a8a663/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
index 2932694..060e620 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
@@ -660,7 +660,6 @@ public class Query implements EventHandler<QueryEvent> {
if (castEvent.getState() == StageState.SUCCEEDED && // latest stage succeeded
query.getSynchronizedState() == QueryState.QUERY_RUNNING && // current state is not in KILL_WAIT, FAILED, or ERROR.
hasNext(query)) { // there remains at least one stage.
- query.getStage(castEvent.getExecutionBlockId()).waitingIntermediateReport();
executeNextBlock(query);
} else { // if a query is completed due to finished, kill, failure, or error
query.eventHandler.handle(new QueryCompletedEvent(castEvent.getExecutionBlockId(), castEvent.getState()));
@@ -692,6 +691,7 @@ public class Query implements EventHandler<QueryEvent> {
public void transition(Query query, QueryEvent event) {
synchronized (query.stages) {
for (Stage stage : query.stages.values()) {
+ stage.stopFinalization();
query.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_KILL));
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/50a8a663/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
index 4a91326..85cc553 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
@@ -220,7 +220,7 @@ public class QueryMasterManagerService extends CompositeService
QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(new QueryId(request.getEbId().getQueryId()));
if (queryMasterTask != null) {
ExecutionBlockId ebId = new ExecutionBlockId(request.getEbId());
- queryMasterTask.getQuery().getStage(ebId).receiveExecutionBlockReport(request);
+ queryMasterTask.getEventHandler().handle(new StageShuffleReportEvent(ebId, request));
}
done.run(TajoWorker.TRUE_PROTO);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/50a8a663/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index 34c58d4..1ea7051 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -50,27 +50,30 @@ import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto;
-import org.apache.tajo.master.*;
+import org.apache.tajo.master.LaunchTaskRunnersEvent;
+import org.apache.tajo.master.TaskRunnerGroupEvent;
import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
+import org.apache.tajo.master.TaskState;
+import org.apache.tajo.master.container.TajoContainer;
+import org.apache.tajo.master.container.TajoContainerId;
import org.apache.tajo.master.event.*;
import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext;
+import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.querymaster.Task.IntermediateEntry;
-import org.apache.tajo.master.container.TajoContainer;
-import org.apache.tajo.master.container.TajoContainerId;
import org.apache.tajo.storage.FileStorageManager;
-import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.plan.logical.*;
import org.apache.tajo.storage.StorageManager;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.KeyValueSet;
-import org.apache.tajo.util.history.TaskHistory;
import org.apache.tajo.util.history.StageHistory;
+import org.apache.tajo.util.history.TaskHistory;
import org.apache.tajo.worker.FetchImpl;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@@ -102,6 +105,8 @@ public class Stage implements EventHandler<StageEvent> {
private long startTime;
private long finishTime;
+ private volatile long lastContactTime;
+ private Thread timeoutChecker;
volatile Map<TaskId, Task> tasks = new ConcurrentHashMap<TaskId, Task>();
volatile Map<TajoContainerId, TajoContainer> containers = new ConcurrentHashMap<TajoContainerId,
@@ -114,12 +119,13 @@ public class Stage implements EventHandler<StageEvent> {
private static final AllocatedContainersCancelTransition CONTAINERS_CANCEL_TRANSITION =
new AllocatedContainersCancelTransition();
private static final StageCompleteTransition STAGE_COMPLETED_TRANSITION = new StageCompleteTransition();
+ private static final StageFinalizeTransition STAGE_FINALIZE_TRANSITION = new StageFinalizeTransition();
private StateMachine<StageState, StageEventType, StageEvent> stateMachine;
protected static final StateMachineFactory<Stage, StageState,
StageEventType, StageEvent> stateMachineFactory =
- new StateMachineFactory <Stage, StageState,
- StageEventType, StageEvent> (StageState.NEW)
+ new StateMachineFactory<Stage, StageState,
+ StageEventType, StageEvent>(StageState.NEW)
// Transitions from NEW state
.addTransition(StageState.NEW,
@@ -155,6 +161,9 @@ public class Stage implements EventHandler<StageEvent> {
.addTransition(StageState.RUNNING, StageState.RUNNING,
StageEventType.SQ_TASK_COMPLETED,
TASK_COMPLETED_TRANSITION)
+ .addTransition(StageState.RUNNING, StageState.FINALIZING,
+ StageEventType.SQ_SHUFFLE_REPORT,
+ STAGE_FINALIZE_TRANSITION)
.addTransition(StageState.RUNNING,
EnumSet.of(StageState.SUCCEEDED, StageState.FAILED),
StageEventType.SQ_STAGE_COMPLETED,
@@ -198,6 +207,24 @@ public class Stage implements EventHandler<StageEvent> {
StageEventType.SQ_INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
+ // Transitions from FINALIZING state
+ .addTransition(StageState.FINALIZING, StageState.FINALIZING,
+ StageEventType.SQ_SHUFFLE_REPORT,
+ STAGE_FINALIZE_TRANSITION)
+ .addTransition(StageState.FINALIZING,
+ EnumSet.of(StageState.SUCCEEDED, StageState.FAILED),
+ StageEventType.SQ_STAGE_COMPLETED,
+ STAGE_COMPLETED_TRANSITION)
+ .addTransition(StageState.FINALIZING, StageState.FINALIZING,
+ StageEventType.SQ_DIAGNOSTIC_UPDATE,
+ DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(StageState.FINALIZING, StageState.ERROR,
+ StageEventType.SQ_INTERNAL_ERROR,
+ INTERNAL_ERROR_TRANSITION)
+ // Ignore-able Transition
+ .addTransition(StageState.FINALIZING, StageState.KILLED,
+ StageEventType.SQ_KILL)
+
// Transitions from SUCCEEDED state
.addTransition(StageState.SUCCEEDED, StageState.SUCCEEDED,
StageEventType.SQ_CONTAINER_ALLOCATED,
@@ -273,14 +300,14 @@ public class Stage implements EventHandler<StageEvent> {
private final Lock writeLock;
private int totalScheduledObjectsCount;
- private int succeededObjectCount = 0;
private int completedTaskCount = 0;
- private int succeededTaskCount = 0;
+ private int succeededObjectCount = 0;
private int killedObjectCount = 0;
private int failedObjectCount = 0;
private TaskSchedulerContext schedulerContext;
- private List<IntermediateEntry> hashShuffleIntermediateEntries = new ArrayList<IntermediateEntry>();
- private AtomicInteger completeReportReceived = new AtomicInteger(0);
+ private List<IntermediateEntry> hashShuffleIntermediateEntries = Lists.newArrayList();
+ private AtomicInteger completedShuffleTasks = new AtomicInteger(0);
+ private AtomicBoolean stopShuffleReceiver = new AtomicBoolean();
private StageHistory finalStageHistory;
public Stage(QueryMasterTask.QueryMasterTaskContext context, MasterPlan masterPlan, ExecutionBlock block) {
@@ -465,10 +492,16 @@ public class Stage implements EventHandler<StageEvent> {
}
/**
- * It finalizes this stage. It is only invoked when the stage is succeeded.
+ * It finalizes this stage. It is only invoked when the stage is finalizing.
*/
- public void complete() {
+ public void finalizeStage() {
cleanup();
+ }
+
+ /**
+ * It completes this stage. It is only invoked when the stage has succeeded.
+ */
+ public void complete() {
finalizeStats();
setFinishTime();
eventHandler.handle(new StageCompletedEvent(getId(), StageState.SUCCEEDED));
@@ -652,7 +685,7 @@ public class Stage implements EventHandler<StageEvent> {
}
private void releaseContainers() {
- // If there are still live TaskRunners, try to kill the containers.
+ // If there are still live TaskRunners, try to kill the containers and send the shuffle report request
eventHandler.handle(new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP, getId(), containers.values()));
}
@@ -684,6 +717,7 @@ public class Stage implements EventHandler<StageEvent> {
@Override
public void handle(StageEvent event) {
+ lastContactTime = System.currentTimeMillis();
if (LOG.isDebugEnabled()) {
LOG.debug("Processing " + event.getStageId() + " of type " + event.getType() + ", preState="
+ getSynchronizedState());
@@ -751,6 +785,7 @@ public class Stage implements EventHandler<StageEvent> {
LOG.info(stage.totalScheduledObjectsCount + " objects are scheduled");
if (stage.getTaskScheduler().remainingScheduledObjectNum() == 0) { // if there is no tasks
+ stage.finalizeStage();
stage.complete();
} else {
if(stage.getSynchronizedState() == StageState.INITED) {
@@ -1192,16 +1227,19 @@ public class Stage implements EventHandler<StageEvent> {
stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_KILL));
}
- LOG.info(String.format("[%s] Task Completion Event (Total: %d, Success: %d, Killed: %d, Failed: %d)",
- stage.getId(),
- stage.getTotalScheduledObjectsCount(),
- stage.succeededObjectCount,
- stage.killedObjectCount,
- stage.failedObjectCount));
-
- if (stage.totalScheduledObjectsCount ==
- stage.succeededObjectCount + stage.killedObjectCount + stage.failedObjectCount) {
- stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED));
+ if (stage.totalScheduledObjectsCount == stage.completedTaskCount) {
+ if (stage.succeededObjectCount == stage.completedTaskCount) {
+ stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_SHUFFLE_REPORT));
+ } else {
+ stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED));
+ }
+ } else {
+ LOG.info(String.format("[%s] Task Completion Event (Total: %d, Success: %d, Killed: %d, Failed: %d)",
+ stage.getId(),
+ stage.totalScheduledObjectsCount,
+ stage.succeededObjectCount,
+ stage.killedObjectCount,
+ stage.failedObjectCount));
}
}
}
@@ -1244,48 +1282,94 @@ public class Stage implements EventHandler<StageEvent> {
return hashShuffleIntermediateEntries;
}
- protected void waitingIntermediateReport() {
- LOG.info(getId() + ", waiting IntermediateReport: expectedTaskNum=" + completeReportReceived.get());
- synchronized(completeReportReceived) {
- long startTime = System.currentTimeMillis();
- while (true) {
- if (completeReportReceived.get() >= tasks.size()) {
- LOG.info(getId() + ", completed waiting IntermediateReport");
- return;
- } else {
- try {
- completeReportReceived.wait(10 * 1000);
- } catch (InterruptedException e) {
+ protected void stopFinalization() {
+ stopShuffleReceiver.set(true);
+ }
+
+ private static class StageFinalizeTransition implements SingleArcTransition<Stage, StageEvent> {
+
+ @Override
+ public void transition(final Stage stage, StageEvent event) {
+ // If a shuffle report has failed, the remaining reports are ignored
+ if (stage.stopShuffleReceiver.get()) {
+ return;
+ }
+
+ stage.lastContactTime = System.currentTimeMillis();
+ try {
+ if (event instanceof StageShuffleReportEvent) {
+
+ StageShuffleReportEvent finalizeEvent = (StageShuffleReportEvent) event;
+ TajoWorkerProtocol.ExecutionBlockReport report = finalizeEvent.getReport();
+
+ if (!report.getReportSuccess()) {
+ stage.stopFinalization();
+ LOG.error(stage.getId() + ", Shuffle report are failed. Caused by:" + report.getReportErrorMessage());
+ stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_FAILED));
}
- long elapsedTime = System.currentTimeMillis() - startTime;
- if (elapsedTime >= 120 * 1000) {
- LOG.error(getId() + ", Timeout while receiving intermediate reports: " + elapsedTime + " ms");
- abort(StageState.FAILED);
- return;
+
+ stage.completedShuffleTasks.addAndGet(finalizeEvent.getReport().getSucceededTasks());
+ if (report.getIntermediateEntriesCount() > 0) {
+ for (IntermediateEntryProto eachInterm : report.getIntermediateEntriesList()) {
+ stage.hashShuffleIntermediateEntries.add(new IntermediateEntry(eachInterm));
+ }
}
- }
- }
- }
- }
- public void receiveExecutionBlockReport(TajoWorkerProtocol.ExecutionBlockReport report) {
- LOG.info(getId() + ", receiveExecutionBlockReport:" + report.getSucceededTasks());
- if (!report.getReportSuccess()) {
- LOG.error(getId() + ", ExecutionBlock final report fail cause:" + report.getReportErrorMessage());
- abort(StageState.FAILED);
- return;
- }
- if (report.getIntermediateEntriesCount() > 0) {
- synchronized (hashShuffleIntermediateEntries) {
- for (IntermediateEntryProto eachInterm: report.getIntermediateEntriesList()) {
- hashShuffleIntermediateEntries.add(new IntermediateEntry(eachInterm));
+ if (stage.completedShuffleTasks.get() >= stage.succeededObjectCount) {
+ LOG.info(stage.getId() + ", Finalized shuffle reports: " + stage.completedShuffleTasks.get());
+ stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED));
+ if (stage.timeoutChecker != null) {
+ stage.stopFinalization();
+ synchronized (stage.timeoutChecker){
+ stage.timeoutChecker.notifyAll();
+ }
+ }
+ } else {
+ LOG.info(stage.getId() + ", Received shuffle report: " +
+ stage.completedShuffleTasks.get() + "/" + stage.succeededObjectCount);
+ }
+
+ } else {
+ LOG.info(String.format("Stage finalize - %s (total=%d, success=%d, killed=%d)",
+ stage.getId().toString(),
+ stage.totalScheduledObjectsCount,
+ stage.succeededObjectCount,
+ stage.killedObjectCount));
+ stage.finalizeStage();
+ LOG.info(stage.getId() + ", waiting for shuffle reports. expected Tasks:" + stage.succeededObjectCount);
+
+ /* FIXME implement timeout handler of stage and task */
+ if (stage.timeoutChecker != null) {
+ stage.timeoutChecker = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ while (stage.getSynchronizedState() == StageState.FINALIZING && !Thread.interrupted()) {
+ long elapsedTime = System.currentTimeMillis() - stage.lastContactTime;
+ if (elapsedTime > 120 * 1000) {
+ stage.stopFinalization();
+ LOG.error(stage.getId() + ": Timed out while receiving intermediate reports: " + elapsedTime
+ + " ms, report:" + stage.completedShuffleTasks.get() + "/" + stage.succeededObjectCount);
+ stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_FAILED));
+ }
+ synchronized (this) {
+ try {
+ this.wait(1 * 1000);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ }
+ });
+ stage.timeoutChecker.start();
+ }
}
+ } catch (Throwable t) {
+ LOG.error(t.getMessage(), t);
+ stage.stopFinalization();
+ stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(), t.getMessage()));
+ stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_INTERNAL_ERROR));
}
}
- synchronized(completeReportReceived) {
- completeReportReceived.addAndGet(report.getSucceededTasks());
- completeReportReceived.notifyAll();
- }
}
private static class StageCompleteTransition implements MultipleArcTransition<Stage, StageEvent, StageState> {
http://git-wip-us.apache.org/repos/asf/tajo/blob/50a8a663/tajo-core/src/main/java/org/apache/tajo/querymaster/StageState.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/StageState.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/StageState.java
index 2fd62be..2d68332 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/StageState.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/StageState.java
@@ -22,6 +22,7 @@ public enum StageState {
NEW,
INITED,
RUNNING,
+ FINALIZING,
SUCCEEDED,
FAILED,
KILL_WAIT,
http://git-wip-us.apache.org/repos/asf/tajo/blob/50a8a663/tajo-dist/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-dist/pom.xml b/tajo-dist/pom.xml
index d469ba9..3df2681 100644
--- a/tajo-dist/pom.xml
+++ b/tajo-dist/pom.xml
@@ -207,6 +207,34 @@
<activation>
<activeByDefault>false</activeByDefault>
</activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-el</groupId>
+ <artifactId>commons-el</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-2.1-jetty</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
<build>
<plugins>
<plugin>