You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/12/18 04:57:11 UTC
[5/6] tajo git commit: TAJO-324: Rename the prefix 'QueryUnit' to
Task.
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/QueryUnitAttemptScheduleEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryUnitAttemptScheduleEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryUnitAttemptScheduleEvent.java
deleted file mode 100644
index 6e0d9fd..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryUnitAttemptScheduleEvent.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.event;
-
-import com.google.protobuf.RpcCallback;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProto;
-import org.apache.tajo.master.querymaster.QueryUnitAttempt;
-import org.apache.tajo.master.container.TajoContainerId;
-
-public class QueryUnitAttemptScheduleEvent extends TaskSchedulerEvent {
- private final QueryUnitAttemptScheduleContext context;
- private final QueryUnitAttempt queryUnitAttempt;
-
- public QueryUnitAttemptScheduleEvent(EventType eventType, ExecutionBlockId executionBlockId,
- QueryUnitAttemptScheduleContext context, QueryUnitAttempt queryUnitAttempt) {
- super(eventType, executionBlockId);
- this.context = context;
- this.queryUnitAttempt = queryUnitAttempt;
- }
-
- public QueryUnitAttempt getQueryUnitAttempt() {
- return queryUnitAttempt;
- }
-
- public QueryUnitAttemptScheduleContext getContext() {
- return context;
- }
-
- public static class QueryUnitAttemptScheduleContext {
- private TajoContainerId containerId;
- private String host;
- private RpcCallback<QueryUnitRequestProto> callback;
-
- public QueryUnitAttemptScheduleContext() {
-
- }
-
- public QueryUnitAttemptScheduleContext(TajoContainerId containerId,
- String host,
- RpcCallback<QueryUnitRequestProto> callback) {
- this.containerId = containerId;
- this.host = host;
- this.callback = callback;
- }
-
- public TajoContainerId getContainerId() {
- return containerId;
- }
-
- public void setContainerId(TajoContainerId containerId) {
- this.containerId = containerId;
- }
-
- public String getHost() {
- return host;
- }
-
- public void setHost(String host) {
- this.host = host;
- }
-
- public RpcCallback<QueryUnitRequestProto> getCallback() {
- return callback;
- }
-
- public void setCallback(RpcCallback<QueryUnitRequestProto> callback) {
- this.callback = callback;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java
index 8003ef3..79b6e2e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java
@@ -30,7 +30,7 @@ public enum SubQueryEventType {
SQ_KILL,
SQ_LAUNCH,
- // Producer: QueryUnit
+ // Producer: Task
SQ_TASK_COMPLETED,
SQ_FAILED,
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java
index 0502534..816bc48 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java
@@ -18,22 +18,22 @@
package org.apache.tajo.master.event;
-import org.apache.tajo.QueryUnitId;
+import org.apache.tajo.TaskId;
import org.apache.tajo.master.TaskState;
/**
* Event Class: From Task to SubQuery
*/
public class SubQueryTaskEvent extends SubQueryEvent {
- private QueryUnitId taskId;
+ private TaskId taskId;
private TaskState state;
- public SubQueryTaskEvent(QueryUnitId taskId, TaskState state) {
+ public SubQueryTaskEvent(TaskId taskId, TaskState state) {
super(taskId.getExecutionBlockId(), SubQueryEventType.SQ_TASK_COMPLETED);
this.taskId = taskId;
this.state = state;
}
- public QueryUnitId getTaskId() {
+ public TaskId getTaskId() {
return this.taskId;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
index 3b9edcb..1611370 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
@@ -18,7 +18,7 @@
package org.apache.tajo.master.event;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.master.container.TajoContainerId;
@@ -26,7 +26,7 @@ public class TaskAttemptAssignedEvent extends TaskAttemptEvent {
private final TajoContainerId cId;
private final WorkerConnectionInfo workerConnectionInfo;
- public TaskAttemptAssignedEvent(QueryUnitAttemptId id, TajoContainerId cId,
+ public TaskAttemptAssignedEvent(TaskAttemptId id, TajoContainerId cId,
WorkerConnectionInfo connectionInfo) {
super(id, TaskAttemptEventType.TA_ASSIGNED);
this.cId = cId;
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptEvent.java
index f2df144..1b84de0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptEvent.java
@@ -19,18 +19,18 @@
package org.apache.tajo.master.event;
import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
public class TaskAttemptEvent extends AbstractEvent<TaskAttemptEventType> {
- private final QueryUnitAttemptId id;
+ private final TaskAttemptId id;
- public TaskAttemptEvent(QueryUnitAttemptId id,
+ public TaskAttemptEvent(TaskAttemptId id,
TaskAttemptEventType taskAttemptEventType) {
super(taskAttemptEventType);
this.id = id;
}
- public QueryUnitAttemptId getTaskAttemptId() {
+ public TaskAttemptId getTaskAttemptId() {
return this.id;
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptScheduleEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptScheduleEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptScheduleEvent.java
index 8f153af..3274ef7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptScheduleEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptScheduleEvent.java
@@ -19,13 +19,13 @@
package org.apache.tajo.master.event;
import org.apache.hadoop.conf.Configuration;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
public class TaskAttemptScheduleEvent extends TaskAttemptEvent {
private Configuration conf;
public TaskAttemptScheduleEvent(final Configuration conf,
- final QueryUnitAttemptId id,
+ final TaskAttemptId id,
final TaskAttemptEventType taskAttemptEventType) {
super(id, taskAttemptEventType);
this.conf = conf;
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java
index d980e05..8c5f016 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java
@@ -18,13 +18,13 @@
package org.apache.tajo.master.event;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.ipc.TajoWorkerProtocol.TaskStatusProto;
public class TaskAttemptStatusUpdateEvent extends TaskAttemptEvent {
private final TaskStatusProto status;
- public TaskAttemptStatusUpdateEvent(final QueryUnitAttemptId id,
+ public TaskAttemptStatusUpdateEvent(final TaskAttemptId id,
TaskStatusProto status) {
super(id, TaskAttemptEventType.TA_UPDATE);
this.status = status;
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptToSchedulerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptToSchedulerEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptToSchedulerEvent.java
new file mode 100644
index 0000000..91ef942
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptToSchedulerEvent.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import com.google.protobuf.RpcCallback;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.querymaster.TaskAttempt;
+import org.apache.tajo.master.container.TajoContainerId;
+
+public class TaskAttemptToSchedulerEvent extends TaskSchedulerEvent {
+ private final TaskAttemptScheduleContext context;
+ private final TaskAttempt taskAttempt;
+
+ public TaskAttemptToSchedulerEvent(EventType eventType, ExecutionBlockId executionBlockId,
+ TaskAttemptScheduleContext context, TaskAttempt taskAttempt) {
+ super(eventType, executionBlockId);
+ this.context = context;
+ this.taskAttempt = taskAttempt;
+ }
+
+ public TaskAttempt getTaskAttempt() {
+ return taskAttempt;
+ }
+
+ public TaskAttemptScheduleContext getContext() {
+ return context;
+ }
+
+ public static class TaskAttemptScheduleContext {
+ private TajoContainerId containerId;
+ private String host;
+ private RpcCallback<TajoWorkerProtocol.TaskRequestProto> callback;
+
+ public TaskAttemptScheduleContext() {
+
+ }
+
+ public TaskAttemptScheduleContext(TajoContainerId containerId,
+ String host,
+ RpcCallback<TajoWorkerProtocol.TaskRequestProto> callback) {
+ this.containerId = containerId;
+ this.host = host;
+ this.callback = callback;
+ }
+
+ public TajoContainerId getContainerId() {
+ return containerId;
+ }
+
+ public void setContainerId(TajoContainerId containerId) {
+ this.containerId = containerId;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public RpcCallback<TajoWorkerProtocol.TaskRequestProto> getCallback() {
+ return callback;
+ }
+
+ public void setCallback(RpcCallback<TajoWorkerProtocol.TaskRequestProto> callback) {
+ this.callback = callback;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java
index 3ee389a..20204aa 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java
@@ -18,14 +18,14 @@
package org.apache.tajo.master.event;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.ipc.TajoWorkerProtocol.TaskCompletionReport;
public class TaskCompletionEvent extends TaskAttemptEvent {
private TaskCompletionReport report;
public TaskCompletionEvent(TaskCompletionReport report) {
- super(new QueryUnitAttemptId(report.getId()), TaskAttemptEventType.TA_DONE);
+ super(new TaskAttemptId(report.getId()), TaskAttemptEventType.TA_DONE);
this.report = report;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/TaskEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskEvent.java
index 234491b..377a8e0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskEvent.java
@@ -19,17 +19,17 @@
package org.apache.tajo.master.event;
import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.tajo.QueryUnitId;
+import org.apache.tajo.TaskId;
public class TaskEvent extends AbstractEvent<TaskEventType> {
- private final QueryUnitId id;
+ private final TaskId id;
- public TaskEvent(QueryUnitId id, TaskEventType taskEventType) {
+ public TaskEvent(TaskId id, TaskEventType taskEventType) {
super(taskEventType);
this.id = id;
}
- public QueryUnitId getTaskId() {
+ public TaskId getTaskId() {
return id;
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
index a4d9900..03888bd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
@@ -18,19 +18,19 @@
package org.apache.tajo.master.event;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.ipc.TajoWorkerProtocol.TaskFatalErrorReport;
public class TaskFatalErrorEvent extends TaskAttemptEvent {
private final String message;
public TaskFatalErrorEvent(TaskFatalErrorReport report) {
- super(new QueryUnitAttemptId(report.getId()),
+ super(new TaskAttemptId(report.getId()),
TaskAttemptEventType.TA_FATAL_ERROR);
this.message = report.getErrorMessage();
}
- public TaskFatalErrorEvent(QueryUnitAttemptId attemptId, String message) {
+ public TaskFatalErrorEvent(TaskAttemptId attemptId, String message) {
super(attemptId, TaskAttemptEventType.TA_FATAL_ERROR);
this.message = message;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
index 9e8e3dd..3f72ed9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
@@ -21,7 +21,8 @@ package org.apache.tajo.master.event;
import com.google.protobuf.RpcCallback;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProto;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol.TaskRequestProto;
import org.apache.tajo.master.event.TaskRequestEvent.TaskRequestEventType;
import org.apache.tajo.master.container.TajoContainerId;
@@ -35,12 +36,12 @@ public class TaskRequestEvent extends AbstractEvent<TaskRequestEventType> {
private final TajoContainerId containerId;
private final ExecutionBlockId executionBlockId;
- private final RpcCallback<QueryUnitRequestProto> callback;
+ private final RpcCallback<TaskRequestProto> callback;
public TaskRequestEvent(int workerId,
TajoContainerId containerId,
ExecutionBlockId executionBlockId,
- RpcCallback<QueryUnitRequestProto> callback) {
+ RpcCallback<TaskRequestProto> callback) {
super(TaskRequestEventType.TASK_REQ);
this.workerId = workerId;
this.containerId = containerId;
@@ -60,7 +61,7 @@ public class TaskRequestEvent extends AbstractEvent<TaskRequestEventType> {
return executionBlockId;
}
- public RpcCallback<QueryUnitRequestProto> getCallback() {
+ public RpcCallback<TajoWorkerProtocol.TaskRequestProto> getCallback() {
return this.callback;
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/TaskTAttemptEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskTAttemptEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskTAttemptEvent.java
index 28654f0..a4f120c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskTAttemptEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskTAttemptEvent.java
@@ -18,17 +18,17 @@
package org.apache.tajo.master.event;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
public class TaskTAttemptEvent extends TaskEvent {
- private final QueryUnitAttemptId attemptId;
- public TaskTAttemptEvent(QueryUnitAttemptId attemptId,
+ private final TaskAttemptId attemptId;
+ public TaskTAttemptEvent(TaskAttemptId attemptId,
TaskEventType eventType) {
- super(attemptId.getQueryUnitId(), eventType);
+ super(attemptId.getTaskId(), eventType);
this.attemptId = attemptId;
}
- public QueryUnitAttemptId getTaskAttemptId() {
+ public TaskAttemptId getTaskAttemptId() {
return attemptId;
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
index f4bd8a3..e7e2bc0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -122,7 +122,7 @@ public class QueryMasterManagerService extends CompositeService
@Override
public void getTask(RpcController controller, TajoWorkerProtocol.GetTaskRequestProto request,
- RpcCallback<TajoWorkerProtocol.QueryUnitRequestProto> done) {
+ RpcCallback<TajoWorkerProtocol.TaskRequestProto> done) {
try {
ExecutionBlockId ebId = new ExecutionBlockId(request.getExecutionBlockId());
QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(ebId.getQueryId());
@@ -144,15 +144,15 @@ public class QueryMasterManagerService extends CompositeService
public void statusUpdate(RpcController controller, TajoWorkerProtocol.TaskStatusProto request,
RpcCallback<PrimitiveProtos.BoolProto> done) {
try {
- QueryId queryId = new QueryId(request.getId().getQueryUnitId().getExecutionBlockId().getQueryId());
- QueryUnitAttemptId attemptId = new QueryUnitAttemptId(request.getId());
+ QueryId queryId = new QueryId(request.getId().getTaskId().getExecutionBlockId().getQueryId());
+ TaskAttemptId attemptId = new TaskAttemptId(request.getId());
QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId);
if (queryMasterTask == null) {
queryMasterTask = queryMaster.getQueryMasterTask(queryId, true);
}
- SubQuery sq = queryMasterTask.getQuery().getSubQuery(attemptId.getQueryUnitId().getExecutionBlockId());
- QueryUnit task = sq.getQueryUnit(attemptId.getQueryUnitId());
- QueryUnitAttempt attempt = task.getAttempt(attemptId.getId());
+ SubQuery sq = queryMasterTask.getQuery().getSubQuery(attemptId.getTaskId().getExecutionBlockId());
+ Task task = sq.getTask(attemptId.getTaskId());
+ TaskAttempt attempt = task.getAttempt(attemptId.getId());
if(LOG.isDebugEnabled()){
LOG.debug(String.format("Task State: %s, Attempt State: %s", task.getState().name(), attempt.getState().name()));
@@ -161,10 +161,10 @@ public class QueryMasterManagerService extends CompositeService
if (request.getState() == TajoProtos.TaskAttemptState.TA_KILLED) {
LOG.warn(attemptId + " Killed");
attempt.handle(
- new TaskAttemptEvent(new QueryUnitAttemptId(request.getId()), TaskAttemptEventType.TA_LOCAL_KILLED));
+ new TaskAttemptEvent(new TaskAttemptId(request.getId()), TaskAttemptEventType.TA_LOCAL_KILLED));
} else {
queryMasterTask.getEventHandler().handle(
- new TaskAttemptStatusUpdateEvent(new QueryUnitAttemptId(request.getId()), request));
+ new TaskAttemptStatusUpdateEvent(new TaskAttemptId(request.getId()), request));
}
done.run(TajoWorker.TRUE_PROTO);
} catch (Exception e) {
@@ -185,11 +185,11 @@ public class QueryMasterManagerService extends CompositeService
RpcCallback<PrimitiveProtos.BoolProto> done) {
try {
QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
- new QueryId(report.getId().getQueryUnitId().getExecutionBlockId().getQueryId()));
+ new QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId()));
if (queryMasterTask != null) {
queryMasterTask.handleTaskFailed(report);
} else {
- LOG.warn("No QueryMasterTask: " + new QueryUnitAttemptId(report.getId()));
+ LOG.warn("No QueryMasterTask: " + new TaskAttemptId(report.getId()));
}
done.run(TajoWorker.TRUE_PROTO);
} catch (Exception e) {
@@ -203,7 +203,7 @@ public class QueryMasterManagerService extends CompositeService
RpcCallback<PrimitiveProtos.BoolProto> done) {
try {
QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
- new QueryId(report.getId().getQueryUnitId().getExecutionBlockId().getQueryId()));
+ new QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId()));
if (queryMasterTask != null) {
queryMasterTask.getEventHandler().handle(new TaskCompletionEvent(report));
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index 8f63416..9ab4f0a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -285,12 +285,12 @@ public class QueryMasterTask extends CompositeService {
private class TaskEventDispatcher
implements EventHandler<TaskEvent> {
public void handle(TaskEvent event) {
- QueryUnitId taskId = event.getTaskId();
+ TaskId taskId = event.getTaskId();
if(LOG.isDebugEnabled()) {
LOG.debug("TaskEventDispatcher>" + taskId + "," + event.getType());
}
- QueryUnit task = query.getSubQuery(taskId.getExecutionBlockId()).
- getQueryUnit(taskId);
+ Task task = query.getSubQuery(taskId.getExecutionBlockId()).
+ getTask(taskId);
task.handle(event);
}
}
@@ -298,10 +298,10 @@ public class QueryMasterTask extends CompositeService {
private class TaskAttemptEventDispatcher
implements EventHandler<TaskAttemptEvent> {
public void handle(TaskAttemptEvent event) {
- QueryUnitAttemptId attemptId = event.getTaskAttemptId();
- SubQuery subQuery = query.getSubQuery(attemptId.getQueryUnitId().getExecutionBlockId());
- QueryUnit task = subQuery.getQueryUnit(attemptId.getQueryUnitId());
- QueryUnitAttempt attempt = task.getAttempt(attemptId);
+ TaskAttemptId attemptId = event.getTaskAttemptId();
+ SubQuery subQuery = query.getSubQuery(attemptId.getTaskId().getExecutionBlockId());
+ Task task = subQuery.getTask(attemptId.getTaskId());
+ TaskAttempt attempt = task.getAttempt(attemptId);
attempt.handle(event);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
deleted file mode 100644
index 75402c2..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ /dev/null
@@ -1,907 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.querymaster;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Objects;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.state.*;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.QueryUnitId;
-import org.apache.tajo.TajoProtos.TaskAttemptState;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.ipc.TajoWorkerProtocol.FailureIntermediateProto;
-import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto;
-import org.apache.tajo.master.FragmentPair;
-import org.apache.tajo.master.TaskState;
-import org.apache.tajo.master.event.*;
-import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
-import org.apache.tajo.plan.logical.*;
-import org.apache.tajo.storage.DataLocation;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.storage.fragment.FragmentConvertor;
-import org.apache.tajo.util.Pair;
-import org.apache.tajo.util.TajoIdUtils;
-import org.apache.tajo.util.history.QueryUnitHistory;
-import org.apache.tajo.worker.FetchImpl;
-
-import java.net.URI;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleFileOutput;
-
-public class QueryUnit implements EventHandler<TaskEvent> {
- /** Class Logger */
- private static final Log LOG = LogFactory.getLog(QueryUnit.class);
-
- private final Configuration systemConf;
- private QueryUnitId taskId;
- private EventHandler eventHandler;
- private StoreTableNode store = null;
- private LogicalNode plan = null;
- private List<ScanNode> scan;
-
- private Map<String, Set<FragmentProto>> fragMap;
- private Map<String, Set<FetchImpl>> fetchMap;
-
- private int totalFragmentNum;
-
- private List<ShuffleFileOutput> shuffleFileOutputs;
- private TableStats stats;
- private final boolean isLeafTask;
- private List<IntermediateEntry> intermediateData;
-
- private Map<QueryUnitAttemptId, QueryUnitAttempt> attempts;
- private final int maxAttempts = 3;
- private Integer nextAttempt = -1;
- private QueryUnitAttemptId lastAttemptId;
-
- private QueryUnitAttemptId successfulAttempt;
- private String succeededHost;
- private int succeededHostPort;
- private int succeededPullServerPort;
-
- private int failedAttempts;
- private int finishedAttempts; // finish are total of success, failed and killed
-
- private long launchTime;
- private long finishTime;
-
- private List<DataLocation> dataLocations = Lists.newArrayList();
-
- private static final AttemptKilledTransition ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition();
-
- private QueryUnitHistory finalQueryUnitHistory;
-
- protected static final StateMachineFactory
- <QueryUnit, TaskState, TaskEventType, TaskEvent> stateMachineFactory =
- new StateMachineFactory <QueryUnit, TaskState, TaskEventType, TaskEvent>(TaskState.NEW)
-
- // Transitions from NEW state
- .addTransition(TaskState.NEW, TaskState.SCHEDULED,
- TaskEventType.T_SCHEDULE,
- new InitialScheduleTransition())
- .addTransition(TaskState.NEW, TaskState.KILLED,
- TaskEventType.T_KILL,
- new KillNewTaskTransition())
-
- // Transitions from SCHEDULED state
- .addTransition(TaskState.SCHEDULED, TaskState.RUNNING,
- TaskEventType.T_ATTEMPT_LAUNCHED,
- new AttemptLaunchedTransition())
- .addTransition(TaskState.SCHEDULED, TaskState.KILL_WAIT,
- TaskEventType.T_KILL,
- new KillTaskTransition())
-
- // Transitions from RUNNING state
- .addTransition(TaskState.RUNNING, TaskState.RUNNING,
- TaskEventType.T_ATTEMPT_LAUNCHED)
- .addTransition(TaskState.RUNNING, TaskState.SUCCEEDED,
- TaskEventType.T_ATTEMPT_SUCCEEDED,
- new AttemptSucceededTransition())
- .addTransition(TaskState.RUNNING, TaskState.KILL_WAIT,
- TaskEventType.T_KILL,
- new KillTaskTransition())
- .addTransition(TaskState.RUNNING,
- EnumSet.of(TaskState.RUNNING, TaskState.FAILED),
- TaskEventType.T_ATTEMPT_FAILED,
- new AttemptFailedOrRetryTransition())
-
- // Transitions from KILL_WAIT state
- .addTransition(TaskState.KILL_WAIT, TaskState.KILLED,
- TaskEventType.T_ATTEMPT_KILLED,
- ATTEMPT_KILLED_TRANSITION)
- .addTransition(TaskState.KILL_WAIT, TaskState.KILL_WAIT,
- TaskEventType.T_ATTEMPT_LAUNCHED,
- new KillTaskTransition())
- .addTransition(TaskState.KILL_WAIT, TaskState.FAILED,
- TaskEventType.T_ATTEMPT_FAILED,
- new AttemptFailedTransition())
- .addTransition(TaskState.KILL_WAIT, TaskState.KILLED,
- TaskEventType.T_ATTEMPT_SUCCEEDED,
- ATTEMPT_KILLED_TRANSITION)
- // Ignore-able transitions.
- .addTransition(TaskState.KILL_WAIT, TaskState.KILL_WAIT,
- EnumSet.of(
- TaskEventType.T_KILL,
- TaskEventType.T_SCHEDULE))
-
- // Transitions from SUCCEEDED state
- // Ignore-able transitions
- .addTransition(TaskState.SUCCEEDED, TaskState.SUCCEEDED,
- EnumSet.of(TaskEventType.T_KILL,
- TaskEventType.T_ATTEMPT_KILLED, TaskEventType.T_ATTEMPT_SUCCEEDED, TaskEventType.T_ATTEMPT_FAILED))
-
- // Transitions from FAILED state
- // Ignore-able transitions
- .addTransition(TaskState.FAILED, TaskState.FAILED,
- EnumSet.of(TaskEventType.T_KILL,
- TaskEventType.T_ATTEMPT_KILLED, TaskEventType.T_ATTEMPT_SUCCEEDED, TaskEventType.T_ATTEMPT_FAILED))
-
- // Transitions from KILLED state
- .addTransition(TaskState.KILLED, TaskState.KILLED, TaskEventType.T_ATTEMPT_KILLED, new KillTaskTransition())
- // Ignore-able transitions
- .addTransition(TaskState.KILLED, TaskState.KILLED,
- EnumSet.of(
- TaskEventType.T_KILL, TaskEventType.T_ATTEMPT_SUCCEEDED, TaskEventType.T_ATTEMPT_FAILED))
-
- .installTopology();
-
- private final StateMachine<TaskState, TaskEventType, TaskEvent> stateMachine;
-
-
- private final Lock readLock;
- private final Lock writeLock;
- private QueryUnitAttemptScheduleContext scheduleContext;
-
- public QueryUnit(Configuration conf, QueryUnitAttemptScheduleContext scheduleContext,
- QueryUnitId id, boolean isLeafTask, EventHandler eventHandler) {
- this.systemConf = conf;
- this.taskId = id;
- this.eventHandler = eventHandler;
- this.isLeafTask = isLeafTask;
- scan = new ArrayList<ScanNode>();
- fetchMap = Maps.newHashMap();
- fragMap = Maps.newHashMap();
- shuffleFileOutputs = new ArrayList<ShuffleFileOutput>();
- attempts = Collections.emptyMap();
- lastAttemptId = null;
- nextAttempt = -1;
- failedAttempts = 0;
-
- ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
- this.readLock = readWriteLock.readLock();
- this.writeLock = readWriteLock.writeLock();
- this.scheduleContext = scheduleContext;
-
- stateMachine = stateMachineFactory.make(this);
- totalFragmentNum = 0;
- }
-
- public boolean isLeafTask() {
- return this.isLeafTask;
- }
-
- public TaskState getState() {
- readLock.lock();
- try {
- return stateMachine.getCurrentState();
- } finally {
- readLock.unlock();
- }
- }
-
- public TaskAttemptState getLastAttemptStatus() {
- QueryUnitAttempt lastAttempt = getLastAttempt();
- if (lastAttempt != null) {
- return lastAttempt.getState();
- } else {
- return TaskAttemptState.TA_ASSIGNED;
- }
- }
-
- public QueryUnitHistory getQueryUnitHistory() {
- if (finalQueryUnitHistory != null) {
- if (finalQueryUnitHistory.getFinishTime() == 0) {
- finalQueryUnitHistory = makeQueryUnitHistory();
- }
- return finalQueryUnitHistory;
- } else {
- return makeQueryUnitHistory();
- }
- }
-
- private QueryUnitHistory makeQueryUnitHistory() {
- QueryUnitHistory queryUnitHistory = new QueryUnitHistory();
-
- QueryUnitAttempt lastAttempt = getLastAttempt();
- if (lastAttempt != null) {
- queryUnitHistory.setId(lastAttempt.getId().toString());
- queryUnitHistory.setState(lastAttempt.getState().toString());
- queryUnitHistory.setProgress(lastAttempt.getProgress());
- }
- queryUnitHistory.setHostAndPort(succeededHost + ":" + succeededHostPort);
- queryUnitHistory.setRetryCount(this.getRetryCount());
- queryUnitHistory.setLaunchTime(launchTime);
- queryUnitHistory.setFinishTime(finishTime);
-
- queryUnitHistory.setNumShuffles(getShuffleOutpuNum());
- if (!getShuffleFileOutputs().isEmpty()) {
- ShuffleFileOutput shuffleFileOutputs = getShuffleFileOutputs().get(0);
- if (queryUnitHistory.getNumShuffles() > 0) {
- queryUnitHistory.setShuffleKey("" + shuffleFileOutputs.getPartId());
- queryUnitHistory.setShuffleFileName(shuffleFileOutputs.getFileName());
- }
- }
-
- List<String> fragmentList = new ArrayList<String>();
- for (FragmentProto eachFragment : getAllFragments()) {
- try {
- Fragment fragment = FragmentConvertor.convert(systemConf, eachFragment);
- fragmentList.add(fragment.toString());
- } catch (Exception e) {
- LOG.error(e.getMessage());
- fragmentList.add("ERROR: " + eachFragment.getStoreType() + "," + eachFragment.getId() + ": " + e.getMessage());
- }
- }
- queryUnitHistory.setFragments(fragmentList.toArray(new String[]{}));
-
- List<String[]> fetchList = new ArrayList<String[]>();
- for (Map.Entry<String, Set<FetchImpl>> e : getFetchMap().entrySet()) {
- for (FetchImpl f : e.getValue()) {
- for (URI uri : f.getSimpleURIs()){
- fetchList.add(new String[] {e.getKey(), uri.toString()});
- }
- }
- }
-
- queryUnitHistory.setFetchs(fetchList.toArray(new String[][]{}));
-
- List<String> dataLocationList = new ArrayList<String>();
- for(DataLocation eachLocation: getDataLocations()) {
- dataLocationList.add(eachLocation.toString());
- }
-
- queryUnitHistory.setDataLocations(dataLocationList.toArray(new String[]{}));
- return queryUnitHistory;
- }
-
- public void setLogicalPlan(LogicalNode plan) {
- this.plan = plan;
-
- LogicalNode node = plan;
- ArrayList<LogicalNode> s = new ArrayList<LogicalNode>();
- s.add(node);
- while (!s.isEmpty()) {
- node = s.remove(s.size()-1);
- if (node instanceof UnaryNode) {
- UnaryNode unary = (UnaryNode) node;
- s.add(s.size(), unary.getChild());
- } else if (node instanceof BinaryNode) {
- BinaryNode binary = (BinaryNode) node;
- s.add(s.size(), binary.getLeftChild());
- s.add(s.size(), binary.getRightChild());
- } else if (node instanceof ScanNode) {
- scan.add((ScanNode)node);
- } else if (node instanceof TableSubQueryNode) {
- s.add(((TableSubQueryNode) node).getSubQuery());
- }
- }
- }
-
- private void addDataLocation(Fragment fragment) {
- String[] hosts = fragment.getHosts();
- int[] diskIds = null;
- if (fragment instanceof FileFragment) {
- diskIds = ((FileFragment)fragment).getDiskIds();
- }
- for (int i = 0; i < hosts.length; i++) {
- dataLocations.add(new DataLocation(hosts[i], diskIds == null ? -1 : diskIds[i]));
- }
- }
-
- public void addFragment(Fragment fragment, boolean useDataLocation) {
- Set<FragmentProto> fragmentProtos;
- if (fragMap.containsKey(fragment.getTableName())) {
- fragmentProtos = fragMap.get(fragment.getTableName());
- } else {
- fragmentProtos = new HashSet<FragmentProto>();
- fragMap.put(fragment.getTableName(), fragmentProtos);
- }
- fragmentProtos.add(fragment.getProto());
- if (useDataLocation) {
- addDataLocation(fragment);
- }
- totalFragmentNum++;
- }
-
- public void addFragments(Collection<Fragment> fragments) {
- for (Fragment eachFragment: fragments) {
- addFragment(eachFragment, false);
- }
- }
-
- public void setFragment(FragmentPair[] fragmentPairs) {
- for (FragmentPair eachFragmentPair : fragmentPairs) {
- this.addFragment(eachFragmentPair.getLeftFragment(), true);
- if (eachFragmentPair.getRightFragment() != null) {
- this.addFragment(eachFragmentPair.getRightFragment(), true);
- }
- }
- }
-
- public List<DataLocation> getDataLocations() {
- return dataLocations;
- }
-
- public String getSucceededHost() {
- return succeededHost;
- }
-
- public void addFetches(String tableId, Collection<FetchImpl> fetches) {
- Set<FetchImpl> fetchSet;
- if (fetchMap.containsKey(tableId)) {
- fetchSet = fetchMap.get(tableId);
- } else {
- fetchSet = Sets.newHashSet();
- }
- fetchSet.addAll(fetches);
- fetchMap.put(tableId, fetchSet);
- }
-
- public void setFetches(Map<String, Set<FetchImpl>> fetches) {
- this.fetchMap.clear();
- this.fetchMap.putAll(fetches);
- }
-
- public Collection<FragmentProto> getAllFragments() {
- Set<FragmentProto> fragmentProtos = new HashSet<FragmentProto>();
- for (Set<FragmentProto> eachFragmentSet : fragMap.values()) {
- fragmentProtos.addAll(eachFragmentSet);
- }
- return fragmentProtos;
- }
-
- public LogicalNode getLogicalPlan() {
- return this.plan;
- }
-
- public QueryUnitId getId() {
- return taskId;
- }
-
- public Collection<FetchImpl> getFetchHosts(String tableId) {
- return fetchMap.get(tableId);
- }
-
- public Collection<Set<FetchImpl>> getFetches() {
- return fetchMap.values();
- }
-
- public Map<String, Set<FetchImpl>> getFetchMap() {
- return fetchMap;
- }
-
- public Collection<FetchImpl> getFetch(ScanNode scan) {
- return this.fetchMap.get(scan.getTableName());
- }
-
- public ScanNode[] getScanNodes() {
- return this.scan.toArray(new ScanNode[scan.size()]);
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append(plan.getType() + " \n");
- for (Entry<String, Set<FragmentProto>> e : fragMap.entrySet()) {
- builder.append(e.getKey()).append(" : ");
- for (FragmentProto fragment : e.getValue()) {
- builder.append(fragment).append(", ");
- }
- }
- for (Entry<String, Set<FetchImpl>> e : fetchMap.entrySet()) {
- builder.append(e.getKey()).append(" : ");
- for (FetchImpl t : e.getValue()) {
- for (URI uri : t.getURIs()){
- builder.append(uri).append(" ");
- }
- }
- }
-
- return builder.toString();
- }
-
- public void setStats(TableStats stats) {
- this.stats = stats;
- }
-
- public void setShuffleFileOutputs(List<ShuffleFileOutput> partitions) {
- this.shuffleFileOutputs = Collections.unmodifiableList(partitions);
- }
-
- public TableStats getStats() {
- return this.stats;
- }
-
- public List<ShuffleFileOutput> getShuffleFileOutputs() {
- return this.shuffleFileOutputs;
- }
-
- public int getShuffleOutpuNum() {
- return this.shuffleFileOutputs.size();
- }
-
- public QueryUnitAttempt newAttempt() {
- QueryUnitAttempt attempt = new QueryUnitAttempt(scheduleContext,
- QueryIdFactory.newQueryUnitAttemptId(this.getId(), ++nextAttempt),
- this, eventHandler);
- lastAttemptId = attempt.getId();
- return attempt;
- }
-
- public QueryUnitAttempt getAttempt(QueryUnitAttemptId attemptId) {
- return attempts.get(attemptId);
- }
-
- public QueryUnitAttempt getAttempt(int attempt) {
- return this.attempts.get(QueryIdFactory.newQueryUnitAttemptId(this.getId(), attempt));
- }
-
- public QueryUnitAttempt getLastAttempt() {
- return getAttempt(this.lastAttemptId);
- }
-
- public QueryUnitAttempt getSuccessfulAttempt() {
- readLock.lock();
- try {
- if (null == successfulAttempt) {
- return null;
- }
- return attempts.get(successfulAttempt);
- } finally {
- readLock.unlock();
- }
- }
-
- public int getRetryCount () {
- return this.nextAttempt;
- }
-
- public int getTotalFragmentNum() {
- return totalFragmentNum;
- }
-
- private static class InitialScheduleTransition implements
- SingleArcTransition<QueryUnit, TaskEvent> {
-
- @Override
- public void transition(QueryUnit task, TaskEvent taskEvent) {
- task.addAndScheduleAttempt();
- }
- }
-
- public long getLaunchTime() {
- return launchTime;
- }
-
- public long getFinishTime() {
- return finishTime;
- }
-
- @VisibleForTesting
- public void setLaunchTime(long launchTime) {
- this.launchTime = launchTime;
- }
-
- @VisibleForTesting
- public void setFinishTime(long finishTime) {
- this.finishTime = finishTime;
- }
-
- public long getRunningTime() {
- if(finishTime > 0) {
- return finishTime - launchTime;
- } else {
- return System.currentTimeMillis() - launchTime;
- }
- }
-
- // This is always called in the Write Lock
- private void addAndScheduleAttempt() {
- // Create new task attempt
- QueryUnitAttempt attempt = newAttempt();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Created attempt " + attempt.getId());
- }
- switch (attempts.size()) {
- case 0:
- attempts = Collections.singletonMap(attempt.getId(), attempt);
- break;
-
- case 1:
- Map<QueryUnitAttemptId, QueryUnitAttempt> newAttempts
- = new LinkedHashMap<QueryUnitAttemptId, QueryUnitAttempt>(3);
- newAttempts.putAll(attempts);
- attempts = newAttempts;
- attempts.put(attempt.getId(), attempt);
- break;
-
- default:
- attempts.put(attempt.getId(), attempt);
- break;
- }
-
- if (failedAttempts > 0) {
- eventHandler.handle(new TaskAttemptScheduleEvent(systemConf, attempt.getId(),
- TaskAttemptEventType.TA_RESCHEDULE));
- } else {
- eventHandler.handle(new TaskAttemptScheduleEvent(systemConf, attempt.getId(),
- TaskAttemptEventType.TA_SCHEDULE));
- }
- }
-
- private void finishTask() {
- this.finishTime = System.currentTimeMillis();
- finalQueryUnitHistory = makeQueryUnitHistory();
- }
-
- private static class KillNewTaskTransition implements SingleArcTransition<QueryUnit, TaskEvent> {
-
- @Override
- public void transition(QueryUnit task, TaskEvent taskEvent) {
- task.eventHandler.handle(new SubQueryTaskEvent(task.getId(), TaskState.KILLED));
- }
- }
-
- private static class KillTaskTransition implements SingleArcTransition<QueryUnit, TaskEvent> {
-
- @Override
- public void transition(QueryUnit task, TaskEvent taskEvent) {
- task.finishTask();
- task.eventHandler.handle(new TaskAttemptEvent(task.lastAttemptId, TaskAttemptEventType.TA_KILL));
- }
- }
-
- private static class AttemptKilledTransition implements SingleArcTransition<QueryUnit, TaskEvent>{
-
- @Override
- public void transition(QueryUnit task, TaskEvent event) {
- task.finishTask();
- task.eventHandler.handle(new SubQueryTaskEvent(task.getId(), TaskState.KILLED));
- }
- }
-
- private static class AttemptSucceededTransition
- implements SingleArcTransition<QueryUnit, TaskEvent>{
-
- @Override
- public void transition(QueryUnit task,
- TaskEvent event) {
- TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
- QueryUnitAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId());
-
- task.successfulAttempt = attemptEvent.getTaskAttemptId();
- task.succeededHost = attempt.getWorkerConnectionInfo().getHost();
- task.succeededHostPort = attempt.getWorkerConnectionInfo().getPeerRpcPort();
- task.succeededPullServerPort = attempt.getWorkerConnectionInfo().getPullServerPort();
-
- task.finishTask();
- task.eventHandler.handle(new SubQueryTaskEvent(event.getTaskId(), TaskState.SUCCEEDED));
- }
- }
-
- private static class AttemptLaunchedTransition implements SingleArcTransition<QueryUnit, TaskEvent> {
- @Override
- public void transition(QueryUnit task,
- TaskEvent event) {
- TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
- QueryUnitAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId());
- task.launchTime = System.currentTimeMillis();
- task.succeededHost = attempt.getWorkerConnectionInfo().getHost();
- task.succeededHostPort = attempt.getWorkerConnectionInfo().getPeerRpcPort();
- }
- }
-
- private static class AttemptFailedTransition implements SingleArcTransition<QueryUnit, TaskEvent> {
- @Override
- public void transition(QueryUnit task, TaskEvent event) {
- TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
- LOG.info("=============================================================");
- LOG.info(">>> Task Failed: " + attemptEvent.getTaskAttemptId() + " <<<");
- LOG.info("=============================================================");
- task.failedAttempts++;
- task.finishedAttempts++;
-
- task.finishTask();
- task.eventHandler.handle(new SubQueryTaskEvent(task.getId(), TaskState.FAILED));
- }
- }
-
- private static class AttemptFailedOrRetryTransition implements
- MultipleArcTransition<QueryUnit, TaskEvent, TaskState> {
-
- @Override
- public TaskState transition(QueryUnit task, TaskEvent taskEvent) {
- TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) taskEvent;
- task.failedAttempts++;
- task.finishedAttempts++;
- boolean retry = task.failedAttempts < task.maxAttempts;
-
- LOG.info("====================================================================================");
- LOG.info(">>> Task Failed: " + attemptEvent.getTaskAttemptId() + ", " +
- "retry:" + retry + ", attempts:" + task.failedAttempts + " <<<");
- LOG.info("====================================================================================");
-
- if (retry) {
- if (task.successfulAttempt == null) {
- task.addAndScheduleAttempt();
- }
- } else {
- task.finishTask();
- task.eventHandler.handle(new SubQueryTaskEvent(task.getId(), TaskState.FAILED));
- return TaskState.FAILED;
- }
-
- return task.getState();
- }
- }
-
- @Override
- public void handle(TaskEvent event) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Processing " + event.getTaskId() + " of type "
- + event.getType());
- }
-
- try {
- writeLock.lock();
- TaskState oldState = getState();
- try {
- stateMachine.doTransition(event.getType(), event);
- } catch (InvalidStateTransitonException e) {
- LOG.error("Can't handle this event at current state"
- + ", eventType:" + event.getType().name()
- + ", oldState:" + oldState.name()
- + ", nextState:" + getState().name()
- , e);
- eventHandler.handle(new QueryEvent(TajoIdUtils.parseQueryId(getId().toString()),
- QueryEventType.INTERNAL_ERROR));
- }
-
- //notify the eventhandler of state change
- if (LOG.isDebugEnabled()) {
- if (oldState != getState()) {
- LOG.debug(taskId + " Task Transitioned from " + oldState + " to "
- + getState());
- }
- }
- }
-
- finally {
- writeLock.unlock();
- }
- }
-
- public void setIntermediateData(Collection<IntermediateEntry> partitions) {
- this.intermediateData = new ArrayList<IntermediateEntry>(partitions);
- }
-
- public List<IntermediateEntry> getIntermediateData() {
- return this.intermediateData;
- }
-
- public static class PullHost implements Cloneable {
- String host;
- int port;
- int hashCode;
-
- public PullHost(String pullServerAddr, int pullServerPort){
- this.host = pullServerAddr;
- this.port = pullServerPort;
- this.hashCode = Objects.hashCode(host, port);
- }
- public String getHost() {
- return host;
- }
-
- public int getPort() {
- return this.port;
- }
-
- public String getPullAddress() {
- return host + ":" + port;
- }
-
- @Override
- public int hashCode() {
- return hashCode;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof PullHost) {
- PullHost other = (PullHost) obj;
- return host.equals(other.host) && port == other.port;
- }
-
- return false;
- }
-
- @Override
- public PullHost clone() throws CloneNotSupportedException {
- PullHost newPullHost = (PullHost) super.clone();
- newPullHost.host = host;
- newPullHost.port = port;
- newPullHost.hashCode = Objects.hashCode(newPullHost.host, newPullHost.port);
- return newPullHost;
- }
-
- @Override
- public String toString() {
- return host + ":" + port;
- }
- }
-
- public static class IntermediateEntry {
- ExecutionBlockId ebId;
- int taskId;
- int attemptId;
- int partId;
- PullHost host;
- long volume;
- List<Pair<Long, Integer>> pages;
- List<Pair<Long, Pair<Integer, Integer>>> failureRowNums;
-
- public IntermediateEntry(IntermediateEntryProto proto) {
- this.ebId = new ExecutionBlockId(proto.getEbId());
- this.taskId = proto.getTaskId();
- this.attemptId = proto.getAttemptId();
- this.partId = proto.getPartId();
-
- String[] pullHost = proto.getHost().split(":");
- this.host = new PullHost(pullHost[0], Integer.parseInt(pullHost[1]));
- this.volume = proto.getVolume();
-
- failureRowNums = new ArrayList<Pair<Long, Pair<Integer, Integer>>>();
- for (FailureIntermediateProto eachFailure: proto.getFailuresList()) {
-
- failureRowNums.add(new Pair(eachFailure.getPagePos(),
- new Pair(eachFailure.getStartRowNum(), eachFailure.getEndRowNum())));
- }
-
- pages = new ArrayList<Pair<Long, Integer>>();
- for (IntermediateEntryProto.PageProto eachPage: proto.getPagesList()) {
- pages.add(new Pair(eachPage.getPos(), eachPage.getLength()));
- }
- }
-
- public IntermediateEntry(int taskId, int attemptId, int partId, PullHost host) {
- this.taskId = taskId;
- this.attemptId = attemptId;
- this.partId = partId;
- this.host = host;
- }
-
- public IntermediateEntry(int taskId, int attemptId, int partId, PullHost host, long volume) {
- this.taskId = taskId;
- this.attemptId = attemptId;
- this.partId = partId;
- this.host = host;
- this.volume = volume;
- }
-
- public ExecutionBlockId getEbId() {
- return ebId;
- }
-
- public void setEbId(ExecutionBlockId ebId) {
- this.ebId = ebId;
- }
-
- public int getTaskId() {
- return this.taskId;
- }
-
- public int getAttemptId() {
- return this.attemptId;
- }
-
- public int getPartId() {
- return this.partId;
- }
-
- public PullHost getPullHost() {
- return this.host;
- }
-
- public long getVolume() {
- return this.volume;
- }
-
- public long setVolume(long volume) {
- return this.volume = volume;
- }
-
- public List<Pair<Long, Integer>> getPages() {
- return pages;
- }
-
- public void setPages(List<Pair<Long, Integer>> pages) {
- this.pages = pages;
- }
-
- public List<Pair<Long, Pair<Integer, Integer>>> getFailureRowNums() {
- return failureRowNums;
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(ebId, taskId, partId, attemptId, host);
- }
-
- public List<Pair<Long, Long>> split(long firstSplitVolume, long splitVolume) {
- List<Pair<Long, Long>> splits = new ArrayList<Pair<Long, Long>>();
-
- if (pages == null || pages.isEmpty()) {
- return splits;
- }
- int pageSize = pages.size();
-
- long currentOffset = -1;
- long currentBytes = 0;
-
- long realSplitVolume = firstSplitVolume > 0 ? firstSplitVolume : splitVolume;
- for (int i = 0; i < pageSize; i++) {
- Pair<Long, Integer> eachPage = pages.get(i);
- if (currentOffset == -1) {
- currentOffset = eachPage.getFirst();
- }
- if (currentBytes > 0 && currentBytes + eachPage.getSecond() >= realSplitVolume) {
- splits.add(new Pair(currentOffset, currentBytes));
- currentOffset = eachPage.getFirst();
- currentBytes = 0;
- realSplitVolume = splitVolume;
- }
-
- currentBytes += eachPage.getSecond();
- }
-
- //add last
- if (currentBytes > 0) {
- splits.add(new Pair(currentOffset, currentBytes));
- }
- return splits;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
deleted file mode 100644
index d88173f..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
+++ /dev/null
@@ -1,443 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.querymaster;
-
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.state.*;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.TajoProtos.TaskAttemptState;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.ipc.TajoWorkerProtocol.TaskCompletionReport;
-import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.master.event.*;
-import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
-import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
-import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
-import org.apache.tajo.master.querymaster.QueryUnit.PullHost;
-import org.apache.tajo.master.container.TajoContainerId;
-
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleFileOutput;
-
-public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
-
- private static final Log LOG = LogFactory.getLog(QueryUnitAttempt.class);
-
- private final static int EXPIRE_TIME = 15000;
-
- private final QueryUnitAttemptId id;
- private final QueryUnit queryUnit;
- final EventHandler eventHandler;
-
- private TajoContainerId containerId;
- private WorkerConnectionInfo workerConnectionInfo;
- private int expire;
-
- private final Lock readLock;
- private final Lock writeLock;
-
- private final List<String> diagnostics = new ArrayList<String>();
-
- private final QueryUnitAttemptScheduleContext scheduleContext;
-
- private float progress;
- private CatalogProtos.TableStatsProto inputStats;
- private CatalogProtos.TableStatsProto resultStats;
-
- protected static final StateMachineFactory
- <QueryUnitAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
- stateMachineFactory = new StateMachineFactory
- <QueryUnitAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
- (TaskAttemptState.TA_NEW)
-
- // Transitions from TA_NEW state
- .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED,
- TaskAttemptEventType.TA_SCHEDULE, new TaskAttemptScheduleTransition())
- .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED,
- TaskAttemptEventType.TA_RESCHEDULE, new TaskAttemptScheduleTransition())
- .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_KILLED,
- TaskAttemptEventType.TA_KILL,
- new TaskKilledCompleteTransition())
-
- // Transitions from TA_UNASSIGNED state
- .addTransition(TaskAttemptState.TA_UNASSIGNED, TaskAttemptState.TA_ASSIGNED,
- TaskAttemptEventType.TA_ASSIGNED,
- new LaunchTransition())
- .addTransition(TaskAttemptState.TA_UNASSIGNED, TaskAttemptState.TA_KILL_WAIT,
- TaskAttemptEventType.TA_KILL,
- new KillUnassignedTaskTransition())
-
- // Transitions from TA_ASSIGNED state
- .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_ASSIGNED,
- TaskAttemptEventType.TA_ASSIGNED, new AlreadyAssignedTransition())
- .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_KILL_WAIT,
- TaskAttemptEventType.TA_KILL,
- new KillTaskTransition())
- .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_KILLED,
- TaskAttemptEventType.TA_KILL,
- new KillTaskTransition())
- .addTransition(TaskAttemptState.TA_ASSIGNED,
- EnumSet.of(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_KILLED),
- TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition())
- .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_SUCCEEDED,
- TaskAttemptEventType.TA_DONE, new SucceededTransition())
- .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_FAILED,
- TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
-
- // Transitions from TA_RUNNING state
- .addTransition(TaskAttemptState.TA_RUNNING,
- EnumSet.of(TaskAttemptState.TA_RUNNING),
- TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition())
- .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_KILL_WAIT,
- TaskAttemptEventType.TA_KILL,
- new KillTaskTransition())
- .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_SUCCEEDED,
- TaskAttemptEventType.TA_DONE, new SucceededTransition())
- .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_FAILED,
- TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
-
- .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED,
- TaskAttemptEventType.TA_LOCAL_KILLED,
- new TaskKilledCompleteTransition())
- .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILL_WAIT,
- TaskAttemptEventType.TA_ASSIGNED,
- new KillTaskTransition())
- .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED,
- TaskAttemptEventType.TA_SCHEDULE_CANCELED,
- new TaskKilledCompleteTransition())
- .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED,
- TaskAttemptEventType.TA_DONE,
- new TaskKilledCompleteTransition())
- .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_FAILED,
- TaskAttemptEventType.TA_FATAL_ERROR)
- .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILL_WAIT,
- EnumSet.of(
- TaskAttemptEventType.TA_KILL,
- TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
- TaskAttemptEventType.TA_UPDATE))
-
- // Transitions from TA_SUCCEEDED state
- .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
- TaskAttemptEventType.TA_UPDATE)
- .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
- TaskAttemptEventType.TA_DONE, new AlreadyDoneTransition())
- .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_FAILED,
- TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
- // Ignore-able transitions
- .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
- TaskAttemptEventType.TA_KILL)
-
- // Transitions from TA_KILLED state
- .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
- TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE)
- // Ignore-able transitions
- .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
- EnumSet.of(
- TaskAttemptEventType.TA_UPDATE))
- .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
- EnumSet.of(
- TaskAttemptEventType.TA_LOCAL_KILLED,
- TaskAttemptEventType.TA_KILL,
- TaskAttemptEventType.TA_ASSIGNED,
- TaskAttemptEventType.TA_DONE),
- new TaskKilledCompleteTransition())
- .installTopology();
-
- private final StateMachine<TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
- stateMachine;
-
-
- public QueryUnitAttempt(final QueryUnitAttemptScheduleContext scheduleContext,
- final QueryUnitAttemptId id, final QueryUnit queryUnit,
- final EventHandler eventHandler) {
- this.scheduleContext = scheduleContext;
- this.id = id;
- this.expire = QueryUnitAttempt.EXPIRE_TIME;
- this.queryUnit = queryUnit;
- this.eventHandler = eventHandler;
-
- ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
- this.readLock = readWriteLock.readLock();
- this.writeLock = readWriteLock.writeLock();
-
- stateMachine = stateMachineFactory.make(this);
- }
-
- public TaskAttemptState getState() {
- readLock.lock();
- try {
- return stateMachine.getCurrentState();
- } finally {
- readLock.unlock();
- }
- }
-
- public QueryUnitAttemptId getId() {
- return this.id;
- }
-
- public boolean isLeafTask() {
- return this.queryUnit.isLeafTask();
- }
-
- public QueryUnit getQueryUnit() {
- return this.queryUnit;
- }
-
- public WorkerConnectionInfo getWorkerConnectionInfo() {
- return this.workerConnectionInfo;
- }
-
- public void setContainerId(TajoContainerId containerId) {
- this.containerId = containerId;
- }
-
- public synchronized void setExpireTime(int expire) {
- this.expire = expire;
- }
-
- public synchronized void updateExpireTime(int period) {
- this.setExpireTime(this.expire - period);
- }
-
- public synchronized void resetExpireTime() {
- this.setExpireTime(QueryUnitAttempt.EXPIRE_TIME);
- }
-
- public int getLeftTime() {
- return this.expire;
- }
-
- public float getProgress() {
- return progress;
- }
-
- public TableStats getInputStats() {
- if (inputStats == null) {
- return null;
- }
-
- return new TableStats(inputStats);
- }
-
- public TableStats getResultStats() {
- if (resultStats == null) {
- return null;
- }
- return new TableStats(resultStats);
- }
-
- private void fillTaskStatistics(TaskCompletionReport report) {
- this.progress = 1.0f;
-
- List<IntermediateEntry> partitions = new ArrayList<IntermediateEntry>();
-
- if (report.getShuffleFileOutputsCount() > 0) {
- this.getQueryUnit().setShuffleFileOutputs(report.getShuffleFileOutputsList());
-
- PullHost host = new PullHost(getWorkerConnectionInfo().getHost(), getWorkerConnectionInfo().getPullServerPort());
- for (ShuffleFileOutput p : report.getShuffleFileOutputsList()) {
- IntermediateEntry entry = new IntermediateEntry(getId().getQueryUnitId().getId(),
- getId().getId(), p.getPartId(), host, p.getVolume());
- partitions.add(entry);
- }
- }
- this.getQueryUnit().setIntermediateData(partitions);
-
- if (report.hasInputStats()) {
- this.inputStats = report.getInputStats();
- }
- if (report.hasResultStats()) {
- this.resultStats = report.getResultStats();
- this.getQueryUnit().setStats(new TableStats(resultStats));
- }
- }
-
- private static class TaskAttemptScheduleTransition implements
- SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
-
- @Override
- public void transition(QueryUnitAttempt taskAttempt, TaskAttemptEvent taskAttemptEvent) {
- taskAttempt.eventHandler.handle(new QueryUnitAttemptScheduleEvent(
- EventType.T_SCHEDULE, taskAttempt.getQueryUnit().getId().getExecutionBlockId(),
- taskAttempt.scheduleContext, taskAttempt));
- }
- }
-
- private static class KillUnassignedTaskTransition implements
- SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
-
- @Override
- public void transition(QueryUnitAttempt taskAttempt, TaskAttemptEvent taskAttemptEvent) {
- taskAttempt.eventHandler.handle(new QueryUnitAttemptScheduleEvent(
- EventType.T_SCHEDULE_CANCEL, taskAttempt.getQueryUnit().getId().getExecutionBlockId(),
- taskAttempt.scheduleContext, taskAttempt));
- }
- }
-
- private static class LaunchTransition
- implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
-
- @Override
- public void transition(QueryUnitAttempt taskAttempt,
- TaskAttemptEvent event) {
- TaskAttemptAssignedEvent castEvent = (TaskAttemptAssignedEvent) event;
- taskAttempt.containerId = castEvent.getContainerId();
- taskAttempt.workerConnectionInfo = castEvent.getWorkerConnectionInfo();
- taskAttempt.eventHandler.handle(
- new TaskTAttemptEvent(taskAttempt.getId(),
- TaskEventType.T_ATTEMPT_LAUNCHED));
- }
- }
-
- private static class TaskKilledCompleteTransition implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
-
- @Override
- public void transition(QueryUnitAttempt taskAttempt,
- TaskAttemptEvent event) {
- taskAttempt.getQueryUnit().handle(new TaskEvent(taskAttempt.getId().getQueryUnitId(),
- TaskEventType.T_ATTEMPT_KILLED));
- LOG.info(taskAttempt.getId() + " Received TA_KILLED Status from LocalTask");
- }
- }
-
- private static class StatusUpdateTransition
- implements MultipleArcTransition<QueryUnitAttempt, TaskAttemptEvent, TaskAttemptState> {
-
- @Override
- public TaskAttemptState transition(QueryUnitAttempt taskAttempt,
- TaskAttemptEvent event) {
- TaskAttemptStatusUpdateEvent updateEvent = (TaskAttemptStatusUpdateEvent) event;
-
- taskAttempt.progress = updateEvent.getStatus().getProgress();
- taskAttempt.inputStats = updateEvent.getStatus().getInputStats();
- taskAttempt.resultStats = updateEvent.getStatus().getResultStats();
-
- return TaskAttemptState.TA_RUNNING;
- }
- }
-
- private void addDiagnosticInfo(String diag) {
- if (diag != null && !diag.equals("")) {
- diagnostics.add(diag);
- }
- }
-
- private static class AlreadyAssignedTransition
- implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent>{
-
- @Override
- public void transition(QueryUnitAttempt queryUnitAttempt,
- TaskAttemptEvent taskAttemptEvent) {
- }
- }
-
- private static class AlreadyDoneTransition
- implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent>{
-
- @Override
- public void transition(QueryUnitAttempt queryUnitAttempt,
- TaskAttemptEvent taskAttemptEvent) {
- }
- }
-
- private static class SucceededTransition implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
- @Override
- public void transition(QueryUnitAttempt taskAttempt,
- TaskAttemptEvent event) {
- TaskCompletionReport report = ((TaskCompletionEvent)event).getReport();
-
- try {
- taskAttempt.fillTaskStatistics(report);
- taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_SUCCEEDED));
- } catch (Throwable t) {
- taskAttempt.eventHandler.handle(new TaskFatalErrorEvent(taskAttempt.getId(), t.getMessage()));
- taskAttempt.addDiagnosticInfo(ExceptionUtils.getStackTrace(t));
- }
- }
- }
-
- private static class KillTaskTransition implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
-
- @Override
- public void transition(QueryUnitAttempt taskAttempt, TaskAttemptEvent event) {
- taskAttempt.eventHandler.handle(new LocalTaskEvent(taskAttempt.getId(), taskAttempt.containerId,
- LocalTaskEventType.KILL));
- }
- }
-
- private static class FailedTransition implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent>{
- @Override
- public void transition(QueryUnitAttempt taskAttempt, TaskAttemptEvent event) {
- TaskFatalErrorEvent errorEvent = (TaskFatalErrorEvent) event;
- taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_FAILED));
- taskAttempt.addDiagnosticInfo(errorEvent.errorMessage());
- LOG.error(taskAttempt.getId() + " FROM " + taskAttempt.getWorkerConnectionInfo().getHost()
- + " >> " + errorEvent.errorMessage());
- }
- }
-
- @Override
- public void handle(TaskAttemptEvent event) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Processing " + event.getTaskAttemptId() + " of type " + event.getType());
- }
- try {
- writeLock.lock();
- TaskAttemptState oldState = getState();
- try {
- stateMachine.doTransition(event.getType(), event);
- } catch (InvalidStateTransitonException e) {
- LOG.error("Can't handle this event at current state of " + event.getTaskAttemptId() + ")"
- + ", eventType:" + event.getType().name()
- + ", oldState:" + oldState.name()
- + ", nextState:" + getState().name()
- , e);
- eventHandler.handle(
- new SubQueryDiagnosticsUpdateEvent(event.getTaskAttemptId().getQueryUnitId().getExecutionBlockId(),
- "Can't handle this event at current state of " + event.getTaskAttemptId() + ")"));
- eventHandler.handle(
- new SubQueryEvent(event.getTaskAttemptId().getQueryUnitId().getExecutionBlockId(),
- SubQueryEventType.SQ_INTERNAL_ERROR));
- }
-
- //notify the eventhandler of state change
- if (LOG.isDebugEnabled()) {
- if (oldState != getState()) {
- LOG.debug(id + " TaskAttempt Transitioned from " + oldState + " to "
- + getState());
- }
- }
- }
-
- finally {
- writeLock.unlock();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index a240ace..cf6b917 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -44,7 +44,7 @@ import org.apache.tajo.exception.InternalException;
import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
import org.apache.tajo.master.TaskSchedulerContext;
-import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
+import org.apache.tajo.master.querymaster.Task.IntermediateEntry;
import org.apache.tajo.plan.logical.SortNode.SortPurpose;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.plan.PlanningException;
@@ -717,7 +717,7 @@ public class Repartitioner {
List<ExecutionBlock> childBlocks = masterPlan.getChilds(subQuery.getId());
for (ExecutionBlock childBlock : childBlocks) {
SubQuery childExecSM = subQuery.getContext().getSubQuery(childBlock.getId());
- for (QueryUnit qu : childExecSM.getQueryUnits()) {
+ for (Task qu : childExecSM.getTasks()) {
for (IntermediateEntry p : qu.getIntermediateData()) {
FetchImpl fetch = new FetchImpl(p.getPullHost(), RANGE_SHUFFLE, childBlock.getId(), 0);
fetch.addPart(p.getTaskId(), p.getAttemptId());
@@ -840,8 +840,8 @@ public class Repartitioner {
// make FetchImpl per PullServer, PartId
Map<Integer, List<IntermediateEntry>> hashed = hashByKey(partitions);
for (Entry<Integer, List<IntermediateEntry>> interm : hashed.entrySet()) {
- Map<QueryUnit.PullHost, List<IntermediateEntry>> hashedByHost = hashByHost(interm.getValue());
- for (Entry<QueryUnit.PullHost, List<IntermediateEntry>> e : hashedByHost.entrySet()) {
+ Map<Task.PullHost, List<IntermediateEntry>> hashedByHost = hashByHost(interm.getValue());
+ for (Entry<Task.PullHost, List<IntermediateEntry>> e : hashedByHost.entrySet()) {
FetchImpl fetch = new FetchImpl(e.getKey(), channel.getShuffleType(),
block.getId(), interm.getKey(), e.getValue());
@@ -1191,10 +1191,10 @@ public class Repartitioner {
return hashed;
}
- public static Map<QueryUnit.PullHost, List<IntermediateEntry>> hashByHost(List<IntermediateEntry> entries) {
- Map<QueryUnit.PullHost, List<IntermediateEntry>> hashed = new HashMap<QueryUnit.PullHost, List<IntermediateEntry>>();
+ public static Map<Task.PullHost, List<IntermediateEntry>> hashByHost(List<IntermediateEntry> entries) {
+ Map<Task.PullHost, List<IntermediateEntry>> hashed = new HashMap<Task.PullHost, List<IntermediateEntry>>();
- QueryUnit.PullHost host;
+ Task.PullHost host;
for (IntermediateEntry entry : entries) {
host = entry.getPullHost();
if (hashed.containsKey(host)) {