You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2015/06/08 09:53:15 UTC
[1/3] tajo git commit: TAJO-1615: Implement TaskManager. (jinho)
Repository: tajo
Updated Branches:
refs/heads/master dfcf41d6f -> 36da0dac7
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java
index 5c97ba8..16d32d4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java
@@ -36,6 +36,7 @@ import static org.apache.tajo.ipc.TajoWorkerProtocol.TaskRunnerHistoryProto;
/**
* The history class for TaskRunner processing.
*/
+@Deprecated
public class TaskRunnerHistory implements ProtoObject<TaskRunnerHistoryProto> {
private Service.STATE state;
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
index 734a8a5..d18a262 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -36,6 +36,7 @@ import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
+@Deprecated
public class TaskRunnerManager extends CompositeService implements EventHandler<TaskRunnerEvent> {
private static final Log LOG = LogFactory.getLog(TaskRunnerManager.class);
@@ -154,14 +155,7 @@ public class TaskRunnerManager extends CompositeService implements EventHandler<
if(context == null){
try {
- context = new ExecutionBlockContext(getTajoConf(),
- getWorkerContext(),
- this,
- startEvent.getQueryContext(),
- startEvent.getPlan(),
- startEvent.getExecutionBlockId(),
- startEvent.getQueryMaster(),
- startEvent.getShuffleType());
+ context = new ExecutionBlockContext(getWorkerContext(), this, startEvent.getRequest());
context.init();
} catch (Throwable e) {
LOG.fatal(e.getMessage(), e);
@@ -170,7 +164,7 @@ public class TaskRunnerManager extends CompositeService implements EventHandler<
executionBlockContextMap.put(event.getExecutionBlockId(), context);
}
- TaskRunner taskRunner = new TaskRunner(context, startEvent.getContainerId());
+ TaskRunner taskRunner = new TaskRunner(context, startEvent.getRequest().getContainerId());
LOG.info("Start TaskRunner:" + taskRunner.getId());
taskRunnerMap.put(taskRunner.getId(), taskRunner);
taskRunnerHistoryMap.put(taskRunner.getId(), taskRunner.getHistory());
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStartEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStartEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStartEvent.java
new file mode 100644
index 0000000..85d74e2
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStartEvent.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+
+/**
+ * TaskManager event requesting that an execution block be started on this worker.
+ * The originating request proto is retained; the event's execution block id is taken from it.
+ */
+public class ExecutionBlockStartEvent extends TaskManagerEvent {
+ private final TajoWorkerProtocol.RunExecutionBlockRequestProto requestProto;
+
+ /**
+ * @param requestProto the execution block request; its executionBlockId becomes this event's id
+ */
+ public ExecutionBlockStartEvent(TajoWorkerProtocol.RunExecutionBlockRequestProto requestProto) {
+ super(EventType.EB_START, new ExecutionBlockId(requestProto.getExecutionBlockId()));
+ this.requestProto = requestProto;
+ }
+
+ /** @return the request this event was created from */
+ public TajoWorkerProtocol.RunExecutionBlockRequestProto getRequestProto() {
+ return requestProto;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStopEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStopEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStopEvent.java
new file mode 100644
index 0000000..2b967ab
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStopEvent.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.TajoIdProtos;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+
+/**
+ * TaskManager event requesting that an execution block be stopped on this worker.
+ * It also carries the list of execution blocks to clean up.
+ */
+public class ExecutionBlockStopEvent extends TaskManagerEvent {
+ private final TajoWorkerProtocol.ExecutionBlockListProto cleanupList;
+
+ /**
+ * @param executionBlockId the execution block to stop
+ * @param cleanupList execution blocks to clean up; stored unchanged
+ */
+ public ExecutionBlockStopEvent(TajoIdProtos.ExecutionBlockIdProto executionBlockId,
+ TajoWorkerProtocol.ExecutionBlockListProto cleanupList) {
+ super(EventType.EB_STOP, new ExecutionBlockId(executionBlockId));
+ this.cleanupList = cleanupList;
+ }
+
+ /** @return the execution blocks to clean up, as given to the constructor */
+ public TajoWorkerProtocol.ExecutionBlockListProto getCleanupList() {
+ return cleanupList;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java
index 2f411e8..9a1c106 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java
@@ -24,7 +24,7 @@ import com.google.protobuf.RpcCallback;
import static org.apache.tajo.ipc.TajoWorkerProtocol.BatchAllocationRequestProto;
import static org.apache.tajo.ipc.TajoWorkerProtocol.BatchAllocationResponseProto;
-public class NodeResourceAllocateEvent extends NodeResourceManagerEvent {
+public class NodeResourceAllocateEvent extends NodeResourceEvent {
private BatchAllocationRequestProto request;
private RpcCallback<BatchAllocationResponseProto> callback;
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java
index a298d77..31d9229 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java
@@ -21,7 +21,7 @@ package org.apache.tajo.worker.event;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.resource.NodeResource;
-public class NodeResourceDeallocateEvent extends NodeResourceManagerEvent {
+public class NodeResourceDeallocateEvent extends NodeResourceEvent {
private NodeResource resource;
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceEvent.java
new file mode 100644
index 0000000..6fd2e0d
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceEvent.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+public class NodeResourceEvent extends AbstractEvent<NodeResourceEvent.EventType> {
+ // consumer: NodeResourceManager (replaces the removed NodeResourceManagerEvent)
+ public enum EventType {
+ // producer: TajoWorkerManagerService
+ ALLOCATE,
+ // producer: TaskExecutor
+ DEALLOCATE
+ }
+ // Carries only the event type; subclasses such as NodeResourceAllocateEvent add the payload.
+ public NodeResourceEvent(EventType eventType) {
+ super(eventType);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceManagerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceManagerEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceManagerEvent.java
deleted file mode 100644
index bcb3448..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceManagerEvent.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker.event;
-
-import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.resource.NodeResource;
-
-public class NodeResourceManagerEvent extends AbstractEvent<NodeResourceManagerEvent.EventType> {
- public enum EventType {
- ALLOCATE,
- DEALLOCATE
- }
-
- public NodeResourceManagerEvent(EventType eventType) {
- super(eventType);
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeStatusEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeStatusEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeStatusEvent.java
index 58ab74a..9eb8ae9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeStatusEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeStatusEvent.java
@@ -22,19 +22,15 @@ import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.tajo.resource.NodeResource;
public class NodeStatusEvent extends AbstractEvent<NodeStatusEvent.EventType> {
- private final NodeResource resource;
+ // consumer: NodeStatusUpdater
public enum EventType {
+ // producer: NodeResourceManager
REPORT_RESOURCE,
+ // producer: TaskManager
FLUSH_REPORTS
}
- public NodeStatusEvent(EventType eventType, NodeResource resource) {
+ public NodeStatusEvent(EventType eventType) {
super(eventType);
- this.resource = resource;
- }
-
- public NodeResource getResource() {
- return resource;
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskExecutorEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskExecutorEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskExecutorEvent.java
new file mode 100644
index 0000000..c609c67
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskExecutorEvent.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.TaskAttemptId;
+
+/**
+ * Event consumed by the TaskExecutor, addressed to a single task attempt.
+ */
+public class TaskExecutorEvent extends AbstractEvent<TaskExecutorEvent.EventType> {
+
+ // producer: NodeResourceManager, consumer: TaskExecutor
+ public enum EventType {
+ START,
+ KILL,
+ ABORT
+ }
+
+ private final TaskAttemptId taskAttemptId;
+
+ public TaskExecutorEvent(EventType eventType,
+ TaskAttemptId taskAttemptId) {
+ super(eventType);
+ this.taskAttemptId = taskAttemptId;
+ }
+
+ /** @return the task attempt this event refers to */
+ public TaskAttemptId getTaskId() {
+ return taskAttemptId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskManagerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskManagerEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskManagerEvent.java
new file mode 100644
index 0000000..39b541b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskManagerEvent.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.ExecutionBlockId;
+
+/**
+ * Base event consumed by the TaskManager; subclasses start or stop an execution block.
+ */
+public class TaskManagerEvent extends AbstractEvent<TaskManagerEvent.EventType> {
+ // producer: NodeResourceManager, consumer: TaskManager
+ public enum EventType {
+ EB_START,
+ EB_STOP
+ }
+
+ private final ExecutionBlockId executionBlockId;
+
+ public TaskManagerEvent(EventType eventType,
+ ExecutionBlockId executionBlockId) {
+ super(eventType);
+ this.executionBlockId = executionBlockId;
+ }
+
+ /** @return the execution block this event refers to */
+ public ExecutionBlockId getExecutionBlockId() {
+ return executionBlockId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java
index aac8973..7175251 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java
@@ -21,6 +21,7 @@ package org.apache.tajo.worker.event;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.tajo.ExecutionBlockId;
+@Deprecated
public class TaskRunnerEvent extends AbstractEvent<TaskRunnerEvent.EventType> {
public enum EventType {
START,
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
index 908afa2..9406794 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
@@ -20,48 +20,21 @@ package org.apache.tajo.worker.event;
import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.plan.serder.PlanProto;
-
+/**
+ * @deprecated consumed by the deprecated TaskRunnerManager; the TaskManager path uses
+ * {@link ExecutionBlockStartEvent}, which wraps the same RunExecutionBlockRequestProto.
+ */
+@Deprecated
public class TaskRunnerStartEvent extends TaskRunnerEvent {
- private final QueryContext queryContext;
- private final WorkerConnectionInfo queryMaster;
- private final String containerId;
- private final String plan;
- private final PlanProto.ShuffleType shuffleType;
-
- public TaskRunnerStartEvent(WorkerConnectionInfo queryMaster,
- ExecutionBlockId executionBlockId,
- String containerId,
- QueryContext context,
- String plan,
- PlanProto.ShuffleType shuffleType) {
- super(EventType.START, executionBlockId);
- this.queryMaster = queryMaster;
- this.containerId = containerId;
- this.queryContext = context;
- this.plan = plan;
- this.shuffleType = shuffleType;
- }
-
- public WorkerConnectionInfo getQueryMaster() {
- return queryMaster;
- }
-
- public String getContainerId() {
- return containerId;
- }
-
- public QueryContext getQueryContext() {
- return queryContext;
- }
+ private final TajoWorkerProtocol.RunExecutionBlockRequestProto request;
- public String getPlan() {
- return plan;
+ public TaskRunnerStartEvent(TajoWorkerProtocol.RunExecutionBlockRequestProto request) {
+ super(EventType.START, new ExecutionBlockId(request.getExecutionBlockId()));
+ this.request = request;
}
- public PlanProto.ShuffleType getShuffleType() {
- return shuffleType;
+ public TajoWorkerProtocol.RunExecutionBlockRequestProto getRequest() {
+ return request;
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java
index c8ec20d..297f30c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java
@@ -20,6 +20,7 @@ package org.apache.tajo.worker.event;
import org.apache.tajo.ExecutionBlockId;
+@Deprecated
public class TaskRunnerStopEvent extends TaskRunnerEvent {
public TaskRunnerStopEvent(ExecutionBlockId executionBlockId) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskStartEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskStartEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskStartEvent.java
new file mode 100644
index 0000000..f60e7c4
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskStartEvent.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.resource.NodeResource;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.TaskRequestProto;
+
+/**
+ * TaskExecutor START event for one task attempt. It carries the task request and the
+ * node resource allocated for it.
+ */
+public class TaskStartEvent extends TaskExecutorEvent {
+
+ private final NodeResource allocatedResource;
+ private final TaskRequestProto taskRequest;
+
+ /**
+ * @param taskRequest the task to run; its id becomes this event's task attempt id
+ * @param allocatedResource the node resource allocated for the task
+ */
+ public TaskStartEvent(TaskRequestProto taskRequest,
+ NodeResource allocatedResource) {
+ super(EventType.START, new TaskAttemptId(taskRequest.getId()));
+ this.taskRequest = taskRequest;
+ this.allocatedResource = allocatedResource;
+ }
+
+ public NodeResource getAllocatedResource() {
+ return allocatedResource;
+ }
+
+ public TaskRequestProto getTaskRequest() {
+ return taskRequest;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
index 2324596..715b1e6 100644
--- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
@@ -208,6 +208,7 @@ message TaskAllocationRequestProto {
message BatchAllocationRequestProto {
required ExecutionBlockIdProto executionBlockId = 1;
repeated TaskAllocationRequestProto taskRequest = 2;
+ optional RunExecutionBlockRequestProto executionBlockRequest = 3; //TODO should be refactored
}
message BatchAllocationResponseProto {
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
index eca7f6d..0cec3da 100644
--- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
@@ -19,6 +19,7 @@
package org.apache.tajo.querymaster;
import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.tajo.*;
@@ -33,16 +34,25 @@ import org.apache.tajo.engine.planner.global.GlobalPlanner;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.engine.query.TaskRequestImpl;
-import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.master.event.*;
import org.apache.tajo.plan.LogicalOptimizer;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.LogicalPlanner;
import org.apache.tajo.plan.serder.PlanProto;
+import org.apache.tajo.service.ServiceTracker;
import org.apache.tajo.session.Session;
+import org.apache.tajo.storage.HashShuffleAppenderManager;
import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.history.HistoryReader;
+import org.apache.tajo.util.history.HistoryWriter;
+import org.apache.tajo.util.metrics.TajoSystemMetrics;
import org.apache.tajo.worker.ExecutionBlockContext;
-import org.apache.tajo.worker.Task;
+import org.apache.tajo.worker.LegacyTaskImpl;
+import org.apache.tajo.worker.TajoWorker;
+import org.apache.tajo.worker.TaskRunnerManager;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -230,7 +240,7 @@ public class TestKillQuery {
QueryId qid = LocalTajoTestingUtility.newQueryId();
ExecutionBlockId eid = QueryIdFactory.newExecutionBlockId(qid, 1);
TaskId tid = QueryIdFactory.newTaskId(eid);
- TajoConf conf = new TajoConf();
+ final TajoConf conf = new TajoConf();
TaskRequestImpl taskRequest = new TaskRequestImpl();
taskRequest.set(null, new ArrayList<CatalogProtos.FragmentProto>(),
@@ -238,18 +248,37 @@ public class TestKillQuery {
taskRequest.setInterQuery();
TaskAttemptId attemptId = new TaskAttemptId(tid, 1);
+ WorkerConnectionInfo queryMaster = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
+ TajoWorkerProtocol.RunExecutionBlockRequestProto.Builder
+ requestProto = TajoWorkerProtocol.RunExecutionBlockRequestProto.newBuilder();
+
+ requestProto.setExecutionBlockId(eid.getProto())
+ .setQueryMaster(queryMaster.getProto())
+ .setNodeId(queryMaster.getHost()+":" + queryMaster.getQueryMasterPort())
+ .setContainerId("test")
+ .setQueryContext(new QueryContext(conf).getProto())
+ .setPlanJson("test")
+ .setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE);
+
+ TajoWorker.WorkerContext workerContext = new MockWorkerContext() {
+ @Override
+ public TajoConf getConf() {
+ return conf;
+ }
+ };
+
ExecutionBlockContext context =
- new ExecutionBlockContext(conf, null, null, new QueryContext(conf), null, eid, null, null);
+ new ExecutionBlockContext(workerContext, null, requestProto.build());
- org.apache.tajo.worker.Task task = new Task("test", CommonTestingUtil.getTestDir(), attemptId,
+ org.apache.tajo.worker.Task task = new LegacyTaskImpl("test", CommonTestingUtil.getTestDir(), attemptId,
conf, context, taskRequest);
task.kill();
- assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getStatus());
+ assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getTaskContext().getState());
try {
task.run();
- assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getStatus());
+ assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getTaskContext().getState());
} catch (Exception e) {
- assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getStatus());
+ assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getTaskContext().getState());
}
}
@@ -271,4 +300,94 @@ public class TestKillQuery {
super.dispatch(event);
}
}
+
+ abstract static class MockWorkerContext implements TajoWorker.WorkerContext {
+ // Null/no-op stub of WorkerContext; tests subclass it and supply getConf().
+ @Override
+ public QueryMaster getQueryMaster() {
+ return null;
+ }
+
+ public abstract TajoConf getConf();
+
+ @Override
+ public ServiceTracker getServiceTracker() {
+ return null;
+ }
+
+ @Override
+ public QueryMasterManagerService getQueryMasterManagerService() {
+ return null;
+ }
+
+ @Override
+ public TaskRunnerManager getTaskRunnerManager() {
+ return null;
+ }
+
+ @Override
+ public CatalogService getCatalog() {
+ return null;
+ }
+
+ @Override
+ public WorkerConnectionInfo getConnectionInfo() {
+ return null;
+ }
+
+ @Override
+ public String getWorkerName() {
+ return null;
+ }
+
+ @Override
+ public LocalDirAllocator getLocalDirAllocator() {
+ return null;
+ }
+
+ @Override
+ public QueryCoordinatorProtocol.ClusterResourceSummary getClusterResource() {
+ return null;
+ }
+
+ @Override
+ public TajoSystemMetrics getWorkerSystemMetrics() {
+ return null;
+ }
+
+ @Override
+ public HashShuffleAppenderManager getHashShuffleAppenderManager() {
+ return null;
+ }
+
+ @Override
+ public HistoryWriter getTaskHistoryWriter() {
+ return null;
+ }
+
+ @Override
+ public HistoryReader getHistoryReader() {
+ return null;
+ }
+
+ @Override
+ public void cleanup(String strPath) {
+ // no-op in tests
+ }
+
+ @Override
+ public void cleanupTemporalDirectories() {
+ // no-op in tests
+ }
+
+ @Override
+ public void setClusterResource(QueryCoordinatorProtocol.ClusterResourceSummary clusterResource) {
+ // no-op in tests
+ }
+
+ @Override
+ public void setNumClusterNodes(int numClusterNodes) {
+ // no-op in tests
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java b/tajo-core/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java
new file mode 100644
index 0000000..9d4e1f3
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+
+import java.io.IOException;
+
+public class MockExecutionBlock extends ExecutionBlockContext {
+ // Test double: passes a null second argument to the parent, skips init(), ignores fatalError().
+ public MockExecutionBlock(TajoWorker.WorkerContext workerContext,
+ TajoWorkerProtocol.RunExecutionBlockRequestProto request) throws IOException {
+ super(workerContext, null, request);
+ }
+
+ @Override
+ public void init() throws Throwable {
+ // skip: the parent's init() is intentionally not invoked in tests
+ }
+
+ @Override
+ public void fatalError(TaskAttemptId taskAttemptId, String message) {
+ // intentionally ignored in tests
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java b/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java
new file mode 100644
index 0000000..18b9405
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.plan.serder.PlanProto;
+import org.apache.tajo.resource.NodeResource;
+import org.apache.tajo.resource.NodeResources;
+import org.apache.tajo.worker.event.NodeResourceEvent;
+
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+
+public class MockNodeResourceManager extends NodeResourceManager {
+ // Test double: signals each handled event through a semaphore and can suppress task start-up.
+ volatile boolean enableTaskHandlerEvent = true; // false => startExecutionBlock/startTask are skipped
+ private final Semaphore barrier; // released once per handled NodeResourceEvent
+
+ public MockNodeResourceManager(Semaphore barrier, Dispatcher dispatcher, EventHandler taskEventHandler) {
+ super(dispatcher, taskEventHandler);
+ this.barrier = barrier;
+ }
+
+ @Override
+ public void handle(NodeResourceEvent event) {
+ super.handle(event);
+ barrier.release(); // released after super.handle(), so acquirers know the event was processed
+ }
+
+ @Override
+ protected void startExecutionBlock(TajoWorkerProtocol.RunExecutionBlockRequestProto request) {
+ if(enableTaskHandlerEvent) {
+ super.startExecutionBlock(request);
+ }
+ }
+
+ @Override
+ protected void startTask(TajoWorkerProtocol.TaskRequestProto request, NodeResource resource) {
+ if(enableTaskHandlerEvent) {
+ super.startTask(request, resource);
+ }
+ }
+
+ /**
+ * Enables or disables forwarding of startExecutionBlock/startTask to the real implementation.
+ * Pass {@code false} to skip task execution (and therefore its deallocation) for testing. */
+ public void setTaskHandlerEvent(boolean flag) {
+ enableTaskHandlerEvent = flag;
+ }
+ /** Builds {@code size} allocation requests for attempt 0 of tasks 0..size-1 in {@code ebId}, each sized {@code memory}. */
+ protected static Queue<TajoWorkerProtocol.TaskAllocationRequestProto> createTaskRequests(
+ ExecutionBlockId ebId, int memory, int size) {
+ // thread-safe queue; NOTE(review): presumably because tests poll it from several threads
+ Queue<TajoWorkerProtocol.TaskAllocationRequestProto>
+ requestProtoList = new LinkedBlockingQueue<TajoWorkerProtocol.TaskAllocationRequestProto>();
+ for (int i = 0; i < size; i++) {
+ // attempt 0 of task i in the given execution block
+ TaskAttemptId taskAttemptId = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(ebId, i), 0);
+ TajoWorkerProtocol.TaskRequestProto.Builder builder =
+ TajoWorkerProtocol.TaskRequestProto.newBuilder();
+ builder.setId(taskAttemptId.getProto());
+ builder.setShouldDie(true);
+ builder.setOutputTable("");
+ builder.setPlan(PlanProto.LogicalNodeTree.newBuilder());
+ builder.setClusteredOutput(false);
+
+ // each request carries a resource of the given memory size, built via NodeResources.createResource
+ requestProtoList.add(TajoWorkerProtocol.TaskAllocationRequestProto.newBuilder()
+ .setResource(NodeResources.createResource(memory).getProto())
+ .setTaskRequest(builder.build()).build());
+ }
+ return requestProtoList;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java b/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java
index 2d7d0be..dfcfd4f 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java
@@ -39,9 +39,9 @@ public class MockNodeStatusUpdater extends NodeStatusUpdater {
private Map<Integer, NodeResource> resources = Maps.newHashMap();
private MockResourceTracker resourceTracker;
- public MockNodeStatusUpdater(CountDownLatch barrier, WorkerConnectionInfo connectionInfo,
+ public MockNodeStatusUpdater(CountDownLatch barrier, TajoWorker.WorkerContext workerContext,
NodeResourceManager resourceManager) {
- super(connectionInfo, resourceManager);
+ super(workerContext, resourceManager);
this.barrier = barrier;
this.resourceTracker = new MockResourceTracker();
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java b/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java
new file mode 100644
index 0000000..f62733f
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.worker.event.TaskExecutorEvent;
+
+import java.io.IOException;
+import java.util.concurrent.Semaphore;
+
+public class MockTaskExecutor extends TaskExecutor {
+ // Test double: replaces real task execution with a Task whose run() succeeds immediately.
+ protected final Semaphore barrier; // released once per handled TaskExecutorEvent
+
+ public MockTaskExecutor(Semaphore barrier, TaskManager taskManager, EventHandler rmEventHandler) {
+ super(taskManager, rmEventHandler);
+ this.barrier = barrier;
+ }
+
+ @Override
+ public void handle(TaskExecutorEvent event) {
+ super.handle(event);
+ barrier.release(); // released after super.handle(), so acquirers know the event was processed
+ }
+
+ @Override
+ protected Task createTask(final ExecutionBlockContext context, TajoWorkerProtocol.TaskRequestProto taskRequest) {
+ final TaskAttemptId taskAttemptId = new TaskAttemptId(taskRequest.getId());
+
+ // Keep state locally to skip status-change logging; start at TA_PENDING so getReport() never hands null to the proto builder.
+ final TaskAttemptContext taskAttemptContext = new TaskAttemptContext(null, context, taskAttemptId, null, null) {
+ private TajoProtos.TaskAttemptState state = TajoProtos.TaskAttemptState.TA_PENDING;
+
+ @Override
+ public TajoProtos.TaskAttemptState getState() {
+ return state;
+ }
+
+ @Override
+ public void setState(TajoProtos.TaskAttemptState state) {
+ this.state = state;
+ }
+ };
+
+ return new Task() {
+ @Override
+ public void init() throws IOException {
+ // no-op
+ }
+
+ @Override
+ public void fetch() {
+ // no-op: hasFetchPhase() returns false
+ }
+
+ @Override
+ public void run() throws Exception { // completes instantly: stopped, progress 1.0, state TA_SUCCEEDED
+ taskAttemptContext.stop();
+ taskAttemptContext.setProgress(1.0f);
+ taskAttemptContext.setState(TajoProtos.TaskAttemptState.TA_SUCCEEDED);
+ }
+
+ @Override
+ public void kill() {
+ // no-op: the mock does not change state on kill
+ }
+
+ @Override
+ public void abort() {
+ // no-op
+ }
+
+ @Override
+ public void cleanup() {
+ // no-op
+ }
+
+ @Override
+ public boolean hasFetchPhase() {
+ return false;
+ }
+
+ @Override
+ public boolean isProgressChanged() {
+ return false;
+ }
+
+ @Override
+ public boolean isStopped() {
+ return taskAttemptContext.isStopped();
+ }
+
+ @Override
+ public void updateProgress() {
+ // no-op
+ }
+
+ @Override
+ public TaskAttemptContext getTaskContext() {
+ return taskAttemptContext;
+ }
+
+ @Override
+ public ExecutionBlockContext getExecutionBlockContext() {
+ return context;
+ }
+
+ @Override
+ public TajoWorkerProtocol.TaskStatusProto getReport() {
+ TajoWorkerProtocol.TaskStatusProto.Builder builder = TajoWorkerProtocol.TaskStatusProto.newBuilder();
+ builder.setWorkerName("localhost:0"); // fixed placeholder worker name
+ builder.setId(taskAttemptContext.getTaskId().getProto())
+ .setProgress(taskAttemptContext.getProgress())
+ .setState(taskAttemptContext.getState());
+
+ builder.setInputStats(new TableStats().getProto()); // empty input statistics
+ return builder.build();
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskManager.java b/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskManager.java
new file mode 100644
index 0000000..678b063
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskManager.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.resource.NodeResource;
+import org.apache.tajo.worker.event.TaskManagerEvent;
+
+import java.io.IOException;
+import java.util.concurrent.Semaphore;
+
+public class MockTaskManager extends TaskManager {
+ // Test double: creates MockExecutionBlock contexts, skips EB shutdown, and signals each handled event.
+ private final Semaphore barrier; // released once per handled TaskManagerEvent
+
+ public MockTaskManager(Semaphore barrier, Dispatcher dispatcher, TajoWorker.WorkerContext workerContext, EventHandler rmEventHandler) {
+ super(dispatcher, workerContext, rmEventHandler);
+ this.barrier = barrier;
+ }
+
+ @Override
+ protected ExecutionBlockContext createExecutionBlock(TajoWorkerProtocol.RunExecutionBlockRequestProto request) {
+ try {
+ return new MockExecutionBlock(getWorkerContext(), request);
+ } catch (IOException e) {
+ throw new RuntimeException(e); // this override declares no checked exceptions, so wrap while keeping the cause
+ }
+ }
+
+ @Override
+ protected void stopExecutionBlock(ExecutionBlockContext context,
+ TajoWorkerProtocol.ExecutionBlockListProto cleanupList) {
+ // intentionally empty: stopping/cleaning up the execution block is skipped for testing
+ }
+
+ @Override
+ public void handle(TaskManagerEvent event) {
+ super.handle(event);
+ barrier.release(); // released after super.handle(), so acquirers know the event was processed
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/MockWorkerContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/MockWorkerContext.java b/tajo-core/src/test/java/org/apache/tajo/worker/MockWorkerContext.java
new file mode 100644
index 0000000..e8c2b9c
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/MockWorkerContext.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.querymaster.QueryMaster;
+import org.apache.tajo.querymaster.QueryMasterManagerService;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.storage.HashShuffleAppenderManager;
+import org.apache.tajo.util.history.HistoryReader;
+import org.apache.tajo.util.history.HistoryWriter;
+import org.apache.tajo.util.metrics.TajoSystemMetrics;
+
+public abstract class MockWorkerContext implements TajoWorker.WorkerContext {
+ TajoSystemMetrics tajoSystemMetrics; // created lazily by getWorkerSystemMetrics()
+ // Stub WorkerContext: getters return null and mutators are no-ops unless a subclass overrides them.
+ @Override
+ public QueryMaster getQueryMaster() {
+ return null;
+ }
+
+ public abstract TajoConf getConf(); // must be supplied by each concrete (e.g. anonymous) subclass
+
+ @Override
+ public ServiceTracker getServiceTracker() {
+ return null;
+ }
+
+ @Override
+ public QueryMasterManagerService getQueryMasterManagerService() {
+ return null;
+ }
+
+ @Override
+ public TaskRunnerManager getTaskRunnerManager() {
+ return null;
+ }
+
+ @Override
+ public CatalogService getCatalog() {
+ return null;
+ }
+
+ @Override
+ public WorkerConnectionInfo getConnectionInfo() {
+ return null;
+ }
+
+ @Override
+ public String getWorkerName() {
+ return null;
+ }
+
+ @Override
+ public LocalDirAllocator getLocalDirAllocator() {
+ return null;
+ }
+
+ @Override
+ public QueryCoordinatorProtocol.ClusterResourceSummary getClusterResource() {
+ return null;
+ }
+
+ @Override
+ public TajoSystemMetrics getWorkerSystemMetrics() {
+ // Lazily create and start metrics on first call. NOTE(review): not synchronized, and never stopped by this class.
+ if (tajoSystemMetrics == null) {
+ tajoSystemMetrics = new TajoSystemMetrics(getConf(), "test-file-group", "localhost");
+ tajoSystemMetrics.start();
+ }
+ return tajoSystemMetrics;
+ }
+
+ @Override
+ public HashShuffleAppenderManager getHashShuffleAppenderManager() {
+ return null;
+ }
+
+ @Override
+ public HistoryWriter getTaskHistoryWriter() {
+ return null;
+ }
+
+ @Override
+ public HistoryReader getHistoryReader() {
+ return null;
+ }
+
+ @Override
+ public void cleanup(String strPath) {
+ // no-op
+ }
+
+ @Override
+ public void cleanupTemporalDirectories() {
+ // no-op
+ }
+
+ @Override
+ public void setClusterResource(QueryCoordinatorProtocol.ClusterResourceSummary clusterResource) {
+ // no-op
+ }
+
+ @Override
+ public void setNumClusterNodes(int numClusterNodes) {
+ // no-op
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
index 513eb69..65627c1 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
@@ -104,13 +104,13 @@ public class TestFetcher {
@Test
public void testAdjustFetchProcess() {
- assertEquals(0.0f, Task.adjustFetchProcess(0, 0), 0);
- assertEquals(0.0f, Task.adjustFetchProcess(10, 10), 0);
- assertEquals(0.05f, Task.adjustFetchProcess(10, 9), 0);
- assertEquals(0.1f, Task.adjustFetchProcess(10, 8), 0);
- assertEquals(0.25f, Task.adjustFetchProcess(10, 5), 0);
- assertEquals(0.45f, Task.adjustFetchProcess(10, 1), 0);
- assertEquals(0.5f, Task.adjustFetchProcess(10, 0), 0);
+ assertEquals(0.0f, LegacyTaskImpl.adjustFetchProcess(0, 0), 0);
+ assertEquals(0.0f, LegacyTaskImpl.adjustFetchProcess(10, 10), 0);
+ assertEquals(0.05f, LegacyTaskImpl.adjustFetchProcess(10, 9), 0);
+ assertEquals(0.1f, LegacyTaskImpl.adjustFetchProcess(10, 8), 0);
+ assertEquals(0.25f, LegacyTaskImpl.adjustFetchProcess(10, 5), 0);
+ assertEquals(0.45f, LegacyTaskImpl.adjustFetchProcess(10, 1), 0);
+ assertEquals(0.5f, LegacyTaskImpl.adjustFetchProcess(10, 0), 0);
}
@Test
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
index 7407acc..2cee7d0 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
@@ -19,13 +19,15 @@
package org.apache.tajo.worker;
import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.tajo.*;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.plan.serder.PlanProto;
-import org.apache.tajo.resource.NodeResources;
import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.worker.event.NodeResourceAllocateEvent;
@@ -42,9 +44,15 @@ import static org.junit.Assert.*;
import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
public class TestNodeResourceManager {
- private NodeResourceManager resourceManager;
- private MockNodeStatusUpdater statusUpdater;
+ private MockNodeResourceManager resourceManager;
+ private NodeStatusUpdater statusUpdater;
+ private TaskManager taskManager;
+ private TaskExecutor taskExecutor;
private AsyncDispatcher dispatcher;
+ private AsyncDispatcher taskDispatcher;
+ private TajoWorker.WorkerContext workerContext;
+
+ private CompositeService service;
private int taskMemory;
private TajoConf conf;
@@ -61,29 +69,55 @@ public class TestNodeResourceManager {
conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM, 1);
dispatcher = new AsyncDispatcher();
- dispatcher.init(conf);
- dispatcher.start();
-
- resourceManager = new NodeResourceManager(dispatcher);
- resourceManager.init(conf);
- resourceManager.start();
-
- WorkerConnectionInfo worker = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
- statusUpdater = new MockNodeStatusUpdater(new CountDownLatch(0), worker, resourceManager);
- statusUpdater.init(conf);
- statusUpdater.start();
+ taskDispatcher = new AsyncDispatcher();
+
+ workerContext = new MockWorkerContext() {
+ WorkerConnectionInfo workerConnectionInfo;
+ @Override
+ public TajoConf getConf() {
+ return conf;
+ }
+
+ @Override
+ public WorkerConnectionInfo getConnectionInfo() {
+ if (workerConnectionInfo == null) {
+ workerConnectionInfo = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
+ }
+ return workerConnectionInfo;
+ }
+ };
+
+ taskManager = new MockTaskManager(new Semaphore(0), taskDispatcher, workerContext, dispatcher.getEventHandler());
+ taskExecutor = new MockTaskExecutor(new Semaphore(0), taskManager, dispatcher.getEventHandler());
+ resourceManager = new MockNodeResourceManager(new Semaphore(0), dispatcher, taskDispatcher.getEventHandler());
+ statusUpdater = new MockNodeStatusUpdater(new CountDownLatch(0), workerContext, resourceManager);
+
+ service = new CompositeService("MockService") {
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ addIfService(dispatcher);
+ addIfService(taskDispatcher);
+ addIfService(taskManager);
+ addIfService(taskExecutor);
+ addIfService(resourceManager);
+ addIfService(statusUpdater);
+ super.serviceInit(conf);
+ }
+ };
+
+ service.init(conf);
+ service.start();
}
@After
public void tearDown() {
- resourceManager.stop();
- statusUpdater.stop();
- dispatcher.stop();
+ service.stop();
}
@Test
public void testNodeResourceAllocateEvent() throws Exception {
int requestSize = 4;
+ resourceManager.setTaskHandlerEvent(false); //skip task execution
CallFuture<BatchAllocationResponseProto> callFuture = new CallFuture<BatchAllocationResponseProto>();
BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder();
@@ -91,14 +125,14 @@ public class TestNodeResourceManager {
requestProto.setExecutionBlockId(ebId.getProto());
assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
- requestProto.addAllTaskRequest(createTaskRequests(taskMemory, requestSize));
+ requestProto.addAllTaskRequest(MockNodeResourceManager.createTaskRequests(ebId, taskMemory, requestSize));
dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture));
BatchAllocationResponseProto responseProto = callFuture.get();
assertNotEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
+ // allocated all
assertEquals(0, responseProto.getCancellationTaskCount());
- assertEquals(requestSize, resourceManager.getAllocatedSize());
}
@@ -106,6 +140,7 @@ public class TestNodeResourceManager {
public void testNodeResourceCancellation() throws Exception {
int requestSize = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES);
int overSize = 10;
+ resourceManager.setTaskHandlerEvent(false); //skip task execution
CallFuture<BatchAllocationResponseProto> callFuture = new CallFuture<BatchAllocationResponseProto>();
BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder();
@@ -113,18 +148,19 @@ public class TestNodeResourceManager {
requestProto.setExecutionBlockId(ebId.getProto());
assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
- requestProto.addAllTaskRequest(createTaskRequests(taskMemory, requestSize + overSize));
+ requestProto.addAllTaskRequest(
+ MockNodeResourceManager.createTaskRequests(ebId, taskMemory, requestSize + overSize));
dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture));
BatchAllocationResponseProto responseProto = callFuture.get();
assertEquals(overSize, responseProto.getCancellationTaskCount());
- assertEquals(requestSize, resourceManager.getAllocatedSize());
}
@Test
public void testNodeResourceDeallocateEvent() throws Exception {
int requestSize = 4;
+ resourceManager.setTaskHandlerEvent(false); //skip task execution
CallFuture<BatchAllocationResponseProto> callFuture = new CallFuture<BatchAllocationResponseProto>();
BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder();
@@ -132,21 +168,20 @@ public class TestNodeResourceManager {
requestProto.setExecutionBlockId(ebId.getProto());
assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
- requestProto.addAllTaskRequest(createTaskRequests(taskMemory, requestSize));
+ requestProto.addAllTaskRequest(MockNodeResourceManager.createTaskRequests(ebId, taskMemory, requestSize));
dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture));
BatchAllocationResponseProto responseProto = callFuture.get();
assertNotEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
assertEquals(0, responseProto.getCancellationTaskCount());
- assertEquals(requestSize, resourceManager.getAllocatedSize());
//deallocate
for(TaskAllocationRequestProto allocationRequestProto : requestProto.getTaskRequestList()) {
// direct invoke handler for testing
resourceManager.handle(new NodeResourceDeallocateEvent(allocationRequestProto.getResource()));
}
- assertEquals(0, resourceManager.getAllocatedSize());
+
assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
}
@@ -154,12 +189,38 @@ public class TestNodeResourceManager {
public void testParallelRequest() throws Exception {
final int parallelCount = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES) * 2;
final int taskSize = 100000;
+ resourceManager.setTaskHandlerEvent(true);
+
final AtomicInteger totalComplete = new AtomicInteger();
final AtomicInteger totalCanceled = new AtomicInteger();
final ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
- final Queue<TaskAllocationRequestProto> totalTasks = createTaskRequests(taskMemory, taskSize);
+ final Queue<TaskAllocationRequestProto>
+ totalTasks = MockNodeResourceManager.createTaskRequests(ebId, taskMemory, taskSize);
+
+ // first request with starting ExecutionBlock
+ TajoWorkerProtocol.RunExecutionBlockRequestProto.Builder
+ ebRequestProto = TajoWorkerProtocol.RunExecutionBlockRequestProto.newBuilder();
+ ebRequestProto.setExecutionBlockId(ebId.getProto())
+ .setQueryMaster(workerContext.getConnectionInfo().getProto())
+ .setNodeId(workerContext.getConnectionInfo().getHost() + ":" +
+ workerContext.getConnectionInfo().getQueryMasterPort())
+ .setContainerId("test")
+ .setQueryContext(new QueryContext(conf).getProto())
+ .setPlanJson("test")
+ .setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE);
+
+ TaskAllocationRequestProto task = totalTasks.poll();
+ BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder();
+ requestProto.addTaskRequest(task);
+ requestProto.setExecutionBlockId(ebId.getProto());
+ requestProto.setExecutionBlockRequest(ebRequestProto.build());
+ CallFuture<BatchAllocationResponseProto> callFuture = new CallFuture<BatchAllocationResponseProto>();
+ dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture));
+ assertTrue(callFuture.get().getCancellationTaskCount() == 0);
+ totalComplete.incrementAndGet();
+ // start parallel request
ExecutorService executor = Executors.newFixedThreadPool(parallelCount);
List<Future> futureList = Lists.newArrayList();
@@ -187,7 +248,6 @@ public class TestNodeResourceManager {
totalCanceled.addAndGet(proto.getCancellationTaskCount());
} else {
complete++;
- dispatcher.getEventHandler().handle(new NodeResourceDeallocateEvent(task.getResource()));
}
} catch (Exception e) {
fail(e.getMessage());
@@ -209,27 +269,4 @@ public class TestNodeResourceManager {
executor.shutdown();
assertEquals(taskSize, totalComplete.get());
}
-
- protected static Queue<TaskAllocationRequestProto> createTaskRequests(int memory, int size) {
- Queue<TaskAllocationRequestProto> requestProtoList = new LinkedBlockingQueue<TaskAllocationRequestProto>();
- for (int i = 0; i < size; i++) {
-
- ExecutionBlockId nullStage = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0);
- TaskAttemptId taskAttemptId = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(nullStage, i), 0);
-
- TajoWorkerProtocol.TaskRequestProto.Builder builder =
- TajoWorkerProtocol.TaskRequestProto.newBuilder();
- builder.setId(taskAttemptId.getProto());
- builder.setShouldDie(true);
- builder.setOutputTable("");
- builder.setPlan(PlanProto.LogicalNodeTree.newBuilder());
- builder.setClusteredOutput(false);
-
-
- requestProtoList.add(TaskAllocationRequestProto.newBuilder()
- .setResource(NodeResources.createResource(memory).getProto())
- .setTaskRequest(builder.build()).build());
- }
- return requestProtoList;
- }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java
index fb3c77e..af40554 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.master.rm.Worker;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.worker.event.NodeStatusEvent;
import org.junit.After;
@@ -37,18 +38,36 @@ public class TestNodeStatusUpdater {
private MockNodeStatusUpdater statusUpdater;
private AsyncDispatcher dispatcher;
private TajoConf conf;
+ private TajoWorker.WorkerContext workerContext;
+
@Before
public void setup() {
conf = new TajoConf();
conf.set(CommonTestingUtil.TAJO_TEST_KEY, CommonTestingUtil.TAJO_TEST_TRUE);
+ workerContext = new MockWorkerContext() {
+ WorkerConnectionInfo workerConnectionInfo;
+
+ @Override
+ public TajoConf getConf() {
+ return conf;
+ }
+
+ @Override
+ public WorkerConnectionInfo getConnectionInfo() {
+ if (workerConnectionInfo == null) {
+ workerConnectionInfo = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
+ }
+ return workerConnectionInfo;
+ }
+ };
conf.setIntVar(TajoConf.ConfVars.WORKER_HEARTBEAT_INTERVAL, 1000);
dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.start();
- resourceManager = new NodeResourceManager(dispatcher);
+ resourceManager = new NodeResourceManager(dispatcher, null);
resourceManager.init(conf);
resourceManager.start();
}
@@ -63,27 +82,25 @@ public class TestNodeStatusUpdater {
@Test(timeout = 20000)
public void testNodeMembership() throws Exception {
CountDownLatch barrier = new CountDownLatch(1);
- WorkerConnectionInfo worker = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
- statusUpdater = new MockNodeStatusUpdater(barrier, worker, resourceManager);
+ statusUpdater = new MockNodeStatusUpdater(barrier, workerContext, resourceManager);
statusUpdater.init(conf);
statusUpdater.start();
MockNodeStatusUpdater.MockResourceTracker resourceTracker = statusUpdater.getResourceTracker();
barrier.await();
- assertTrue(resourceTracker.getTotalResource().containsKey(worker.getId()));
+ assertTrue(resourceTracker.getTotalResource().containsKey(workerContext.getConnectionInfo().getId()));
assertEquals(resourceManager.getTotalResource(),
- resourceTracker.getTotalResource().get(worker.getId()));
+ resourceTracker.getTotalResource().get(workerContext.getConnectionInfo().getId()));
assertEquals(resourceManager.getAvailableResource(),
- resourceTracker.getAvailableResource().get(worker.getId()));
+ resourceTracker.getAvailableResource().get(workerContext.getConnectionInfo().getId()));
}
@Test(timeout = 20000)
public void testPing() throws Exception {
CountDownLatch barrier = new CountDownLatch(2);
- WorkerConnectionInfo worker = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
- statusUpdater = new MockNodeStatusUpdater(barrier, worker, resourceManager);
+ statusUpdater = new MockNodeStatusUpdater(barrier, workerContext, resourceManager);
statusUpdater.init(conf);
statusUpdater.start();
@@ -100,16 +117,29 @@ public class TestNodeStatusUpdater {
@Test(timeout = 20000)
public void testResourceReport() throws Exception {
CountDownLatch barrier = new CountDownLatch(2);
- WorkerConnectionInfo worker = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
- statusUpdater = new MockNodeStatusUpdater(barrier, worker, resourceManager);
+ statusUpdater = new MockNodeStatusUpdater(barrier, workerContext, resourceManager);
statusUpdater.init(conf);
statusUpdater.start();
+ assertEquals(0, statusUpdater.getQueueSize());
+ // Enqueue exactly getQueueingLimit() REPORT_RESOURCE events; once the latch opens the
+ // updater's queue is expected to be empty again.
for (int i = 0; i < statusUpdater.getQueueingLimit(); i++) {
- dispatcher.getEventHandler().handle(new NodeStatusEvent(NodeStatusEvent.EventType.REPORT_RESOURCE,
- resourceManager.getAvailableResource()));
+ dispatcher.getEventHandler().handle(new NodeStatusEvent(NodeStatusEvent.EventType.REPORT_RESOURCE));
}
barrier.await();
assertEquals(0, statusUpdater.getQueueSize());
}
+
+  /**
+   * Sends a single FLUSH_REPORTS event and checks the updater's queue is empty both before the
+   * event is sent and after the latch handed to the mock updater has opened.
+   */
+  @Test(timeout = 20000)
+  public void testFlushResourceReport() throws Exception {
+    CountDownLatch latch = new CountDownLatch(2);
+    statusUpdater = new MockNodeStatusUpdater(latch, workerContext, resourceManager);
+    statusUpdater.init(conf);
+    statusUpdater.start();
+
+    assertEquals(0, statusUpdater.getQueueSize());
+    NodeStatusEvent flushEvent = new NodeStatusEvent(NodeStatusEvent.EventType.FLUSH_REPORTS);
+    dispatcher.getEventHandler().handle(flushEvent);
+
+    latch.await();
+    assertEquals(0, statusUpdater.getQueueSize());
+  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java
new file mode 100644
index 0000000..98b187b
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.*;
+import org.apache.tajo.annotation.ThreadSafe;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.plan.serder.PlanProto;
+import org.apache.tajo.querymaster.QueryMaster;
+import org.apache.tajo.querymaster.QueryMasterManagerService;
+import org.apache.tajo.resource.NodeResources;
+import org.apache.tajo.rpc.CallFuture;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.storage.HashShuffleAppenderManager;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.history.HistoryReader;
+import org.apache.tajo.util.history.HistoryWriter;
+import org.apache.tajo.util.metrics.TajoSystemMetrics;
+import org.apache.tajo.worker.event.ExecutionBlockStartEvent;
+import org.apache.tajo.worker.event.ExecutionBlockStopEvent;
+import org.apache.tajo.worker.event.NodeResourceAllocateEvent;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
+import static org.junit.Assert.*;
+
+public class TestTaskExecutor {
+
+  private NodeResourceManager resourceManager;
+  private NodeStatusUpdater statusUpdater;
+  private TaskManager taskManager;
+  private TaskExecutor taskExecutor;
+  private AsyncDispatcher dispatcher;
+  private AsyncDispatcher taskDispatcher;
+  private TajoWorker.WorkerContext workerContext;
+
+  private CompositeService service;
+  private TajoConf conf;
+  // Handed to the executor under test; the inner TaskExecutor releases a permit whenever a task is
+  // stopped (presumably MockTaskExecutor also releases one when a task starts -- TODO confirm).
+  private Semaphore barrier;
+  private Semaphore resourceManagerBarrier;
+
+  @Before
+  public void setup() {
+    conf = new TajoConf();
+    conf.set(CommonTestingUtil.TAJO_TEST_KEY, CommonTestingUtil.TAJO_TEST_TRUE);
+    dispatcher = new AsyncDispatcher();
+    taskDispatcher = new AsyncDispatcher();
+
+    workerContext = new MockWorkerContext() {
+      WorkerConnectionInfo workerConnectionInfo;
+
+      @Override
+      public TajoConf getConf() {
+        return conf;
+      }
+
+      @Override
+      public WorkerConnectionInfo getConnectionInfo() {
+        // Lazily created so every caller sees the same connection info instance.
+        if (workerConnectionInfo == null) {
+          workerConnectionInfo = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
+        }
+        return workerConnectionInfo;
+      }
+    };
+
+    barrier = new Semaphore(0);
+    resourceManagerBarrier = new Semaphore(0);
+    taskManager = new MockTaskManager(new Semaphore(0), taskDispatcher, workerContext, dispatcher.getEventHandler());
+    taskExecutor = new TaskExecutor(barrier, taskManager, dispatcher.getEventHandler());
+    resourceManager = new MockNodeResourceManager(resourceManagerBarrier, dispatcher, taskDispatcher.getEventHandler());
+    statusUpdater = new MockNodeStatusUpdater(new CountDownLatch(0), workerContext, resourceManager);
+
+    service = new CompositeService("MockService") {
+      @Override
+      protected void serviceInit(Configuration conf) throws Exception {
+        addIfService(dispatcher);
+        addIfService(taskDispatcher);
+        addIfService(taskManager);
+        addIfService(taskExecutor);
+        addIfService(resourceManager);
+        addIfService(statusUpdater);
+        super.serviceInit(conf);
+      }
+
+      @Override
+      protected void serviceStop() throws Exception {
+        workerContext.getWorkerSystemMetrics().stop();
+        super.serviceStop();
+      }
+    };
+
+    service.init(conf);
+    service.start();
+  }
+
+  @After
+  public void tearDown() {
+    service.stop();
+  }
+
+  /**
+   * Builds a batch allocation request for {@code requestSize} tasks of execution block {@code ebId}.
+   * The value 10 passed to MockNodeResourceManager#createTaskRequests is presumably the per-task
+   * memory in MB (TestTaskManager passes its taskMemory there) -- TODO confirm.
+   */
+  private BatchAllocationRequestProto createAllocationRequest(ExecutionBlockId ebId, int requestSize) {
+    RunExecutionBlockRequestProto.Builder ebRequestProto = RunExecutionBlockRequestProto.newBuilder();
+    ebRequestProto.setExecutionBlockId(ebId.getProto())
+        .setQueryMaster(workerContext.getConnectionInfo().getProto())
+        .setNodeId(workerContext.getConnectionInfo().getHost() + ":"
+            + workerContext.getConnectionInfo().getQueryMasterPort())
+        .setContainerId("test")
+        .setQueryContext(new QueryContext(conf).getProto())
+        .setPlanJson("test")
+        .setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE);
+
+    BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder();
+    requestProto.setExecutionBlockId(ebId.getProto());
+    requestProto.setExecutionBlockRequest(ebRequestProto.build());
+    requestProto.addAllTaskRequest(MockNodeResourceManager.createTaskRequests(ebId, 10, requestSize));
+    return requestProto.build();
+  }
+
+  /**
+   * Waits up to 3 seconds for the resource manager to report all resources as available again, then
+   * asserts it. Release travels through the async dispatchers, so a fixed short sleep is flaky.
+   */
+  private void assertResourcesReleased() throws InterruptedException {
+    long deadline = System.currentTimeMillis() + 3000;
+    while (!resourceManager.getTotalResource().equals(resourceManager.getAvailableResource())
+        && System.currentTimeMillis() < deadline) {
+      Thread.sleep(10);
+    }
+    assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
+  }
+
+  /** Sleeps for {@code millis}; if interrupted, restores the thread's interrupt flag instead of losing it. */
+  private static void sleepRestoringInterrupt(long millis) {
+    try {
+      Thread.sleep(millis);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  @Test
+  public void testTaskRequest() throws Exception {
+    int requestSize = 1;
+    QueryId qid = LocalTajoTestingUtility.newQueryId();
+    ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(qid, 1);
+    CallFuture<BatchAllocationResponseProto> callFuture = new CallFuture<BatchAllocationResponseProto>();
+
+    assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
+    dispatcher.getEventHandler().handle(
+        new NodeResourceAllocateEvent(createAllocationRequest(ebId, requestSize), callFuture));
+
+    // verify running task
+    assertTrue(barrier.tryAcquire(3, TimeUnit.SECONDS));
+    assertEquals(1, taskExecutor.getRunningTasks());
+    assertTrue(barrier.tryAcquire(3, TimeUnit.SECONDS));
+    assertEquals(0, taskExecutor.getRunningTasks());
+    assertEquals(1, taskExecutor.completeTasks.get());
+
+    // verify the released resources
+    assertResourcesReleased();
+  }
+
+  @Test
+  public void testTaskException() throws Exception {
+    int requestSize = 1;
+    QueryId qid = LocalTajoTestingUtility.newQueryId();
+    ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(qid, 1);
+    CallFuture<BatchAllocationResponseProto> callFuture = new CallFuture<BatchAllocationResponseProto>();
+
+    assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
+    BatchAllocationRequestProto request = createAllocationRequest(ebId, requestSize);
+
+    // Every task created from now on throws from run() instead of completing.
+    taskExecutor.throwException.set(true);
+    dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(request, callFuture));
+
+    // verify running task
+    assertTrue(barrier.tryAcquire(3, TimeUnit.SECONDS));
+    assertEquals(1, taskExecutor.getRunningTasks());
+    assertTrue(barrier.tryAcquire(3, TimeUnit.SECONDS));
+    assertEquals(0, taskExecutor.getRunningTasks());
+    assertEquals(0, taskExecutor.completeTasks.get());
+
+    // verify the released resources even though the task failed
+    assertResourcesReleased();
+  }
+
+  /**
+   * Executor that creates lightweight fake tasks: each one sleeps briefly in init/fetch/run and then
+   * either marks itself succeeded or, when {@link #throwException} is set, throws from run().
+   */
+  class TaskExecutor extends MockTaskExecutor {
+    // Incremented on executor threads and read by the test thread, hence atomic.
+    final AtomicInteger completeTasks = new AtomicInteger();
+    final AtomicBoolean throwException = new AtomicBoolean();
+
+    public TaskExecutor(Semaphore barrier, TaskManager taskManager, EventHandler rmEventHandler) {
+      super(barrier, taskManager, rmEventHandler);
+    }
+
+    @Override
+    protected void stopTask(TaskAttemptId taskId) {
+      super.stopTask(taskId);
+      // Signal the test thread that a task has been stopped.
+      super.barrier.release();
+    }
+
+    @Override
+    protected Task createTask(final ExecutionBlockContext context, TajoWorkerProtocol.TaskRequestProto taskRequest) {
+      final TaskAttemptId taskAttemptId = new TaskAttemptId(taskRequest.getId());
+      final TaskAttemptContext taskAttemptContext = new TaskAttemptContext(null, context, taskAttemptId, null, null);
+
+      return new Task() {
+        @Override
+        public void init() throws IOException {
+          sleepRestoringInterrupt(50);
+        }
+
+        @Override
+        public void fetch() {
+          sleepRestoringInterrupt(50);
+        }
+
+        @Override
+        public void run() throws Exception {
+          Thread.sleep(50);
+
+          if (throwException.get()) {
+            throw new RuntimeException();
+          }
+
+          taskAttemptContext.stop();
+          taskAttemptContext.setProgress(1.0f);
+          taskAttemptContext.setState(TajoProtos.TaskAttemptState.TA_SUCCEEDED);
+          completeTasks.incrementAndGet();
+        }
+
+        @Override
+        public void kill() {
+          // not exercised by these tests
+        }
+
+        @Override
+        public void abort() {
+          // not exercised by these tests
+        }
+
+        @Override
+        public void cleanup() {
+          // nothing to clean up for a fake task
+        }
+
+        @Override
+        public boolean hasFetchPhase() {
+          return false;
+        }
+
+        @Override
+        public boolean isProgressChanged() {
+          return false;
+        }
+
+        @Override
+        public boolean isStopped() {
+          return taskAttemptContext.isStopped();
+        }
+
+        @Override
+        public void updateProgress() {
+          // progress is set directly in run()
+        }
+
+        @Override
+        public TaskAttemptContext getTaskContext() {
+          return taskAttemptContext;
+        }
+
+        @Override
+        public ExecutionBlockContext getExecutionBlockContext() {
+          return context;
+        }
+
+        @Override
+        public TajoWorkerProtocol.TaskStatusProto getReport() {
+          TajoWorkerProtocol.TaskStatusProto.Builder builder = TajoWorkerProtocol.TaskStatusProto.newBuilder();
+          builder.setWorkerName("localhost:0");
+          builder.setId(taskAttemptContext.getTaskId().getProto())
+              .setProgress(taskAttemptContext.getProgress())
+              .setState(taskAttemptContext.getState());
+
+          builder.setInputStats(new TableStats().getProto());
+          return builder.build();
+        }
+      };
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskManager.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskManager.java
new file mode 100644
index 0000000..8bca489
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskManager.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.tajo.*;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.plan.serder.PlanProto;
+import org.apache.tajo.rpc.CallFuture;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.worker.event.ExecutionBlockStartEvent;
+import org.apache.tajo.worker.event.ExecutionBlockStopEvent;
+import org.apache.tajo.worker.event.NodeResourceAllocateEvent;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.*;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
+import static org.junit.Assert.*;
+
+/**
+ * Tests for TaskManager: execution-block contexts are created when a block starts (either through a
+ * resource allocation or a direct ExecutionBlockStartEvent) and removed when the block is stopped.
+ */
+public class TestTaskManager {
+
+  private NodeResourceManager resourceManager;
+  private NodeStatusUpdater statusUpdater;
+  private TaskManager taskManager;
+  private TaskExecutor taskExecutor;
+  private AsyncDispatcher dispatcher;
+  private AsyncDispatcher taskDispatcher;
+  private TajoWorker.WorkerContext workerContext;
+
+  private CompositeService service;
+  private int taskMemory;
+  private TajoConf conf;
+  // Shared with MockTaskManager; the tests wait on it after sending each event.
+  private Semaphore barrier;
+
+  @Before
+  public void setup() {
+    conf = new TajoConf();
+    conf.set(CommonTestingUtil.TAJO_TEST_KEY, CommonTestingUtil.TAJO_TEST_TRUE);
+
+    // One task-sized slice of memory per CPU core.
+    taskMemory = 512;
+    int cpuCores = 4;
+    conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES, cpuCores);
+    conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB, taskMemory * cpuCores);
+    conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS_NUM, 4);
+    conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM, 1);
+
+    dispatcher = new AsyncDispatcher();
+    taskDispatcher = new AsyncDispatcher();
+
+    workerContext = new MockWorkerContext() {
+      WorkerConnectionInfo workerConnectionInfo;
+
+      @Override
+      public TajoConf getConf() {
+        return conf;
+      }
+
+      @Override
+      public WorkerConnectionInfo getConnectionInfo() {
+        if (workerConnectionInfo == null) {
+          workerConnectionInfo = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
+        }
+        return workerConnectionInfo;
+      }
+    };
+    barrier = new Semaphore(0);
+    taskManager = new MockTaskManager(barrier, taskDispatcher, workerContext, dispatcher.getEventHandler());
+    taskExecutor = new MockTaskExecutor(new Semaphore(0), taskManager, dispatcher.getEventHandler());
+    resourceManager = new NodeResourceManager(dispatcher, taskDispatcher.getEventHandler());
+    statusUpdater = new MockNodeStatusUpdater(new CountDownLatch(0), workerContext, resourceManager);
+
+    service = new CompositeService("MockService") {
+      @Override
+      protected void serviceInit(Configuration conf) throws Exception {
+        addIfService(dispatcher);
+        addIfService(taskDispatcher);
+        addIfService(taskManager);
+        addIfService(taskExecutor);
+        addIfService(resourceManager);
+        addIfService(statusUpdater);
+        super.serviceInit(conf);
+      }
+
+      @Override
+      protected void serviceStop() throws Exception {
+        workerContext.getWorkerSystemMetrics().stop();
+        super.serviceStop();
+      }
+    };
+
+    service.init(conf);
+    service.start();
+  }
+
+  @After
+  public void tearDown() {
+    service.stop();
+  }
+
+  /** Builds the RunExecutionBlockRequestProto shared by both tests for execution block {@code ebId}. */
+  private RunExecutionBlockRequestProto newExecutionBlockRequest(ExecutionBlockId ebId) {
+    WorkerConnectionInfo connectionInfo = workerContext.getConnectionInfo();
+    return RunExecutionBlockRequestProto.newBuilder()
+        .setExecutionBlockId(ebId.getProto())
+        .setQueryMaster(connectionInfo.getProto())
+        .setNodeId(connectionInfo.getHost() + ":" + connectionInfo.getQueryMasterPort())
+        .setContainerId("test")
+        .setQueryContext(new QueryContext(conf).getProto())
+        .setPlanJson("test")
+        .setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE)
+        .build();
+  }
+
+  @Test(timeout = 10000)
+  public void testExecutionBlockStart() throws Exception {
+    int requestSize = 1;
+    ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 1);
+
+    assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
+
+    BatchAllocationRequestProto allocation = BatchAllocationRequestProto.newBuilder()
+        .setExecutionBlockId(ebId.getProto())
+        .setExecutionBlockRequest(newExecutionBlockRequest(ebId))
+        .addAllTaskRequest(MockNodeResourceManager.createTaskRequests(ebId, taskMemory, requestSize))
+        .build();
+    CallFuture<BatchAllocationResponseProto> response = new CallFuture<BatchAllocationResponseProto>();
+    dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(allocation, response));
+
+    assertTrue(barrier.tryAcquire(3, TimeUnit.SECONDS));
+    assertNotNull(taskManager.getExecutionBlockContext(ebId));
+    assertEquals(ebId, taskManager.getExecutionBlockContext(ebId).getExecutionBlockId());
+  }
+
+  @Test(timeout = 10000)
+  public void testExecutionBlockStop() throws Exception {
+    ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 1);
+
+    // Start the block directly on the task dispatcher ...
+    taskDispatcher.getEventHandler().handle(new ExecutionBlockStartEvent(newExecutionBlockRequest(ebId)));
+    assertTrue(barrier.tryAcquire(3, TimeUnit.SECONDS));
+    assertNotNull(taskManager.getExecutionBlockContext(ebId));
+    assertEquals(ebId, taskManager.getExecutionBlockContext(ebId).getExecutionBlockId());
+
+    // ... then stop it and expect its context to be gone.
+    taskDispatcher.getEventHandler().handle(
+        new ExecutionBlockStopEvent(ebId.getProto(), ExecutionBlockListProto.newBuilder().build()));
+    assertTrue(barrier.tryAcquire(3, TimeUnit.SECONDS));
+    assertNull(taskManager.getExecutionBlockContext(ebId));
+  }
+}
[3/3] tajo git commit: TAJO-1615: Implement TaskManager. (jinho)
Posted by jh...@apache.org.
TAJO-1615: Implement TaskManager. (jinho)
Closes #595
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/36da0dac
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/36da0dac
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/36da0dac
Branch: refs/heads/master
Commit: 36da0dac7d90fa12b3d9cf11fa5b561a586e60ff
Parents: dfcf41d
Author: Jinho Kim <jh...@apache.org>
Authored: Mon Jun 8 16:52:05 2015 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Mon Jun 8 16:52:05 2015 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../physical/HashShuffleFileWriteExec.java | 2 +-
.../engine/planner/physical/PhysicalExec.java | 2 +-
.../tajo/master/rm/TajoResourceTracker.java | 4 +-
.../tajo/util/TajoUncaughtExceptionHandler.java | 70 ++
.../apache/tajo/util/history/HistoryWriter.java | 2 +-
.../tajo/worker/ExecutionBlockContext.java | 83 +-
.../org/apache/tajo/worker/LegacyTaskImpl.java | 844 +++++++++++++++++++
.../apache/tajo/worker/NodeResourceManager.java | 45 +-
.../apache/tajo/worker/NodeStatusUpdater.java | 34 +-
.../java/org/apache/tajo/worker/TajoWorker.java | 47 +-
.../tajo/worker/TajoWorkerManagerService.java | 9 +-
.../main/java/org/apache/tajo/worker/Task.java | 834 +-----------------
.../apache/tajo/worker/TaskAttemptContext.java | 61 +-
.../org/apache/tajo/worker/TaskContainer.java | 85 ++
.../org/apache/tajo/worker/TaskExecutor.java | 194 +++++
.../java/org/apache/tajo/worker/TaskImpl.java | 838 ++++++++++++++++++
.../org/apache/tajo/worker/TaskManager.java | 180 ++++
.../java/org/apache/tajo/worker/TaskRunner.java | 10 +-
.../apache/tajo/worker/TaskRunnerHistory.java | 1 +
.../apache/tajo/worker/TaskRunnerManager.java | 12 +-
.../worker/event/ExecutionBlockStartEvent.java | 35 +
.../worker/event/ExecutionBlockStopEvent.java | 37 +
.../worker/event/NodeResourceAllocateEvent.java | 2 +-
.../event/NodeResourceDeallocateEvent.java | 2 +-
.../tajo/worker/event/NodeResourceEvent.java | 35 +
.../worker/event/NodeResourceManagerEvent.java | 34 -
.../tajo/worker/event/NodeStatusEvent.java | 11 +-
.../tajo/worker/event/TaskExecutorEvent.java | 44 +
.../tajo/worker/event/TaskManagerEvent.java | 43 +
.../tajo/worker/event/TaskRunnerEvent.java | 1 +
.../tajo/worker/event/TaskRunnerStartEvent.java | 44 +-
.../tajo/worker/event/TaskRunnerStopEvent.java | 1 +
.../tajo/worker/event/TaskStartEvent.java | 44 +
.../src/main/proto/TajoWorkerProtocol.proto | 1 +
.../apache/tajo/querymaster/TestKillQuery.java | 135 ++-
.../apache/tajo/worker/MockExecutionBlock.java | 42 +
.../tajo/worker/MockNodeResourceManager.java | 96 +++
.../tajo/worker/MockNodeStatusUpdater.java | 4 +-
.../apache/tajo/worker/MockTaskExecutor.java | 141 ++++
.../org/apache/tajo/worker/MockTaskManager.java | 59 ++
.../apache/tajo/worker/MockWorkerContext.java | 129 +++
.../org/apache/tajo/worker/TestFetcher.java | 14 +-
.../tajo/worker/TestNodeResourceManager.java | 135 +--
.../tajo/worker/TestNodeStatusUpdater.java | 54 +-
.../apache/tajo/worker/TestTaskExecutor.java | 330 ++++++++
.../org/apache/tajo/worker/TestTaskManager.java | 185 ++++
47 files changed, 3904 insertions(+), 1113 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 44a8939..066e086 100644
--- a/CHANGES
+++ b/CHANGES
@@ -333,6 +333,8 @@ Release 0.11.0 - unreleased
SUB TASKS
+ TAJO-1615: Implement TaskManager. (jinho)
+
TAJO-1599: Implement NodeResourceManager and Status updater. (jinho)
TAJO-1613: Rename StorageManager to Tablespace. (hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
index f1e2fe5..1a92a7a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
@@ -86,7 +86,7 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec {
HashShuffleAppender appender = appenderMap.get(partId);
if (appender == null) {
appender = hashShuffleAppenderManager.getAppender(context.getConf(),
- context.getQueryId().getTaskId().getExecutionBlockId(), partId, meta, outSchema);
+ context.getTaskId().getTaskId().getExecutionBlockId(), partId, meta, outSchema);
appenderMap.put(partId, appender);
}
return appender;
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
index de14c9a..87a19a9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
@@ -79,7 +79,7 @@ public abstract class PhysicalExec implements SchemaObject {
}
+ // Builds a new Path of the form <query id>/<random UUID>; every call produces a different UUID component.
protected Path getExecutorTmpDir() {
- return new Path(context.getQueryId().getTaskId().getExecutionBlockId().getQueryId().toString(),
+ return new Path(context.getTaskId().getTaskId().getExecutionBlockId().getQueryId().toString(),
UUID.randomUUID().toString());
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
index af28886..2a18de7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
@@ -188,7 +188,9 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
public void nodeHeartbeat(RpcController controller, TajoResourceTrackerProtocol.NodeHeartbeatRequestProto request,
RpcCallback<TajoResourceTrackerProtocol.NodeHeartbeatResponseProto> done) {
//TODO implement with ResourceManager for scheduler
- throw new RuntimeException(new ServiceException(new NotImplementedException().getMessage()));
+ // Placeholder until scheduling is implemented: the request is not inspected and every heartbeat is
+ // answered with ResponseCommand.NORMAL (previously this threw a not-implemented RuntimeException).
+ TajoResourceTrackerProtocol.NodeHeartbeatResponseProto.Builder
+ response = TajoResourceTrackerProtocol.NodeHeartbeatResponseProto.newBuilder();
+ done.run(response.setCommand(TajoResourceTrackerProtocol.ResponseCommand.NORMAL).build());
}
private Worker createWorkerResource(NodeHeartbeat request) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/util/TajoUncaughtExceptionHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/TajoUncaughtExceptionHandler.java b/tajo-core/src/main/java/org/apache/tajo/util/TajoUncaughtExceptionHandler.java
new file mode 100644
index 0000000..c424154
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/TajoUncaughtExceptionHandler.java
@@ -0,0 +1,70 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tajo.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.ShutdownHookManager;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+
+/**
+ * Uncaught-exception handler intended to be installed by calling
+ * {@link Thread#setDefaultUncaughtExceptionHandler(UncaughtExceptionHandler)}
+ * in the main entry point, for programs built on the Yarn event framework.
+ *
+ * <p>Behavior:
+ * <ul>
+ *   <li>If a JVM shutdown is already in progress, the throwable is only logged.</li>
+ *   <li>An {@link OutOfMemoryError} halts the JVM immediately with status -1 via
+ *       {@link ExitUtil#halt(int)}, without attempting any cleanup.</li>
+ *   <li>Any other {@link Error} is logged as fatal, but the process is NOT terminated.</li>
+ *   <li>Any other exception is logged.</li>
+ * </ul>
+ *
+ * <p>This implementation is adapted from YarnUncaughtExceptionHandler.
+ */
+public class TajoUncaughtExceptionHandler implements UncaughtExceptionHandler {
+  private static final Log LOG = LogFactory.getLog(TajoUncaughtExceptionHandler.class);
+
+  @Override
+  public void uncaughtException(Thread t, Throwable e) {
+    if (ShutdownHookManager.get().isShutdownInProgress()) {
+      LOG.error("Thread " + t + " threw an Throwable, but we are shutting " +
+          "down, so ignoring this", e);
+    } else if (e instanceof Error) {
+      try {
+        LOG.fatal("Thread " + t + " threw an Error.", e);
+      } catch (Throwable ignored) {
+        // A logging failure must not prevent the handling below.
+      }
+
+      if (e instanceof OutOfMemoryError) {
+        // After an OOM the JVM's behavior is undefined, so don't attempt any cleanup
+        // (it could hang shutdown); halt immediately instead.
+        try {
+          System.err.println("Halting due to Out Of Memory Error...");
+        } catch (Throwable ignored) {
+          // Again, don't let an output failure prevent the halt.
+        }
+        ExitUtil.halt(-1);
+      }
+      // NOTE(review): other Errors are only logged here; calling ExitUtil.terminate(-1) is
+      // disabled in this handler -- confirm that keeping the process alive is intended.
+    } else {
+      LOG.error("Thread " + t + " threw an Exception.", e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
index e8ba304..daced3e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
@@ -217,7 +217,7 @@ public class HistoryWriter extends AbstractService {
public void run() {
LOG.info("HistoryWriter_" + processName + " started.");
SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHH");
- while (!stopped.get()) {
+ while (!stopped.get() && !Thread.interrupted()) {
List<WriterFuture<WriterHolder>> histories = Lists.newArrayList();
try {
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index 0cc3304..9e4a60f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.TaskId;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.QueryMasterProtocol;
@@ -45,7 +46,6 @@ import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
@@ -61,10 +61,10 @@ public class ExecutionBlockContext {
private static final Log LOG = LogFactory.getLog(ExecutionBlockContext.class);
private TaskRunnerManager manager;
- public AtomicInteger completedTasksNum = new AtomicInteger();
- public AtomicInteger succeededTasksNum = new AtomicInteger();
- public AtomicInteger killedTasksNum = new AtomicInteger();
- public AtomicInteger failedTasksNum = new AtomicInteger();
+ protected AtomicInteger completedTasksNum = new AtomicInteger();
+ protected AtomicInteger succeededTasksNum = new AtomicInteger();
+ protected AtomicInteger killedTasksNum = new AtomicInteger();
+ protected AtomicInteger failedTasksNum = new AtomicInteger();
private FileSystem localFS;
// for input files
@@ -95,17 +95,18 @@ public class ExecutionBlockContext {
// It keeps all of the query unit attempts while a TaskRunner is running.
private final ConcurrentMap<TaskAttemptId, Task> tasks = Maps.newConcurrentMap();
+ @Deprecated
private final ConcurrentMap<String, TaskRunnerHistory> histories = Maps.newConcurrentMap();
- public ExecutionBlockContext(TajoConf conf, TajoWorker.WorkerContext workerContext,
- TaskRunnerManager manager, QueryContext queryContext, String plan,
- ExecutionBlockId executionBlockId, WorkerConnectionInfo queryMaster,
- PlanProto.ShuffleType shuffleType) throws Throwable {
+ private final Map<TaskId, TaskHistory> taskHistories = Maps.newTreeMap();
+
+ public ExecutionBlockContext(TajoWorker.WorkerContext workerContext,
+ TaskRunnerManager manager, RunExecutionBlockRequestProto request) throws IOException {
this.manager = manager;
- this.executionBlockId = executionBlockId;
+ this.executionBlockId = new ExecutionBlockId(request.getExecutionBlockId());
this.connManager = RpcClientManager.getInstance();
- this.queryMaster = queryMaster;
- this.systemConf = conf;
+ this.queryMaster = new WorkerConnectionInfo(request.getQueryMaster());
+ this.systemConf = workerContext.getConf();
this.reporter = new Reporter();
this.defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf);
this.localFS = FileSystem.getLocal(systemConf);
@@ -113,11 +114,11 @@ public class ExecutionBlockContext {
// Setup QueryEngine according to the query plan
// Here, we can setup row-based query engine or columnar query engine.
this.queryEngine = new TajoQueryEngine(systemConf);
- this.queryContext = queryContext;
- this.plan = plan;
+ this.queryContext = new QueryContext(workerContext.getConf(), request.getQueryContext());
+ this.plan = request.getPlanJson();
this.resource = new ExecutionBlockSharedResource();
this.workerContext = workerContext;
- this.shuffleType = shuffleType;
+ this.shuffleType = request.getShuffleType();
}
public void init() throws Throwable {
@@ -131,7 +132,8 @@ public class ExecutionBlockContext {
UserGroupInformation.setConfiguration(systemConf);
// TODO - 'load credential' should be implemented
// Getting taskOwner
- UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(systemConf.getVar(TajoConf.ConfVars.USERNAME));
+ UserGroupInformation
+ taskOwner = UserGroupInformation.createRemoteUser(systemConf.getVar(TajoConf.ConfVars.USERNAME));
// initialize DFS and LocalFileSystems
this.taskOwner = taskOwner;
@@ -144,7 +146,7 @@ public class ExecutionBlockContext {
try {
getStub().killQuery(null, executionBlockId.getQueryId().getProto(), NullCallback.get());
} catch (Throwable t) {
- //ignore
+ LOG.error(t);
}
throw e;
}
@@ -183,9 +185,9 @@ public class ExecutionBlockContext {
// If ExecutionBlock is stopped, all running or pending tasks will be marked as failed.
for (Task task : tasks.values()) {
- if (task.getStatus() == TajoProtos.TaskAttemptState.TA_PENDING ||
- task.getStatus() == TajoProtos.TaskAttemptState.TA_RUNNING) {
- task.setState(TajoProtos.TaskAttemptState.TA_FAILED);
+ if (task.getTaskContext().getState() == TajoProtos.TaskAttemptState.TA_PENDING ||
+ task.getTaskContext().getState() == TajoProtos.TaskAttemptState.TA_RUNNING) {
+
try{
task.abort();
} catch (Throwable e){
@@ -194,7 +196,7 @@ public class ExecutionBlockContext {
}
}
tasks.clear();
-
+ taskHistories.clear();
resource.release();
RpcClientManager.cleanup(client);
}
@@ -253,18 +255,40 @@ public class ExecutionBlockContext {
return tasks.get(taskAttemptId);
}
+ @Deprecated
public void stopTaskRunner(String id){
manager.stopTaskRunner(id);
}
+ @Deprecated
public TaskRunner getTaskRunner(String taskRunnerId){
return manager.getTaskRunner(taskRunnerId);
}
+ @Deprecated
public void addTaskHistory(String taskRunnerId, TaskAttemptId quAttemptId, TaskHistory taskHistory) {
histories.get(taskRunnerId).addTaskHistory(quAttemptId, taskHistory);
}
+ /**
+ * Records the history of a finished task, keyed by its task id; an existing entry for the same
+ * task id is replaced.
+ * NOTE(review): taskHistories is created with Maps.newTreeMap() (not thread-safe); if tasks can
+ * complete on several threads concurrently this needs external synchronization - confirm callers.
+ */
+ public void addTaskHistory(TaskId taskId, TaskHistory taskHistory) {
+ taskHistories.put(taskId, taskHistory);
+ }
+
+ /**
+ * Returns the live, mutable map of task histories ordered by task id (it is a TreeMap).
+ * The map is cleared when this execution block is stopped, so callers should copy it if they
+ * need the entries afterwards.
+ */
+ public Map<TaskId, TaskHistory> getTaskHistories() {
+ return taskHistories;
+ }
+
+  /**
+   * Reports a fatal error of the given task attempt to the query master.
+   *
+   * @param taskAttemptId the failed task attempt
+   * @param message       error description; {@code "No error message"} is sent when it is null
+   */
+  public void fatalError(TaskAttemptId taskAttemptId, String message) {
+    String errorMessage = (message != null) ? message : "No error message";
+    TaskFatalErrorReport report = TaskFatalErrorReport.newBuilder()
+        .setId(taskAttemptId.getProto())
+        .setErrorMessage(errorMessage)
+        .build();
+    // The RPC result is ignored (NullCallback).
+    getStub().fatalError(null, report, NullCallback.get());
+  }
+
public TaskRunnerHistory createTaskRunnerHistory(TaskRunner runner){
histories.putIfAbsent(runner.getId(), new TaskRunnerHistory(runner.getContainerId(), executionBlockId));
return histories.get(runner.getId());
@@ -355,7 +379,6 @@ public class ExecutionBlockContext {
protected class Reporter {
private Thread reporterThread;
- private AtomicBoolean reporterStop = new AtomicBoolean();
private static final int PROGRESS_INTERVAL = 1000;
private static final int MAX_RETRIES = 10;
@@ -374,7 +397,7 @@ public class ExecutionBlockContext {
int remainingRetries = MAX_RETRIES;
@Override
public void run() {
- while (!reporterStop.get() && !Thread.interrupted()) {
+ while (!isStopped() && !Thread.interrupted()) {
try {
Interface masterStub = getStub();
@@ -384,13 +407,11 @@ public class ExecutionBlockContext {
} else {
for (Task task : new ArrayList<Task>(tasks.values())){
- if (task.isRunning() && task.isProgressChanged()) {
- task.updateProgress();
+ if (task.getTaskContext().getState() ==
+ TajoProtos.TaskAttemptState.TA_RUNNING && task.isProgressChanged()) {
masterStub.statusUpdate(null, task.getReport(), NullCallback.get());
- task.getContext().setProgressChanged(false);
- } else {
- task.updateProgress();
}
+ task.updateProgress();
}
}
} catch (Throwable t) {
@@ -402,7 +423,7 @@ public class ExecutionBlockContext {
throw new RuntimeException(t);
}
} finally {
- if (remainingRetries > 0 && !reporterStop.get()) {
+ if (remainingRetries > 0 && !isStopped()) {
synchronized (reporterThread) {
try {
reporterThread.wait(PROGRESS_INTERVAL);
@@ -417,10 +438,6 @@ public class ExecutionBlockContext {
}
public void stop() throws InterruptedException {
- if (reporterStop.getAndSet(true)) {
- return;
- }
-
if (reporterThread != null) {
// Intent of the lock is to not send an interupt in the middle of an
// umbilical.ping or umbilical.statusUpdate
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java
new file mode 100644
index 0000000..0721ef1
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java
@@ -0,0 +1,844 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import io.netty.handler.codec.http.QueryStringDecoder;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TajoProtos.TaskAttemptState;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.physical.PhysicalExec;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.engine.query.TaskRequest;
+import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol.*;
+import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.plan.function.python.TajoScriptEngine;
+import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.serder.LogicalNodeDeserializer;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.pullserver.TajoPullServerService;
+import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.NetUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URI;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
+
+@Deprecated
+public class LegacyTaskImpl implements Task {
+ private static final Log LOG = LogFactory.getLog(LegacyTaskImpl.class);
+ // Share of the overall task progress attributed to the fetch phase (used by adjustFetchProcess and run()).
+ private static final float FETCHER_PROGRESS = 0.5f;
+
+ private final TajoConf systemConf;
+ private final QueryContext queryContext;
+ private final ExecutionBlockContext executionBlockContext;
+ // Id of the legacy TaskRunner owning this task; used to look up its fetch launcher and history.
+ private final String taskRunnerId;
+
+ // Local per-attempt directory: <baseDir>/<task id>_<attempt id>.
+ private final Path taskDir;
+ private final TaskRequest request;
+ private TaskAttemptContext context;
+ // Fetchers for intermediate data; filled in init(), released in cleanup().
+ private List<Fetcher> fetcherRunners;
+ private LogicalNode plan;
+ // Table descriptors of the plan's scan / partitioned-scan nodes, keyed by canonical name.
+ private final Map<String, TableDesc> descs = Maps.newHashMap();
+ // Physical executor; created in run() and closed and reset to null when run() finishes.
+ private PhysicalExec executor;
+ private boolean interQuery;
+ // Local base directory for fetched input tables (only set when the task has fetches).
+ private Path inputTableBaseDir;
+
+ private long startTime;
+ private long finishTime;
+
+ // Accumulated input statistics; also the monitor guarding reloadInputStats().
+ private final TableStats inputStats;
+ // Chunks read in place from this worker instead of being fetched (synchronized list, set in initPlan()).
+ private List<FileChunk> localChunks;
+
+ // TODO - to be refactored
+ // Set in initPlan() for inter-query tasks; finalSchema and sortComp only for range shuffle.
+ // NOTE(review): finalSchema and sortComp are not read anywhere in the visible part of this class.
+ private ShuffleType shuffleType = null;
+ private Schema finalSchema = null;
+ private TupleComparator sortComp = null;
+
+  /**
+   * Creates a task attempt that uses the system configuration of its execution block.
+   *
+   * @throws IOException if the task cannot be set up
+   */
+  public LegacyTaskImpl(String taskRunnerId,
+                        Path baseDir,
+                        TaskAttemptId taskId,
+                        final ExecutionBlockContext executionBlockContext,
+                        final TaskRequest request) throws IOException {
+    this(taskRunnerId, baseDir, taskId,
+        executionBlockContext.getConf(), executionBlockContext, request);
+  }
+
+  /**
+   * Creates a task attempt in the {@code TA_PENDING} state.
+   *
+   * @param taskRunnerId          id of the legacy task runner owning this task
+   * @param baseDir               directory under which the per-attempt directory is placed
+   * @param taskId                the task attempt id
+   * @param conf                  system configuration
+   * @param executionBlockContext execution block this task belongs to
+   * @param request               request carrying the plan, fragments, data channel and enforcer
+   * @throws IOException if the task cannot be set up
+   */
+  public LegacyTaskImpl(String taskRunnerId,
+                        Path baseDir,
+                        TaskAttemptId taskId,
+                        TajoConf conf,
+                        final ExecutionBlockContext executionBlockContext,
+                        final TaskRequest request) throws IOException {
+    this.taskRunnerId = taskRunnerId;
+    this.request = request;
+    this.systemConf = conf;
+    this.queryContext = request.getQueryContext(systemConf);
+    this.executionBlockContext = executionBlockContext;
+
+    // One directory per attempt: <baseDir>/<task id>_<attempt id>
+    String attemptDirName = taskId.getTaskId().getId() + "_" + taskId.getId();
+    this.taskDir = StorageUtil.concatPath(baseDir, attemptDirName);
+
+    FragmentProto[] fragmentArray =
+        request.getFragments().toArray(new FragmentProto[request.getFragments().size()]);
+    TaskAttemptContext attemptContext =
+        new TaskAttemptContext(queryContext, executionBlockContext, taskId, fragmentArray, taskDir);
+    this.context = attemptContext;
+    attemptContext.setDataChannel(request.getDataChannel());
+    attemptContext.setEnforcer(request.getEnforcer());
+    attemptContext.setState(TaskAttemptState.TA_PENDING);
+
+    this.inputStats = new TableStats();
+    this.fetcherRunners = Lists.newArrayList();
+  }
+
+  /**
+   * Deserializes the logical plan of this task and derives the task-level state from it.
+   *
+   * <p>Collects the table descriptors of all scan and partitioned-scan nodes. It then configures
+   * the shuffle (inter-query task) or resolves the output file path (final stage).
+   *
+   * @throws IOException if the output file path cannot be resolved
+   * @throws IllegalStateException if a range-shuffle plan has no sort node to derive its keys from
+   */
+  public void initPlan() throws IOException {
+    plan = LogicalNodeDeserializer.deserialize(queryContext, context.getEvalContext(), request.getPlan());
+    registerScanDescs(PlannerUtil.findAllNodes(plan, NodeType.SCAN));
+    registerScanDescs(PlannerUtil.findAllNodes(plan, NodeType.PARTITIONS_SCAN));
+
+    interQuery = request.getProto().getInterQuery();
+    if (interQuery) {
+      context.setInterQuery();
+      this.shuffleType = context.getDataChannel().getShuffleType();
+
+      if (shuffleType == ShuffleType.RANGE_SHUFFLE) {
+        SortNode sortNode = PlannerUtil.findTopNode(plan, NodeType.SORT);
+        if (sortNode == null) {
+          // Fail with a descriptive error instead of a NullPointerException on getSortKeys().
+          throw new IllegalStateException(
+              "Range shuffle requires a sort node, but the plan of " + getId() + " has none");
+        }
+        this.finalSchema = PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys());
+        this.sortComp = new BaseTupleComparator(finalSchema, sortNode.getSortKeys());
+      }
+    } else {
+      Path outFilePath = ((FileTablespace) TableSpaceManager.getFileStorageManager(systemConf))
+          .getAppenderFilePath(getId(), queryContext.getStagingDir());
+      LOG.info("Output File Path: " + outFilePath);
+      context.setOutputPath(outFilePath);
+    }
+
+    this.localChunks = Collections.synchronizedList(new ArrayList<FileChunk>());
+    logInitialization();
+  }
+
+  /** Registers the table descriptor of each given (partitioned) scan node under its canonical name. */
+  private void registerScanDescs(LogicalNode[] scanNodes) {
+    if (scanNodes == null) {
+      return;
+    }
+    for (LogicalNode node : scanNodes) {
+      ScanNode scan = (ScanNode) node;
+      descs.put(scan.getCanonicalName(), scan.getTableDesc());
+    }
+  }
+
+  /** Logs a summary of the initialized task; fetch URIs and the plan only at debug level. */
+  private void logInitialization() {
+    LOG.info("==================================");
+    LOG.info("* Stage " + request.getId() + " is initialized");
+    LOG.info("* InterQuery: " + interQuery
+        + (interQuery ? ", Use " + this.shuffleType + " shuffle" : "") +
+        ", Fragments (num: " + request.getFragments().size() + ")" +
+        ", Fetches (total:" + request.getFetches().size() + ") :");
+
+    if (LOG.isDebugEnabled()) {
+      for (FetchImpl f : request.getFetches()) {
+        LOG.debug("Table Id: " + f.getName() + ", Simple URIs: " + f.getSimpleURIs());
+      }
+    }
+    LOG.info("* Local task dir: " + taskDir);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("* plan:\n");
+      LOG.debug(plan.toString());
+    }
+    LOG.info("==================================");
+  }
+
+  /** Starts every script engine registered in this task's evaluation context. */
+  private void startScriptExecutors() throws IOException {
+    for (TajoScriptEngine engine : context.getEvalContext().getAllScriptEngines()) {
+      engine.start(systemConf);
+    }
+  }
+
+  /** Shuts down every script engine registered in this task's evaluation context. */
+  private void stopScriptExecutors() {
+    for (TajoScriptEngine engine : context.getEvalContext().getAllScriptEngines()) {
+      engine.shutdown();
+    }
+  }
+
+  /**
+   * Initializes the plan and script engines. For a task that is still pending, it also creates the
+   * local task directory, the per-table input directories (when there are fetches) and the fetchers.
+   */
+  @Override
+  public void init() throws IOException {
+    initPlan();
+    startScriptExecutors();
+
+    if (context.getState() != TaskAttemptState.TA_PENDING) {
+      return;
+    }
+
+    // initialize a task temporal dir
+    FileSystem localFS = executionBlockContext.getLocalFS();
+    localFS.mkdirs(taskDir);
+
+    if (!request.getFetches().isEmpty()) {
+      String attemptDir = getTaskAttemptDir(context.getTaskId()).toString();
+      inputTableBaseDir = localFS.makeQualified(
+          executionBlockContext.getLocalDirAllocator().getLocalPathForWrite(attemptDir, systemConf));
+      localFS.mkdirs(inputTableBaseDir);
+
+      for (String inputTable : context.getInputTables()) {
+        Path tableDir = new Path(inputTableBaseDir, inputTable);
+        if (!localFS.exists(tableDir)) {
+          LOG.info("the directory is created " + tableDir.toUri());
+          localFS.mkdirs(tableDir);
+        }
+      }
+    }
+
+    // Fetchers localize the intermediate data this task reads.
+    fetcherRunners.addAll(getFetchRunners(context, request.getFetches()));
+  }
+
+  /** Returns the id of the task attempt this task executes. */
+  private TaskAttemptId getId() {
+    return this.context.getTaskId();
+  }
+
+  /** Returns a short description with the task attempt id and its current state. */
+  @Override
+  public String toString() {
+    return new StringBuilder("queryId: ")
+        .append(getId())
+        .append(" status: ")
+        .append(context.getState())
+        .toString();
+  }
+
+  /** Returns whether the attempt context of this task has been stopped. */
+  @Override
+  public boolean isStopped() {
+    return this.context.isStopped();
+  }
+
+  /** Returns the attempt context of this task. */
+  @Override
+  public TaskAttemptContext getTaskContext() {
+    return this.context;
+  }
+
+  /** Returns the execution block this task belongs to. */
+  @Override
+  public ExecutionBlockContext getExecutionBlockContext() {
+    return this.executionBlockContext;
+  }
+
+  /** Returns whether this task has at least one fetcher to run. */
+  @Override
+  public boolean hasFetchPhase() {
+    return !fetcherRunners.isEmpty();
+  }
+
+  /** Submits every fetcher of this task to the fetch launcher of the owning task runner. */
+  @Override
+  public void fetch() {
+    TaskRunner owner = executionBlockContext.getTaskRunner(taskRunnerId);
+    ExecutorService launcher = owner.getFetchLauncher();
+    for (Fetcher fetcher : fetcherRunners) {
+      launcher.submit(new FetchRunner(context, fetcher));
+    }
+  }
+
+  /** Kills this task; it ends in the {@code TA_KILLED} state. */
+  @Override
+  public void kill() {
+    stopWithState(TaskAttemptState.TA_KILLED);
+  }
+
+  /** Aborts this task; it ends in the {@code TA_FAILED} state. */
+  @Override
+  public void abort() {
+    stopWithState(TaskAttemptState.TA_FAILED);
+  }
+
+  /** Stops the script engines, records {@code finalState} and stops the context, in that order. */
+  private void stopWithState(TaskAttemptState finalState) {
+    stopScriptExecutors();
+    context.setState(finalState);
+    context.stop();
+  }
+
+  /**
+   * Builds a status report with the worker name, attempt id, progress, state, refreshed input
+   * statistics and, when available, the result statistics.
+   */
+  @Override
+  public TaskStatusProto getReport() {
+    TaskStatusProto.Builder builder = TaskStatusProto.newBuilder()
+        .setWorkerName(executionBlockContext.getWorkerContext().getConnectionInfo().getHostAndPeerRpcPort())
+        .setId(context.getTaskId().getProto())
+        .setProgress(context.getProgress())
+        .setState(context.getState())
+        .setInputStats(reloadInputStats());
+
+    if (context.getResultStats() == null) {
+      return builder.build();
+    }
+    return builder.setResultStats(context.getResultStats().getProto()).build();
+  }
+
+  /** Returns whether the attempt context reports a progress change. */
+  @Override
+  public boolean isProgressChanged() {
+    return this.context.isProgressChanged();
+  }
+
+  /**
+   * Copies the executor's progress into the context. This is skipped when the context is stopped,
+   * no executor exists, or the progress has already reached 1.0.
+   */
+  @Override
+  public void updateProgress() {
+    boolean stopped = context != null && context.isStopped();
+    if (!stopped && executor != null && context.getProgress() < 1.0f) {
+      context.setExecutorProgress(executor.getProgress());
+    }
+  }
+
+  /**
+   * Refreshes the accumulated input statistics from the running executor, if any, and returns
+   * them as a proto. Guarded by {@code inputStats}.
+   */
+  private CatalogProtos.TableStatsProto reloadInputStats() {
+    synchronized (inputStats) {
+      if (this.executor != null) {
+        TableStats latest = this.executor.getInputStats();
+        if (latest != null) {
+          inputStats.setValues(latest);
+        }
+      }
+      return inputStats.getProto();
+    }
+  }
+
+  /**
+   * Builds the completion report sent to the query master when the task succeeds. It contains the
+   * attempt id, input statistics, result statistics (empty ones when none exist) and one shuffle
+   * output entry per partition, including the partition's output volume when known.
+   */
+  private TaskCompletionReport getTaskCompletionReport() {
+    TaskCompletionReport.Builder builder = TaskCompletionReport.newBuilder();
+    builder.setId(context.getTaskId().getProto());
+
+    builder.setInputStats(reloadInputStats());
+
+    if (context.hasResultStats()) {
+      builder.setResultStats(context.getResultStats().getProto());
+    } else {
+      builder.setResultStats(new TableStats().getProto());
+    }
+
+    // Fetched once and looked up by partition id, instead of scanning every entry per partition.
+    Map<Integer, Long> outputVolumes = context.getPartitionOutputVolume();
+    Iterator<Entry<Integer, String>> it = context.getShuffleFileOutputs();
+    while (it.hasNext()) {
+      Entry<Integer, String> entry = it.next();
+      ShuffleFileOutput.Builder part = ShuffleFileOutput.newBuilder();
+      part.setPartId(entry.getKey());
+
+      // Set output volume
+      if (outputVolumes != null) {
+        Long volume = outputVolumes.get(entry.getKey());
+        if (volume != null) {
+          part.setVolume(volume);
+        }
+      }
+
+      builder.addShuffleFileOutputs(part.build());
+    }
+
+    return builder.build();
+  }
+
+  /**
+   * Blocks until all fetchers have released the fetch latch. Then, for every input table except
+   * broadcast tables, it replaces the assigned fragments with fragments over the fetched local data.
+   */
+  private void waitForFetch() throws InterruptedException, IOException {
+    context.getFetchLatch().await();
+    LOG.info(context.getTaskId() + " All fetches are done!");
+    Collection<String> inputs = Lists.newArrayList(context.getInputTables());
+
+    // Broadcast tables are not localized here.
+    List<EnforceProperty> broadcasts = context.getEnforcer().getEnforceProperties(EnforceType.BROADCAST);
+    Set<String> broadcastTableNames = new HashSet<String>();
+    if (broadcasts != null) {
+      for (EnforceProperty broadcast : broadcasts) {
+        broadcastTableNames.add(broadcast.getBroadcast().getTableName());
+      }
+    }
+
+    for (String inputTable : inputs) {
+      if (!broadcastTableNames.contains(inputTable)) {
+        File tableDir = new File(context.getFetchIn(), inputTable);
+        TableMeta meta = descs.get(inputTable).getMeta();
+        FileFragment[] fragments = localizeFetchedData(tableDir, inputTable, meta);
+        context.updateAssignedFragments(inputTable, fragments);
+      }
+    }
+  }
+
+ /**
+ * Executes this task attempt: waits for the fetch phase (if any), builds and drains the physical
+ * executor, then reports the outcome to the query master and updates the execution block's counters.
+ *
+ * Outcome reporting in the finally block:
+ * - context stopped and state TA_KILLED: a status update is sent and killedTasksNum is incremented;
+ * - context stopped otherwise (including a stop by a failed fetcher, where error stays null):
+ *   the state becomes TA_FAILED and a fatal error report is sent;
+ * - context not stopped: the task is marked TA_SUCCEEDED and a completion report is sent.
+ */
+ @Override
+ public void run() throws Exception {
+ startTime = System.currentTimeMillis();
+ Throwable error = null;
+ try {
+ if(!context.isStopped()) {
+ context.setState(TaskAttemptState.TA_RUNNING);
+ if (context.hasFetchPhase()) {
+ // If the fetch is still in progress, the query unit must wait for
+ // complete.
+ waitForFetch();
+ context.setFetcherProgress(FETCHER_PROGRESS);
+ updateProgress();
+ }
+
+ this.executor = executionBlockContext.getTQueryEngine().
+ createPlan(context, plan);
+ this.executor.init();
+
+ // Drain the executor; all work happens inside next(). Stops early if the context is stopped.
+ while(!context.isStopped() && executor.next() != null) {
+ }
+ }
+ } catch (Throwable e) {
+ // NOTE(review): an InterruptedException from waitForFetch() also lands here and the thread's
+ // interrupt status is not restored - confirm whether callers rely on it.
+ error = e ;
+ LOG.error(e.getMessage(), e);
+ stopScriptExecutors();
+ context.stop();
+ } finally {
+ if (executor != null) {
+ try {
+ executor.close();
+ // Capture the final input statistics before the executor reference is dropped.
+ reloadInputStats();
+ } catch (IOException e) {
+ LOG.error(e, e);
+ }
+ this.executor = null;
+ }
+
+ executionBlockContext.completedTasksNum.incrementAndGet();
+ context.getHashShuffleAppenderManager().finalizeTask(getId());
+
+ QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = executionBlockContext.getStub();
+ if (context.isStopped()) {
+ context.setExecutorProgress(0.0f);
+
+ if (context.getState() == TaskAttemptState.TA_KILLED) {
+ queryMasterStub.statusUpdate(null, getReport(), NullCallback.get());
+ executionBlockContext.killedTasksNum.incrementAndGet();
+ } else {
+ // Any non-kill stop is reported as a failure; message/trace are only present if an exception was caught.
+ context.setState(TaskAttemptState.TA_FAILED);
+ TaskFatalErrorReport.Builder errorBuilder =
+ TaskFatalErrorReport.newBuilder()
+ .setId(getId().getProto());
+ if (error != null) {
+ if (error.getMessage() == null) {
+ errorBuilder.setErrorMessage(error.getClass().getCanonicalName());
+ } else {
+ errorBuilder.setErrorMessage(error.getMessage());
+ }
+ errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error));
+ }
+
+ queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get());
+ executionBlockContext.failedTasksNum.incrementAndGet();
+ }
+ } else {
+ // if successful
+ context.stop();
+ context.setProgress(1.0f);
+ context.setState(TaskAttemptState.TA_SUCCEEDED);
+ executionBlockContext.succeededTasksNum.incrementAndGet();
+
+ TaskCompletionReport report = getTaskCompletionReport();
+ queryMasterStub.done(null, report, NullCallback.get());
+ }
+ finishTime = System.currentTimeMillis();
+ LOG.info(context.getTaskId() + " completed. " +
+ "Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() +
+ ", succeeded: " + executionBlockContext.succeededTasksNum.intValue()
+ + ", killed: " + executionBlockContext.killedTasksNum.intValue()
+ + ", failed: " + executionBlockContext.failedTasksNum.intValue());
+ }
+ }
+
+  /**
+   * Releases this task after it has finished. It records the task history (in the execution block
+   * and in the worker's history writer), unregisters the task from its execution block, closes the
+   * executor if still open and stops the script engines.
+   *
+   * <p>The fetcher list is cleared rather than set to null. This keeps {@link #hasFetchPhase()},
+   * {@code fetcherFinished} and a repeated call safe from a NullPointerException on the list.
+   */
+  @Override
+  public void cleanup() {
+    TaskHistory taskHistory = createTaskHistory();
+    executionBlockContext.addTaskHistory(taskRunnerId, getId(), taskHistory);
+    executionBlockContext.getTasks().remove(getId());
+
+    if (fetcherRunners != null) {
+      fetcherRunners.clear();
+    }
+
+    try {
+      if (executor != null) {
+        executor.close();
+        executor = null;
+      }
+    } catch (IOException e) {
+      LOG.fatal(e.getMessage(), e);
+    }
+
+    executionBlockContext.getWorkerContext().getTaskHistoryWriter().appendHistory(taskHistory);
+    stopScriptExecutors();
+  }
+
+  /**
+   * Builds a history record of this task: state, progress, timing, statistics, paths and fetch
+   * counts. Per-fetcher details are recorded only when {@code $DEBUG_ENABLED} is set. Any
+   * exception is logged, and the partially built (possibly null) history is returned.
+   */
+  public TaskHistory createTaskHistory() {
+    TaskHistory history = null;
+    try {
+      history = new TaskHistory(context.getTaskId(), context.getState(), context.getProgress(),
+          startTime, finishTime, reloadInputStats());
+
+      if (context.getOutputPath() != null) {
+        history.setOutputPath(context.getOutputPath().toString());
+      }
+      if (context.getWorkDir() != null) {
+        history.setWorkingPath(context.getWorkDir().toString());
+      }
+      if (context.getResultStats() != null) {
+        history.setOutputStats(context.getResultStats().getProto());
+      }
+
+      if (hasFetchPhase()) {
+        history.setTotalFetchCount(fetcherRunners.size());
+        boolean recordFetcherDetails = systemConf.getBoolVar(TajoConf.ConfVars.$DEBUG_ENABLED);
+        int finishedFetches = 0;
+        for (Fetcher fetcher : fetcherRunners) {
+          // TODO store the fetcher histories
+          if (recordFetcherDetails) {
+            FetcherHistoryProto fetcherHistory = FetcherHistoryProto.newBuilder()
+                .setStartTime(fetcher.getStartTime())
+                .setFinishTime(fetcher.getFinishTime())
+                .setFileLength(fetcher.getFileLen())
+                .setMessageReceivedCount(fetcher.getMessageReceiveCount())
+                .setState(fetcher.getState())
+                .build();
+            history.addFetcherHistory(fetcherHistory);
+          }
+          if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) {
+            finishedFetches++;
+          }
+        }
+        history.setFinishedFetchCount(finishedFetches);
+      }
+    } catch (Exception e) {
+      LOG.warn(e.getMessage(), e);
+    }
+
+    return history;
+  }
+
+  /** Hashes by the attempt context, consistent with {@link #equals(Object)}. */
+  @Override
+  public int hashCode() {
+    return context.hashCode();
+  }
+
+  /** Two tasks are equal when their attempt contexts are equal. */
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof LegacyTaskImpl)) {
+      return false;
+    }
+    LegacyTaskImpl that = (LegacyTaskImpl) obj;
+    return context.equals(that.context);
+  }
+
+  /**
+   * Builds fragments for one input table from the local file system. It produces one fragment per
+   * non-empty file under {@code file}, plus one per chunk of this table that is read in place
+   * ({@code localChunks}). The {@code meta} parameter is not used.
+   */
+  private FileFragment[] localizeFetchedData(File file, String name, TableMeta meta)
+      throws IOException {
+    // Resolve against the local file system regardless of the configured default file system.
+    Configuration localConf = new Configuration(systemConf);
+    localConf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "file:///");
+    FileSystem fs = FileSystem.get(localConf);
+    Path tablePath = new Path(file.getAbsolutePath());
+
+    List<FileFragment> fragments = new ArrayList<FileFragment>();
+    for (FileStatus status : fs.listStatus(tablePath)) {
+      if (status.getLen() != 0) {
+        fragments.add(new FileFragment(name, status.getPath(), 0L, status.getLen()));
+      }
+    }
+
+    // Special treatment for locally pseudo fetched chunks
+    synchronized (localChunks) {
+      for (FileChunk chunk : localChunks) {
+        if (!name.equals(chunk.getEbId())) {
+          continue;
+        }
+        Path chunkPath = new Path(chunk.getFile().getPath());
+        fragments.add(new FileFragment(name, chunkPath, chunk.startOffset(), chunk.length()));
+        LOG.info("One local chunk is added to listTablets");
+      }
+    }
+
+    return fragments.toArray(new FileFragment[fragments.size()]);
+  }
+
+  /**
+   * Runs one {@link Fetcher}, retrying up to {@code SHUFFLE_FETCHER_READ_RETRY_MAX_NUM} times.
+   * The back-off starts at 1 second, doubles on each retry and is capped at 10 seconds.
+   *
+   * <p>On success the fetch latch is counted down through {@code fetcherFinished}. On failure
+   * (retries exhausted, task stopped, or thread interrupted) the task is stopped and the latch is
+   * still counted down, so the waiting task is not blocked forever.
+   */
+  private class FetchRunner implements Runnable {
+    private final TaskAttemptContext ctx;
+    private final Fetcher fetcher;
+    private final int maxRetryNum;
+
+    public FetchRunner(TaskAttemptContext ctx, Fetcher fetcher) {
+      this.ctx = ctx;
+      this.fetcher = fetcher;
+      this.maxRetryNum = systemConf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_RETRY_MAX_NUM);
+    }
+
+    @Override
+    public void run() {
+      int retryNum = 0;
+      int retryWaitTimeMs = 1000; // milliseconds
+
+      try { // for releasing fetch latch
+        while (!context.isStopped() && retryNum < maxRetryNum) {
+          if (retryNum > 0) {
+            try {
+              Thread.sleep(retryWaitTimeMs);
+              retryWaitTimeMs = Math.min(10 * 1000, retryWaitTimeMs * 2); // max 10 seconds
+            } catch (InterruptedException e) {
+              // Keep the interrupt visible to the executor and stop retrying; the finally block
+              // then fails the task and releases the latch.
+              Thread.currentThread().interrupt();
+              LOG.warn("Fetch is interrupted: " + fetcher.getURI());
+              break;
+            }
+            LOG.warn("Retry on the fetch: " + fetcher.getURI() + " (" + retryNum + ")");
+          }
+          try {
+            FileChunk fetched = fetcher.get();
+            if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched != null
+                && fetched.getFile() != null) {
+              if (!fetched.fromRemote()) {
+                localChunks.add(fetched);
+                LOG.info("Add a new FileChunk to local chunk list");
+              }
+              break;
+            }
+          } catch (Throwable e) {
+            LOG.error("Fetch failed: " + fetcher.getURI(), e);
+          }
+          retryNum++;
+        }
+      } finally {
+        if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) {
+          fetcherFinished(ctx);
+        } else {
+          if (retryNum == maxRetryNum) {
+            LOG.error("ERROR: the maximum retry (" + retryNum + ") on the fetch exceeded (" + fetcher.getURI() + ")");
+          }
+          stopScriptExecutors();
+          context.stop(); // retry task
+          ctx.getFetchLatch().countDown();
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns the fetch-phase progress: the finished share of the fetchers scaled by
+   * {@code FETCHER_PROGRESS}, or 0 when there are no fetchers.
+   */
+  @VisibleForTesting
+  public static float adjustFetchProcess(int totalFetcher, int remainFetcher) {
+    if (totalFetcher <= 0) {
+      return 0.0f;
+    }
+    float finishedRatio = (totalFetcher - remainFetcher) / (float) totalFetcher;
+    return finishedRatio * FETCHER_PROGRESS;
+  }
+
+  /**
+   * Counts down the fetch latch for one successful fetcher and updates the fetch-phase progress.
+   * Does nothing when the task has no fetchers.
+   */
+  private synchronized void fetcherFinished(TaskAttemptContext ctx) {
+    int totalFetchers = fetcherRunners.size();
+    if (totalFetchers == 0) {
+      return;
+    }
+
+    ctx.getFetchLatch().countDown();
+    int remaining = (int) ctx.getFetchLatch().getCount();
+    float fetchProgress = (remaining == 0)
+        ? FETCHER_PROGRESS
+        : adjustFetchProcess(totalFetchers, remaining);
+    context.setFetcherProgress(fetchProgress);
+  }
+
+  /**
+   * Creates one {@link Fetcher} per fetch URI and registers the fetch phase on {@code ctx}.
+   *
+   * <p>If a URI points to this worker's own pull server, the chunk is first resolved locally:
+   * <ul>
+   *   <li>a null chunk without error means the range is empty, so no fetcher is created;</li>
+   *   <li>a valid local chunk is read in place ({@code fromRemote = false});</li>
+   *   <li>otherwise the data is fetched over the network into {@code <inputDir>/<name>/in_<i>}.</li>
+   * </ul>
+   *
+   * @return the fetchers, or an empty list when there is nothing to fetch
+   */
+  private List<Fetcher> getFetchRunners(TaskAttemptContext ctx,
+                                        List<FetchImpl> fetches) throws IOException {
+    if (fetches.isEmpty()) {
+      return Lists.newArrayList();
+    }
+
+    Path inputDir = executionBlockContext.getLocalDirAllocator().
+        getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf);
+    // This worker's connection info does not depend on the URI, so look it up once.
+    WorkerConnectionInfo conn = executionBlockContext.getWorkerContext().getConnectionInfo();
+
+    int i = 0;
+    List<Fetcher> runnerList = Lists.newArrayList();
+    for (FetchImpl f : fetches) {
+      File storeDir = new File(inputDir.toString(), f.getName());
+      if (!storeDir.exists() && !storeDir.mkdirs()) {
+        LOG.warn("Failed to create the fetch directory " + storeDir);
+      }
+
+      for (URI uri : f.getURIs()) {
+        File defaultStoreFile = new File(storeDir, "in_" + i);
+        InetAddress address = InetAddress.getByName(uri.getHost());
+
+        // Scoped per URI so a chunk from a previous iteration can never leak into this one.
+        FileChunk storeChunk = null;
+        if (NetUtils.isLocalAddress(address) && conn.getPullServerPort() == uri.getPort()) {
+          FileChunk localChunk = null;
+          boolean hasError = false;
+          try {
+            LOG.info("Try to get local file chunk at local host");
+            localChunk = getLocalStoredFileChunk(uri, systemConf);
+          } catch (Throwable t) {
+            hasError = true;
+            LOG.warn("Failed to get a local file chunk for " + uri + "; fetching it remotely instead", t);
+          }
+
+          // When a range request is out of range, the chunk is null. This is a normal state,
+          // so no fetcher is needed for this URI.
+          if (localChunk == null && !hasError) {
+            continue;
+          }
+
+          if (!hasError && localChunk.getFile() != null && localChunk.startOffset() > -1) {
+            localChunk.setFromRemote(false);
+            storeChunk = localChunk;
+          }
+        }
+
+        if (storeChunk == null) {
+          // Fetch the whole intermediate file from the pull server.
+          storeChunk = new FileChunk(defaultStoreFile, 0, -1);
+          storeChunk.setFromRemote(true);
+        }
+
+        // If we decide that intermediate data should be really fetched from a remote host, storeChunk
+        // represents a complete file. Otherwise, storeChunk may represent a complete file or only a part of it
+        storeChunk.setEbId(f.getName());
+        Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk);
+        LOG.info("Create a new Fetcher with storeChunk:" + storeChunk.toString());
+        runnerList.add(fetcher);
+        i++;
+      }
+    }
+    ctx.addFetchPhase(runnerList.size(), new File(inputDir.toString()));
+    return runnerList;
+  }
+
+  /**
+   * Tries to resolve the intermediate data referenced by a pull-server fetch URI directly from this
+   * worker's local disk instead of going through the pull server.
+   *
+   * <p>The URI query string must carry {@code qid}, {@code type}, {@code sid} and {@code p}. For a
+   * range shuffle ({@code type=r}) it must also carry {@code ta}, {@code start} and {@code end}
+   * (plus an optional {@code final}). For a hash or scattered hash shuffle ({@code type=h} or
+   * {@code type=s}) it may carry {@code offset} and {@code length}.
+   *
+   * @param fetchURI the fetch URI that would otherwise be sent to the pull server
+   * @param conf the system configuration used to resolve local directories
+   * @return the local file chunk, or {@code null} in any of these cases:
+   *         the URI is invalid; the requested output does not exist locally;
+   *         {@code getFileChunks} reports no data for the requested range;
+   *         the start position is past the end of the hash partition file;
+   *         or the shuffle type is unknown
+   * @throws IOException if a local path cannot be resolved, or if the range-shuffle chunk lookup
+   *         fails. The lookup failure is rethrown rather than mapped to {@code null} so that it is
+   *         not mistaken for the "nothing to fetch" case.
+   */
+  private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException {
+    // Parse the URI
+    LOG.info("getLocalStoredFileChunk starts");
+    final Map<String, List<String>> params = new QueryStringDecoder(fetchURI.toString()).parameters();
+    final List<String> types = params.get("type");
+    final List<String> qids = params.get("qid");
+    final List<String> taskIdList = params.get("ta");
+    final List<String> stageIds = params.get("sid");
+    final List<String> partIds = params.get("p");
+    final List<String> offsetList = params.get("offset");
+    final List<String> lengthList = params.get("length");
+
+    if (types == null || stageIds == null || qids == null || partIds == null) {
+      LOG.error("Invalid URI - Required queryId, type, stage Id, and part id");
+      return null;
+    }
+
+    // Each of these parameters must be single-valued. All checks are OR-ed so that ANY
+    // multi-valued parameter is rejected (mixing && and || here would let a bad qid/type through).
+    if (qids.size() != 1 || types.size() != 1 || stageIds.size() != 1) {
+      LOG.error("Invalid URI - Required qids, type, taskIds, stage Id, and part id");
+      return null;
+    }
+
+    String queryId = qids.get(0);
+    String shuffleType = types.get(0);
+    String sid = stageIds.get(0);
+    String partId = partIds.get(0);
+
+    if (shuffleType.equals("r") && taskIdList == null) {
+      LOG.error("Invalid URI - For range shuffle, taskId is required");
+      return null;
+    }
+    List<String> taskIds = splitMaps(taskIdList);
+
+    FileChunk chunk = null;
+    // offset/length are optional; -1 marks "not supplied"
+    long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L;
+    long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L;
+
+    LOG.info("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId
+        + ", taskIds=" + taskIdList);
+
+    // The working directory of Tajo worker for each query, including stage
+    String queryBaseDir = queryId + "/output" + "/" + sid + "/";
+
+    if (shuffleType.equals("r")) {
+      // Range shuffle: read the output of the first requested task attempt.
+      String ta = taskIds.get(0);
+      String taskOutputDir = queryBaseDir + ta + "/output/";
+      if (!executionBlockContext.getLocalDirAllocator().ifExists(taskOutputDir, conf)) {
+        LOG.warn("Range shuffle - file not exist");
+        return null;
+      }
+      Path path = executionBlockContext.getLocalFS().makeQualified(
+          executionBlockContext.getLocalDirAllocator().getLocalPathToRead(taskOutputDir, conf));
+      String startKey = params.get("start").get(0);
+      String endKey = params.get("end").get(0);
+      boolean last = params.get("final") != null;
+
+      try {
+        chunk = TajoPullServerService.getFileChunks(path, startKey, endKey, last);
+      } catch (Throwable t) {
+        // Do not return null here: the caller treats a null chunk as "nothing to fetch" and would
+        // silently drop this fetch. Propagate instead so the caller falls back to a remote fetch.
+        LOG.error("getFileChunks() throws exception", t);
+        throw new IOException("getFileChunks() failed for " + path, t);
+      }
+
+    } else if (shuffleType.equals("h") || shuffleType.equals("s")) {
+      // Hash shuffle or scattered hash shuffle: partitions live under a parent-id directory.
+      int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
+      String partPath = queryBaseDir + "hash-shuffle/" + partParentId + "/" + partId;
+      if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath, conf)) {
+        LOG.warn("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath);
+        return null;
+      }
+      Path path = executionBlockContext.getLocalFS().makeQualified(
+          executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath, conf));
+      File file = new File(path.toUri());
+      // Honor the requested byte range only when both offset and length were supplied.
+      boolean hasRange = offset >= 0 && length >= 0;
+      long startPos = hasRange ? offset : 0;
+      long readLen = hasRange ? length : file.length();
+
+      if (startPos >= file.length()) {
+        LOG.error("Start pos[" + startPos + "] great than file length [" + file.length() + "]");
+        return null;
+      }
+      chunk = new FileChunk(file, startPos, readLen);
+
+    } else {
+      LOG.error("Unknown shuffle type");
+      return null;
+    }
+
+    return chunk;
+  }
+
+  /**
+   * Flattens comma-separated parameter values: every element of {@code mapq} is split on ','
+   * and all resulting pieces are collected, in their original order, into a fresh list.
+   *
+   * @param mapq the raw parameter values; may be {@code null}
+   * @return a new list of the split pieces, or {@code null} when {@code mapq} is {@code null}
+   */
+  private List<String> splitMaps(List<String> mapq) {
+    if (mapq != null) {
+      final List<String> pieces = new ArrayList<String>();
+      for (String joined : mapq) {
+        for (String piece : joined.split(",")) {
+          pieces.add(piece);
+        }
+      }
+      return pieces;
+    }
+    return null;
+  }
+
+  /**
+   * Returns the input directory of a task attempt. It is formed by appending the task id and then
+   * the attempt id to the base input directory of the attempt's execution block.
+   *
+   * @param quid the task attempt whose input directory is requested
+   * @return {@code <base input dir>/<task id>/<attempt id>}
+   */
+  public static Path getTaskAttemptDir(TaskAttemptId quid) {
+    String taskSeq = String.valueOf(quid.getTaskId().getId());
+    String attemptSeq = String.valueOf(quid.getId());
+    return StorageUtil.concatPath(
+        ExecutionBlockContext.getBaseInputDir(quid.getTaskId().getExecutionBlockId()), taskSeq, attemptSeq);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java
index 20eec6b..e763d13 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java
@@ -30,27 +30,23 @@ import org.apache.tajo.resource.NodeResources;
import org.apache.tajo.storage.DiskUtil;
import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.CommonTestingUtil;
-import org.apache.tajo.worker.event.NodeResourceAllocateEvent;
-import org.apache.tajo.worker.event.NodeResourceDeallocateEvent;
-import org.apache.tajo.worker.event.NodeResourceManagerEvent;
-import org.apache.tajo.worker.event.NodeStatusEvent;
-
-import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.tajo.worker.event.*;
import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
-public class NodeResourceManager extends AbstractService implements EventHandler<NodeResourceManagerEvent> {
+public class NodeResourceManager extends AbstractService implements EventHandler<NodeResourceEvent> {
private static final Log LOG = LogFactory.getLog(NodeResourceManager.class);
private final Dispatcher dispatcher;
+ private final EventHandler taskEventHandler;
private NodeResource totalResource;
private NodeResource availableResource;
- private AtomicInteger allocatedSize;
private TajoConf tajoConf;
- public NodeResourceManager(Dispatcher dispatcher){
+ public NodeResourceManager(Dispatcher dispatcher, EventHandler taskEventHandler) {
super(NodeResourceManager.class.getName());
this.dispatcher = dispatcher;
+ this.taskEventHandler = taskEventHandler;
}
@Override
@@ -61,14 +57,14 @@ public class NodeResourceManager extends AbstractService implements EventHandler
this.tajoConf = (TajoConf)conf;
this.totalResource = createWorkerResource(tajoConf);
this.availableResource = NodeResources.clone(totalResource);
- this.dispatcher.register(NodeResourceManagerEvent.EventType.class, this);
- this.allocatedSize = new AtomicInteger();
+ this.dispatcher.register(NodeResourceEvent.EventType.class, this);
+
super.serviceInit(conf);
LOG.info("Initialized NodeResourceManager for " + totalResource);
}
@Override
- public void handle(NodeResourceManagerEvent event) {
+ public void handle(NodeResourceEvent event) {
if (event instanceof NodeResourceAllocateEvent) {
NodeResourceAllocateEvent allocateEvent = (NodeResourceAllocateEvent) event;
@@ -76,22 +72,27 @@ public class NodeResourceManager extends AbstractService implements EventHandler
for (TaskAllocationRequestProto request : allocateEvent.getRequest().getTaskRequestList()) {
NodeResource resource = new NodeResource(request.getResource());
if (allocate(resource)) {
- allocatedSize.incrementAndGet();
- //TODO send task event to taskExecutor
+ if(allocateEvent.getRequest().hasExecutionBlockRequest()){
+ //send ExecutionBlock start event to TaskManager
+ startExecutionBlock(allocateEvent.getRequest().getExecutionBlockRequest());
+ }
+
+ //send task start event to TaskExecutor
+ startTask(request.getTaskRequest(), resource);
} else {
+ // reject the exceeded requests
response.addCancellationTask(request);
}
}
allocateEvent.getCallback().run(response.build());
} else if (event instanceof NodeResourceDeallocateEvent) {
- allocatedSize.decrementAndGet();
NodeResourceDeallocateEvent deallocateEvent = (NodeResourceDeallocateEvent) event;
release(deallocateEvent.getResource());
// send current resource to ResourceTracker
getDispatcher().getEventHandler().handle(
- new NodeStatusEvent(NodeStatusEvent.EventType.REPORT_RESOURCE, getAvailableResource()));
+ new NodeStatusEvent(NodeStatusEvent.EventType.REPORT_RESOURCE));
}
}
@@ -107,10 +108,6 @@ public class NodeResourceManager extends AbstractService implements EventHandler
return availableResource;
}
- public int getAllocatedSize() {
- return allocatedSize.get();
- }
-
private boolean allocate(NodeResource resource) {
//TODO consider the jvm free memory
if (NodeResources.fitsIn(resource, availableResource)) {
@@ -120,6 +117,14 @@ public class NodeResourceManager extends AbstractService implements EventHandler
return false;
}
+ protected void startExecutionBlock(RunExecutionBlockRequestProto request) {
+ taskEventHandler.handle(new ExecutionBlockStartEvent(request));
+ }
+
+ protected void startTask(TaskRequestProto request, NodeResource resource) {
+ taskEventHandler.handle(new TaskStartEvent(request, resource));
+ }
+
private void release(NodeResource resource) {
NodeResources.addTo(availableResource, resource);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java b/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java
index 84ac419..d13cd50 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java
@@ -57,16 +57,16 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N
private volatile boolean isStopped;
private volatile long heartBeatInterval;
private BlockingQueue<NodeStatusEvent> heartBeatRequestQueue;
- private final WorkerConnectionInfo connectionInfo;
+ private final TajoWorker.WorkerContext workerContext;
private final NodeResourceManager nodeResourceManager;
private AsyncRpcClient rmClient;
private ServiceTracker serviceTracker;
private TajoResourceTrackerProtocolService.Interface resourceTracker;
private int queueingLimit;
- public NodeStatusUpdater(WorkerConnectionInfo connectionInfo, NodeResourceManager resourceManager) {
+ public NodeStatusUpdater(TajoWorker.WorkerContext workerContext, NodeResourceManager resourceManager) {
super(NodeStatusUpdater.class.getSimpleName());
- this.connectionInfo = connectionInfo;
+ this.workerContext = workerContext;
this.nodeResourceManager = resourceManager;
}
@@ -99,7 +99,8 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N
this.isStopped = true;
synchronized (updaterThread) {
- updaterThread.notifyAll();
+ updaterThread.interrupt();
+ updaterThread.join();
}
super.serviceStop();
LOG.info("NodeStatusUpdater stopped.");
@@ -107,14 +108,7 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N
@Override
public void handle(NodeStatusEvent event) {
- switch (event.getType()) {
- case REPORT_RESOURCE:
- heartBeatRequestQueue.add(event); //batch report to ResourceTracker
- break;
- case FLUSH_REPORTS:
- heartBeatRequestQueue.add(event); //flush report to ResourceTracker
- break;
- }
+ heartBeatRequestQueue.add(event);
}
public int getQueueSize() {
@@ -128,13 +122,13 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N
private NodeHeartbeatRequestProto createResourceReport(NodeResource resource) {
NodeHeartbeatRequestProto.Builder requestProto = NodeHeartbeatRequestProto.newBuilder();
requestProto.setAvailableResource(resource.getProto());
- requestProto.setWorkerId(connectionInfo.getId());
+ requestProto.setWorkerId(workerContext.getConnectionInfo().getId());
return requestProto.build();
}
private NodeHeartbeatRequestProto createHeartBeatReport() {
NodeHeartbeatRequestProto.Builder requestProto = NodeHeartbeatRequestProto.newBuilder();
- requestProto.setWorkerId(connectionInfo.getId());
+ requestProto.setWorkerId(workerContext.getConnectionInfo().getId());
return requestProto.build();
}
@@ -142,8 +136,8 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N
NodeHeartbeatRequestProto.Builder requestProto = NodeHeartbeatRequestProto.newBuilder();
requestProto.setTotalResource(nodeResourceManager.getTotalResource().getProto());
requestProto.setAvailableResource(nodeResourceManager.getAvailableResource().getProto());
- requestProto.setWorkerId(connectionInfo.getId());
- requestProto.setConnectionInfo(connectionInfo.getProto());
+ requestProto.setWorkerId(workerContext.getConnectionInfo().getId());
+ requestProto.setConnectionInfo(workerContext.getConnectionInfo().getProto());
//TODO set node status to requestProto.setStatus()
return requestProto.build();
@@ -231,8 +225,8 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N
}
if (!events.isEmpty()) {
- // send last available resource;
- lastResponse = sendHeartbeat(createResourceReport(events.get(events.size() - 1).getResource()));
+ // send current available resource;
+ lastResponse = sendHeartbeat(createResourceReport(nodeResourceManager.getAvailableResource()));
} else {
// send ping;
lastResponse = sendHeartbeat(createHeartBeatReport());
@@ -250,10 +244,10 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N
}
} catch (NoSuchMethodException nsme) {
LOG.fatal(nsme.getMessage(), nsme);
- Runtime.getRuntime().halt(1);
+ Runtime.getRuntime().halt(-1);
} catch (ClassNotFoundException cnfe) {
LOG.fatal(cnfe.getMessage(), cnfe);
- Runtime.getRuntime().halt(1);
+ Runtime.getRuntime().halt(-1);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
if (!isStopped) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 4f07ca6..fbd070e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -163,7 +163,7 @@ public class TajoWorker extends CompositeService {
serviceTracker = ServiceTrackerFactory.get(systemConf);
- this.workerContext = new WorkerContext();
+ this.workerContext = new TajoWorkerContext();
this.lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
String resourceManagerClassName = systemConf.getVar(ConfVars.RESOURCE_MANAGER_CLASS);
@@ -386,7 +386,45 @@ public class TajoWorker extends CompositeService {
LOG.info("TajoWorker main thread exiting");
}
- public class WorkerContext {
+ public interface WorkerContext {
+ QueryMaster getQueryMaster();
+
+ TajoConf getConf();
+
+ ServiceTracker getServiceTracker();
+
+ QueryMasterManagerService getQueryMasterManagerService();
+
+ TaskRunnerManager getTaskRunnerManager();
+
+ CatalogService getCatalog();
+
+ WorkerConnectionInfo getConnectionInfo();
+
+ String getWorkerName();
+
+ LocalDirAllocator getLocalDirAllocator();
+
+ ClusterResourceSummary getClusterResource();
+
+ TajoSystemMetrics getWorkerSystemMetrics();
+
+ HashShuffleAppenderManager getHashShuffleAppenderManager();
+
+ HistoryWriter getTaskHistoryWriter();
+
+ HistoryReader getHistoryReader();
+
+ void cleanup(String strPath);
+
+ void cleanupTemporalDirectories();
+
+ void setClusterResource(ClusterResourceSummary clusterResource);
+
+ void setNumClusterNodes(int numClusterNodes);
+ }
+
+ class TajoWorkerContext implements WorkerContext {
public QueryMaster getQueryMaster() {
if (queryMasterManagerService == null) {
return null;
@@ -430,7 +468,7 @@ public class TajoWorker extends CompositeService {
return lDirAllocator;
}
- protected void cleanup(String strPath) {
+ public void cleanup(String strPath) {
if (deletionService == null) return;
LocalDirAllocator lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
@@ -446,7 +484,7 @@ public class TajoWorker extends CompositeService {
}
}
- protected void cleanupTemporalDirectories() {
+ public void cleanupTemporalDirectories() {
if (deletionService == null) return;
LocalDirAllocator lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
@@ -627,6 +665,7 @@ public class TajoWorker extends CompositeService {
}
public static void main(String[] args) throws Exception {
+ Thread.setDefaultUncaughtExceptionHandler(new TajoUncaughtExceptionHandler());
StringUtils.startupShutdownMessage(TajoWorker.class, args, LOG);
TajoConf tajoConf = new TajoConf();
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index bbf8564..de8afe8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -116,14 +116,7 @@ public class TajoWorkerManagerService extends CompositeService
workerContext.getWorkerSystemMetrics().counter("query", "executedExecutionBlocksNum").inc();
try {
- workerContext.getTaskRunnerManager().getEventHandler().handle(new TaskRunnerStartEvent(
- new WorkerConnectionInfo(request.getQueryMaster())
- , new ExecutionBlockId(request.getExecutionBlockId())
- , request.getContainerId()
- , new QueryContext(workerContext.getConf(), request.getQueryContext()),
- request.getPlanJson(),
- request.getShuffleType()
- ));
+ workerContext.getTaskRunnerManager().getEventHandler().handle(new TaskRunnerStartEvent(request));
done.run(TajoWorker.TRUE_PROTO);
} catch (Throwable t) {
LOG.error(t.getMessage(), t);
[2/3] tajo git commit: TAJO-1615: Implement TaskManager. (jinho)
Posted by jh...@apache.org.
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 4716dcc..c849940 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -18,839 +18,35 @@
package org.apache.tajo.worker;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import io.netty.handler.codec.http.QueryStringDecoder;
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.TajoProtos;
-import org.apache.tajo.TajoProtos.TaskAttemptState;
-import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.physical.PhysicalExec;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.engine.query.TaskRequest;
-import org.apache.tajo.ipc.QueryMasterProtocol;
-import org.apache.tajo.ipc.TajoWorkerProtocol.*;
-import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
-import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.plan.function.python.TajoScriptEngine;
-import org.apache.tajo.plan.logical.*;
-import org.apache.tajo.plan.serder.LogicalNodeDeserializer;
-import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.pullserver.TajoPullServerService;
-import org.apache.tajo.pullserver.retriever.FileChunk;
-import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
-import java.io.File;
import java.io.IOException;
-import java.net.InetAddress;
-import java.net.URI;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.ExecutorService;
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
+public interface Task {
-public class Task {
- private static final Log LOG = LogFactory.getLog(Task.class);
- private static final float FETCHER_PROGRESS = 0.5f;
+ void init() throws IOException;
- private final TajoConf systemConf;
- private final QueryContext queryContext;
- private final ExecutionBlockContext executionBlockContext;
- private final TaskAttemptId taskId;
- private final String taskRunnerId;
+ void fetch();
- private final Path taskDir;
- private final TaskRequest request;
- private TaskAttemptContext context;
- private List<Fetcher> fetcherRunners;
- private LogicalNode plan;
- private final Map<String, TableDesc> descs = Maps.newHashMap();
- private PhysicalExec executor;
- private boolean interQuery;
- private Path inputTableBaseDir;
+ void run() throws Exception;
- private long startTime;
- private long finishTime;
+ void kill();
- private final TableStats inputStats;
- private List<FileChunk> localChunks;
+ void abort();
- // TODO - to be refactored
- private ShuffleType shuffleType = null;
- private Schema finalSchema = null;
- private TupleComparator sortComp = null;
+ void cleanup();
- public Task(String taskRunnerId,
- Path baseDir,
- TaskAttemptId taskId,
- final ExecutionBlockContext executionBlockContext,
- final TaskRequest request) throws IOException {
- this(taskRunnerId, baseDir, taskId, executionBlockContext.getConf(), executionBlockContext, request);
- }
+ boolean hasFetchPhase();
- public Task(String taskRunnerId,
- Path baseDir,
- TaskAttemptId taskId,
- TajoConf conf,
- final ExecutionBlockContext executionBlockContext,
- final TaskRequest request) throws IOException {
- this.taskRunnerId = taskRunnerId;
- this.request = request;
- this.taskId = taskId;
+ boolean isProgressChanged();
- this.systemConf = conf;
- this.queryContext = request.getQueryContext(systemConf);
- this.executionBlockContext = executionBlockContext;
- this.taskDir = StorageUtil.concatPath(baseDir,
- taskId.getTaskId().getId() + "_" + taskId.getId());
+ boolean isStopped();
- this.context = new TaskAttemptContext(queryContext, executionBlockContext, taskId,
- request.getFragments().toArray(new FragmentProto[request.getFragments().size()]), taskDir);
- this.context.setDataChannel(request.getDataChannel());
- this.context.setEnforcer(request.getEnforcer());
- this.context.setState(TaskAttemptState.TA_PENDING);
- this.inputStats = new TableStats();
- this.fetcherRunners = Lists.newArrayList();
- }
+ void updateProgress();
- public void initPlan() throws IOException {
- plan = LogicalNodeDeserializer.deserialize(queryContext, context.getEvalContext(), request.getPlan());
- LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN);
- if (scanNode != null) {
- for (LogicalNode node : scanNode) {
- ScanNode scan = (ScanNode) node;
- descs.put(scan.getCanonicalName(), scan.getTableDesc());
- }
- }
+ TaskAttemptContext getTaskContext();
- LogicalNode [] partitionScanNode = PlannerUtil.findAllNodes(plan, NodeType.PARTITIONS_SCAN);
- if (partitionScanNode != null) {
- for (LogicalNode node : partitionScanNode) {
- PartitionedTableScanNode scan = (PartitionedTableScanNode) node;
- descs.put(scan.getCanonicalName(), scan.getTableDesc());
- }
- }
+ ExecutionBlockContext getExecutionBlockContext();
- interQuery = request.getProto().getInterQuery();
- if (interQuery) {
- context.setInterQuery();
- this.shuffleType = context.getDataChannel().getShuffleType();
-
- if (shuffleType == ShuffleType.RANGE_SHUFFLE) {
- SortNode sortNode = PlannerUtil.findTopNode(plan, NodeType.SORT);
- this.finalSchema = PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys());
- this.sortComp = new BaseTupleComparator(finalSchema, sortNode.getSortKeys());
- }
- } else {
- Path outFilePath = ((FileTablespace) TableSpaceManager.getFileStorageManager(systemConf))
- .getAppenderFilePath(taskId, queryContext.getStagingDir());
- LOG.info("Output File Path: " + outFilePath);
- context.setOutputPath(outFilePath);
- }
-
- this.localChunks = Collections.synchronizedList(new ArrayList<FileChunk>());
- LOG.info("==================================");
- LOG.info("* Stage " + request.getId() + " is initialized");
- LOG.info("* InterQuery: " + interQuery
- + (interQuery ? ", Use " + this.shuffleType + " shuffle" : "") +
- ", Fragments (num: " + request.getFragments().size() + ")" +
- ", Fetches (total:" + request.getFetches().size() + ") :");
-
- if(LOG.isDebugEnabled()) {
- for (FetchImpl f : request.getFetches()) {
- LOG.debug("Table Id: " + f.getName() + ", Simple URIs: " + f.getSimpleURIs());
- }
- }
- LOG.info("* Local task dir: " + taskDir);
- if(LOG.isDebugEnabled()) {
- LOG.debug("* plan:\n");
- LOG.debug(plan.toString());
- }
- LOG.info("==================================");
- }
-
- private void startScriptExecutors() throws IOException {
- for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) {
- executor.start(systemConf);
- }
- }
-
- private void stopScriptExecutors() {
- for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) {
- executor.shutdown();
- }
- }
-
- public void init() throws IOException {
- initPlan();
- startScriptExecutors();
-
- if (context.getState() == TaskAttemptState.TA_PENDING) {
- // initialize a task temporal dir
- FileSystem localFS = executionBlockContext.getLocalFS();
- localFS.mkdirs(taskDir);
-
- if (request.getFetches().size() > 0) {
- inputTableBaseDir = localFS.makeQualified(
- executionBlockContext.getLocalDirAllocator().getLocalPathForWrite(
- getTaskAttemptDir(context.getTaskId()).toString(), systemConf));
- localFS.mkdirs(inputTableBaseDir);
- Path tableDir;
- for (String inputTable : context.getInputTables()) {
- tableDir = new Path(inputTableBaseDir, inputTable);
- if (!localFS.exists(tableDir)) {
- LOG.info("the directory is created " + tableDir.toUri());
- localFS.mkdirs(tableDir);
- }
- }
- }
- // for localizing the intermediate data
- fetcherRunners.addAll(getFetchRunners(context, request.getFetches()));
- }
- }
-
- public TaskAttemptId getTaskId() {
- return taskId;
- }
-
- public TaskAttemptId getId() {
- return context.getTaskId();
- }
-
- public TaskAttemptState getStatus() {
- return context.getState();
- }
-
- public String toString() {
- return "queryId: " + this.getId() + " status: " + this.getStatus();
- }
-
- public void setState(TaskAttemptState status) {
- context.setState(status);
- }
-
- public TaskAttemptContext getContext() {
- return context;
- }
-
- public boolean hasFetchPhase() {
- return fetcherRunners.size() > 0;
- }
-
- public List<Fetcher> getFetchers() {
- return new ArrayList<Fetcher>(fetcherRunners);
- }
-
- public void fetch() {
- ExecutorService executorService = executionBlockContext.getTaskRunner(taskRunnerId).getFetchLauncher();
- for (Fetcher f : fetcherRunners) {
- executorService.submit(new FetchRunner(context, f));
- }
- }
-
- public void kill() {
- stopScriptExecutors();
- context.setState(TaskAttemptState.TA_KILLED);
- context.stop();
- }
-
- public void abort() {
- stopScriptExecutors();
- context.stop();
- }
-
- public void cleanUp() {
- // remove itself from worker
- if (context.getState() == TaskAttemptState.TA_SUCCEEDED) {
- synchronized (executionBlockContext.getTasks()) {
- executionBlockContext.getTasks().remove(this.getId());
- }
- } else {
- LOG.error("TaskAttemptId: " + context.getTaskId() + " status: " + context.getState());
- }
- }
-
- public TaskStatusProto getReport() {
- TaskStatusProto.Builder builder = TaskStatusProto.newBuilder();
- builder.setWorkerName(executionBlockContext.getWorkerContext().getConnectionInfo().getHostAndPeerRpcPort());
- builder.setId(context.getTaskId().getProto())
- .setProgress(context.getProgress())
- .setState(context.getState());
-
- builder.setInputStats(reloadInputStats());
-
- if (context.getResultStats() != null) {
- builder.setResultStats(context.getResultStats().getProto());
- }
- return builder.build();
- }
-
- public boolean isRunning(){
- return context.getState() == TaskAttemptState.TA_RUNNING;
- }
-
- public boolean isProgressChanged() {
- return context.isProgressChanged();
- }
-
- public void updateProgress() {
- if(context != null && context.isStopped()){
- return;
- }
-
- if (executor != null && context.getProgress() < 1.0f) {
- context.setExecutorProgress(executor.getProgress());
- }
- }
-
- private CatalogProtos.TableStatsProto reloadInputStats() {
- synchronized(inputStats) {
- if (this.executor == null) {
- return inputStats.getProto();
- }
-
- TableStats executorInputStats = this.executor.getInputStats();
-
- if (executorInputStats != null) {
- inputStats.setValues(executorInputStats);
- }
- return inputStats.getProto();
- }
- }
-
- private TaskCompletionReport getTaskCompletionReport() {
- TaskCompletionReport.Builder builder = TaskCompletionReport.newBuilder();
- builder.setId(context.getTaskId().getProto());
-
- builder.setInputStats(reloadInputStats());
-
- if (context.hasResultStats()) {
- builder.setResultStats(context.getResultStats().getProto());
- } else {
- builder.setResultStats(new TableStats().getProto());
- }
-
- Iterator<Entry<Integer, String>> it = context.getShuffleFileOutputs();
- if (it.hasNext()) {
- do {
- Entry<Integer, String> entry = it.next();
- ShuffleFileOutput.Builder part = ShuffleFileOutput.newBuilder();
- part.setPartId(entry.getKey());
-
- // Set output volume
- if (context.getPartitionOutputVolume() != null) {
- for (Entry<Integer, Long> e : context.getPartitionOutputVolume().entrySet()) {
- if (entry.getKey().equals(e.getKey())) {
- part.setVolume(e.getValue().longValue());
- break;
- }
- }
- }
-
- builder.addShuffleFileOutputs(part.build());
- } while (it.hasNext());
- }
-
- return builder.build();
- }
-
- private void waitForFetch() throws InterruptedException, IOException {
- context.getFetchLatch().await();
- LOG.info(context.getTaskId() + " All fetches are done!");
- Collection<String> inputs = Lists.newArrayList(context.getInputTables());
-
- // Get all broadcasted tables
- Set<String> broadcastTableNames = new HashSet<String>();
- List<EnforceProperty> broadcasts = context.getEnforcer().getEnforceProperties(EnforceType.BROADCAST);
- if (broadcasts != null) {
- for (EnforceProperty eachBroadcast : broadcasts) {
- broadcastTableNames.add(eachBroadcast.getBroadcast().getTableName());
- }
- }
-
- // localize the fetched data and skip the broadcast table
- for (String inputTable: inputs) {
- if (broadcastTableNames.contains(inputTable)) {
- continue;
- }
- File tableDir = new File(context.getFetchIn(), inputTable);
- FileFragment[] frags = localizeFetchedData(tableDir, inputTable, descs.get(inputTable).getMeta());
- context.updateAssignedFragments(inputTable, frags);
- }
- }
-
- public void run() throws Exception {
- startTime = System.currentTimeMillis();
- Throwable error = null;
- try {
- if(!context.isStopped()) {
- context.setState(TaskAttemptState.TA_RUNNING);
- if (context.hasFetchPhase()) {
- // If the fetch is still in progress, the query unit must wait for
- // complete.
- waitForFetch();
- context.setFetcherProgress(FETCHER_PROGRESS);
- context.setProgressChanged(true);
- updateProgress();
- }
-
- this.executor = executionBlockContext.getTQueryEngine().
- createPlan(context, plan);
- this.executor.init();
-
- while(!context.isStopped() && executor.next() != null) {
- }
- }
- } catch (Throwable e) {
- error = e ;
- LOG.error(e.getMessage(), e);
- stopScriptExecutors();
- context.stop();
- } finally {
- if (executor != null) {
- try {
- executor.close();
- reloadInputStats();
- } catch (IOException e) {
- LOG.error(e, e);
- }
- this.executor = null;
- }
-
- executionBlockContext.completedTasksNum.incrementAndGet();
- context.getHashShuffleAppenderManager().finalizeTask(taskId);
-
- QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = executionBlockContext.getStub();
- if (context.isStopped()) {
- context.setExecutorProgress(0.0f);
-
- if (context.getState() == TaskAttemptState.TA_KILLED) {
- queryMasterStub.statusUpdate(null, getReport(), NullCallback.get());
- executionBlockContext.killedTasksNum.incrementAndGet();
- } else {
- context.setState(TaskAttemptState.TA_FAILED);
- TaskFatalErrorReport.Builder errorBuilder =
- TaskFatalErrorReport.newBuilder()
- .setId(getId().getProto());
- if (error != null) {
- if (error.getMessage() == null) {
- errorBuilder.setErrorMessage(error.getClass().getCanonicalName());
- } else {
- errorBuilder.setErrorMessage(error.getMessage());
- }
- errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error));
- }
-
- queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get());
- executionBlockContext.failedTasksNum.incrementAndGet();
- }
- } else {
- // if successful
- context.setProgress(1.0f);
- context.setState(TaskAttemptState.TA_SUCCEEDED);
- executionBlockContext.succeededTasksNum.incrementAndGet();
-
- TaskCompletionReport report = getTaskCompletionReport();
- queryMasterStub.done(null, report, NullCallback.get());
- }
- finishTime = System.currentTimeMillis();
- LOG.info(context.getTaskId() + " completed. " +
- "Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() +
- ", succeeded: " + executionBlockContext.succeededTasksNum.intValue()
- + ", killed: " + executionBlockContext.killedTasksNum.intValue()
- + ", failed: " + executionBlockContext.failedTasksNum.intValue());
- cleanupTask();
- }
- }
-
- public void cleanupTask() {
- TaskHistory taskHistory = createTaskHistory();
- executionBlockContext.addTaskHistory(taskRunnerId, getId(), taskHistory);
- executionBlockContext.getTasks().remove(getId());
-
- fetcherRunners.clear();
- fetcherRunners = null;
- try {
- if(executor != null) {
- executor.close();
- executor = null;
- }
- } catch (IOException e) {
- LOG.fatal(e.getMessage(), e);
- }
-
- executionBlockContext.getWorkerContext().getTaskHistoryWriter().appendHistory(taskHistory);
- stopScriptExecutors();
- }
-
- public TaskHistory createTaskHistory() {
- TaskHistory taskHistory = null;
- try {
- taskHistory = new TaskHistory(getTaskId(), getStatus(), context.getProgress(),
- startTime, finishTime, reloadInputStats());
-
- if (context.getOutputPath() != null) {
- taskHistory.setOutputPath(context.getOutputPath().toString());
- }
-
- if (context.getWorkDir() != null) {
- taskHistory.setWorkingPath(context.getWorkDir().toString());
- }
-
- if (context.getResultStats() != null) {
- taskHistory.setOutputStats(context.getResultStats().getProto());
- }
-
- if (hasFetchPhase()) {
- taskHistory.setTotalFetchCount(fetcherRunners.size());
- int i = 0;
- FetcherHistoryProto.Builder builder = FetcherHistoryProto.newBuilder();
- for (Fetcher fetcher : fetcherRunners) {
- // TODO store the fetcher histories
- if (systemConf.getBoolVar(TajoConf.ConfVars.$DEBUG_ENABLED)) {
- builder.setStartTime(fetcher.getStartTime());
- builder.setFinishTime(fetcher.getFinishTime());
- builder.setFileLength(fetcher.getFileLen());
- builder.setMessageReceivedCount(fetcher.getMessageReceiveCount());
- builder.setState(fetcher.getState());
-
- taskHistory.addFetcherHistory(builder.build());
- }
- if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) i++;
- }
- taskHistory.setFinishedFetchCount(i);
- }
- } catch (Exception e) {
- LOG.warn(e.getMessage(), e);
- }
-
- return taskHistory;
- }
-
- public int hashCode() {
- return context.hashCode();
- }
-
- public boolean equals(Object obj) {
- if (obj instanceof Task) {
- Task other = (Task) obj;
- return this.context.equals(other.context);
- }
- return false;
- }
-
- private FileFragment[] localizeFetchedData(File file, String name, TableMeta meta)
- throws IOException {
- Configuration c = new Configuration(systemConf);
- c.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "file:///");
- FileSystem fs = FileSystem.get(c);
- Path tablePath = new Path(file.getAbsolutePath());
-
- List<FileFragment> listTablets = new ArrayList<FileFragment>();
- FileFragment tablet;
-
- FileStatus[] fileLists = fs.listStatus(tablePath);
- for (FileStatus f : fileLists) {
- if (f.getLen() == 0) {
- continue;
- }
- tablet = new FileFragment(name, f.getPath(), 0l, f.getLen());
- listTablets.add(tablet);
- }
-
- // Special treatment for locally pseudo fetched chunks
- synchronized (localChunks) {
- for (FileChunk chunk : localChunks) {
- if (name.equals(chunk.getEbId())) {
- tablet = new FileFragment(name, new Path(chunk.getFile().getPath()), chunk.startOffset(), chunk.length());
- listTablets.add(tablet);
- LOG.info("One local chunk is added to listTablets");
- }
- }
- }
-
- FileFragment[] tablets = new FileFragment[listTablets.size()];
- listTablets.toArray(tablets);
-
- return tablets;
- }
-
- private class FetchRunner implements Runnable {
- private final TaskAttemptContext ctx;
- private final Fetcher fetcher;
- private int maxRetryNum;
-
- public FetchRunner(TaskAttemptContext ctx, Fetcher fetcher) {
- this.ctx = ctx;
- this.fetcher = fetcher;
- this.maxRetryNum = systemConf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_RETRY_MAX_NUM);
- }
-
- @Override
- public void run() {
- int retryNum = 0;
- int retryWaitTime = 1000; //sec
-
- try { // for releasing fetch latch
- while(!context.isStopped() && retryNum < maxRetryNum) {
- if (retryNum > 0) {
- try {
- Thread.sleep(retryWaitTime);
- retryWaitTime = Math.min(10 * 1000, retryWaitTime * 2); // max 10 seconds
- } catch (InterruptedException e) {
- LOG.error(e);
- }
- LOG.warn("Retry on the fetch: " + fetcher.getURI() + " (" + retryNum + ")");
- }
- try {
- FileChunk fetched = fetcher.get();
- if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched != null
- && fetched.getFile() != null) {
- if (fetched.fromRemote() == false) {
- localChunks.add(fetched);
- LOG.info("Add a new FileChunk to local chunk list");
- }
- break;
- }
- } catch (Throwable e) {
- LOG.error("Fetch failed: " + fetcher.getURI(), e);
- }
- retryNum++;
- }
- } finally {
- if(fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED){
- fetcherFinished(ctx);
- } else {
- if (retryNum == maxRetryNum) {
- LOG.error("ERROR: the maximum retry (" + retryNum + ") on the fetch exceeded (" + fetcher.getURI() + ")");
- }
- stopScriptExecutors();
- context.stop(); // retry task
- ctx.getFetchLatch().countDown();
- }
- }
- }
- }
-
- @VisibleForTesting
- public static float adjustFetchProcess(int totalFetcher, int remainFetcher) {
- if (totalFetcher > 0) {
- return ((totalFetcher - remainFetcher) / (float) totalFetcher) * FETCHER_PROGRESS;
- } else {
- return 0.0f;
- }
- }
-
- private synchronized void fetcherFinished(TaskAttemptContext ctx) {
- int fetcherSize = fetcherRunners.size();
- if(fetcherSize == 0) {
- return;
- }
-
- ctx.getFetchLatch().countDown();
-
- int remainFetcher = (int) ctx.getFetchLatch().getCount();
- if (remainFetcher == 0) {
- context.setFetcherProgress(FETCHER_PROGRESS);
- } else {
- context.setFetcherProgress(adjustFetchProcess(fetcherSize, remainFetcher));
- context.setProgressChanged(true);
- }
- }
-
- private List<Fetcher> getFetchRunners(TaskAttemptContext ctx,
- List<FetchImpl> fetches) throws IOException {
-
- if (fetches.size() > 0) {
- Path inputDir = executionBlockContext.getLocalDirAllocator().
- getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf);
-
- int i = 0;
- File storeDir;
- File defaultStoreFile;
- FileChunk storeChunk = null;
- List<Fetcher> runnerList = Lists.newArrayList();
-
- for (FetchImpl f : fetches) {
- storeDir = new File(inputDir.toString(), f.getName());
- if (!storeDir.exists()) {
- storeDir.mkdirs();
- }
-
- for (URI uri : f.getURIs()) {
- defaultStoreFile = new File(storeDir, "in_" + i);
- InetAddress address = InetAddress.getByName(uri.getHost());
-
- WorkerConnectionInfo conn = executionBlockContext.getWorkerContext().getConnectionInfo();
- if (NetUtils.isLocalAddress(address) && conn.getPullServerPort() == uri.getPort()) {
- boolean hasError = false;
- try {
- LOG.info("Try to get local file chunk at local host");
- storeChunk = getLocalStoredFileChunk(uri, systemConf);
- } catch (Throwable t) {
- hasError = true;
- }
-
- // When a range request is out of range, storeChunk will be NULL. This case is normal state.
- // So, we should skip and don't need to create storeChunk.
- if (storeChunk == null && !hasError) {
- continue;
- }
-
- if (storeChunk != null && storeChunk.getFile() != null && storeChunk.startOffset() > -1
- && hasError == false) {
- storeChunk.setFromRemote(false);
- } else {
- storeChunk = new FileChunk(defaultStoreFile, 0, -1);
- storeChunk.setFromRemote(true);
- }
- } else {
- storeChunk = new FileChunk(defaultStoreFile, 0, -1);
- storeChunk.setFromRemote(true);
- }
-
- // If we decide that intermediate data should be really fetched from a remote host, storeChunk
- // represents a complete file. Otherwise, storeChunk may represent a complete file or only a part of it
- storeChunk.setEbId(f.getName());
- Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk);
- LOG.info("Create a new Fetcher with storeChunk:" + storeChunk.toString());
- runnerList.add(fetcher);
- i++;
- }
- }
- ctx.addFetchPhase(runnerList.size(), new File(inputDir.toString()));
- return runnerList;
- } else {
- return Lists.newArrayList();
- }
- }
-
- private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException {
- // Parse the URI
- LOG.info("getLocalStoredFileChunk starts");
- final Map<String, List<String>> params = new QueryStringDecoder(fetchURI.toString()).parameters();
- final List<String> types = params.get("type");
- final List<String> qids = params.get("qid");
- final List<String> taskIdList = params.get("ta");
- final List<String> stageIds = params.get("sid");
- final List<String> partIds = params.get("p");
- final List<String> offsetList = params.get("offset");
- final List<String> lengthList = params.get("length");
-
- if (types == null || stageIds == null || qids == null || partIds == null) {
- LOG.error("Invalid URI - Required queryId, type, stage Id, and part id");
- return null;
- }
-
- if (qids.size() != 1 && types.size() != 1 || stageIds.size() != 1) {
- LOG.error("Invalid URI - Required qids, type, taskIds, stage Id, and part id");
- return null;
- }
-
- String queryId = qids.get(0);
- String shuffleType = types.get(0);
- String sid = stageIds.get(0);
- String partId = partIds.get(0);
-
- if (shuffleType.equals("r") && taskIdList == null) {
- LOG.error("Invalid URI - For range shuffle, taskId is required");
- return null;
- }
- List<String> taskIds = splitMaps(taskIdList);
-
- FileChunk chunk = null;
- long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L;
- long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L;
-
- LOG.info("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId
- + ", taskIds=" + taskIdList);
-
- // The working directory of Tajo worker for each query, including stage
- String queryBaseDir = queryId.toString() + "/output" + "/" + sid + "/";
-
- // If the stage requires a range shuffle
- if (shuffleType.equals("r")) {
- String ta = taskIds.get(0);
- if (!executionBlockContext.getLocalDirAllocator().ifExists(queryBaseDir + ta + "/output/", conf)) {
- LOG.warn("Range shuffle - file not exist");
- return null;
- }
- Path path = executionBlockContext.getLocalFS().makeQualified(
- executionBlockContext.getLocalDirAllocator().getLocalPathToRead(queryBaseDir + ta + "/output/", conf));
- String startKey = params.get("start").get(0);
- String endKey = params.get("end").get(0);
- boolean last = params.get("final") != null;
-
- try {
- chunk = TajoPullServerService.getFileChunks(path, startKey, endKey, last);
- } catch (Throwable t) {
- LOG.error("getFileChunks() throws exception");
- return null;
- }
-
- // If the stage requires a hash shuffle or a scattered hash shuffle
- } else if (shuffleType.equals("h") || shuffleType.equals("s")) {
- int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) conf);
- String partPath = queryBaseDir + "hash-shuffle/" + partParentId + "/" + partId;
- if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath, conf)) {
- LOG.warn("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath);
- return null;
- }
- Path path = executionBlockContext.getLocalFS().makeQualified(
- executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath, conf));
- File file = new File(path.toUri());
- long startPos = (offset >= 0 && length >= 0) ? offset : 0;
- long readLen = (offset >= 0 && length >= 0) ? length : file.length();
-
- if (startPos >= file.length()) {
- LOG.error("Start pos[" + startPos + "] great than file length [" + file.length() + "]");
- return null;
- }
- chunk = new FileChunk(file, startPos, readLen);
-
- } else {
- LOG.error("Unknown shuffle type");
- return null;
- }
-
- return chunk;
- }
-
- private List<String> splitMaps(List<String> mapq) {
- if (null == mapq) {
- return null;
- }
- final List<String> ret = new ArrayList<String>();
- for (String s : mapq) {
- Collections.addAll(ret, s.split(","));
- }
- return ret;
- }
-
- public static Path getTaskAttemptDir(TaskAttemptId quid) {
- Path workDir =
- StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getTaskId().getExecutionBlockId()),
- String.valueOf(quid.getTaskId().getId()),
- String.valueOf(quid.getId()));
- return workDir;
- }
+ TajoWorkerProtocol.TaskStatusProto getReport();
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index 58028ac..d020639 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -46,7 +46,6 @@ import java.io.IOException;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
@@ -60,13 +59,13 @@ public class TaskAttemptContext {
private volatile TaskAttemptState state;
private TableStats resultStats;
- private TaskAttemptId queryId;
+ private TaskAttemptId taskId;
private final Path workDir;
private boolean needFetch = false;
private CountDownLatch doneFetchPhaseSignal;
private float progress = 0.0f;
private float fetcherProgress = 0.0f;
- private AtomicBoolean progressChanged = new AtomicBoolean(false);
+ private volatile boolean progressChanged;
/** a map of shuffled file outputs */
private Map<Integer, String> shuffleFileOutputs;
@@ -87,7 +86,7 @@ public class TaskAttemptContext {
private EvalContext evalContext = new EvalContext();
public TaskAttemptContext(QueryContext queryContext, final ExecutionBlockContext executionBlockContext,
- final TaskAttemptId queryId,
+ final TaskAttemptId taskId,
final FragmentProto[] fragments,
final Path workDir) {
this.queryContext = queryContext;
@@ -97,7 +96,7 @@ public class TaskAttemptContext {
this.sharedResource = executionBlockContext.getSharedResource();
}
- this.queryId = queryId;
+ this.taskId = taskId;
if (fragments != null) {
for (FragmentProto t : fragments) {
@@ -114,25 +113,15 @@ public class TaskAttemptContext {
this.workDir = workDir;
this.shuffleFileOutputs = Maps.newHashMap();
- state = TaskAttemptState.TA_PENDING;
+ this.state = TaskAttemptState.TA_PENDING;
this.partitionOutputVolume = Maps.newHashMap();
-
- if (workerContext != null) {
- this.hashShuffleAppenderManager = workerContext.getHashShuffleAppenderManager();
- } else {
- try {
- this.hashShuffleAppenderManager = new HashShuffleAppenderManager(queryContext.getConf());
- } catch (IOException e) {
- LOG.error(e.getMessage(), e);
- }
- }
}
@VisibleForTesting
- public TaskAttemptContext(final QueryContext queryContext, final TaskAttemptId queryId,
+ public TaskAttemptContext(final QueryContext queryContext, final TaskAttemptId taskAttemptId,
final Fragment [] fragments, final Path workDir) {
- this(queryContext, null, queryId, FragmentConvertor.toFragmentProtoArray(fragments), workDir);
+ this(queryContext, null, taskAttemptId, FragmentConvertor.toFragmentProtoArray(fragments), workDir);
}
public TajoConf getConf() {
@@ -308,9 +297,10 @@ public class TaskAttemptContext {
public Path getWorkDir() {
return this.workDir;
}
-
+
+ //TODO change to getTaskAttemptId()
public TaskAttemptId getTaskId() {
- return this.queryId;
+ return this.taskId;
}
public float getProgress() {
@@ -326,17 +316,11 @@ public class TaskAttemptContext {
this.progress = progress;
}
- if (previousProgress != progress) {
- setProgressChanged(true);
- }
+ this.progressChanged = previousProgress != progress;
}
public boolean isProgressChanged() {
- return progressChanged.get();
- }
-
- public void setProgressChanged(boolean changed){
- progressChanged.set(changed);
+ return progressChanged;
}
public void setExecutorProgress(float executorProgress) {
@@ -355,7 +339,9 @@ public class TaskAttemptContext {
if(Float.isNaN(fetcherProgress) || Float.isInfinite(fetcherProgress)){
fetcherProgress = 0.0f;
}
+ float previousProgress = this.fetcherProgress;
this.fetcherProgress = fetcherProgress;
+ this.progressChanged = previousProgress != fetcherProgress;
}
public FragmentProto getTable(String id) {
@@ -383,13 +369,13 @@ public class TaskAttemptContext {
}
public int hashCode() {
- return Objects.hashCode(queryId);
+ return Objects.hashCode(taskId);
}
public boolean equals(Object obj) {
if (obj instanceof TaskAttemptContext) {
TaskAttemptContext other = (TaskAttemptContext) obj;
- return queryId.equals(other.getTaskId());
+ return taskId.equals(other.getTaskId());
} else {
return false;
}
@@ -399,11 +385,18 @@ public class TaskAttemptContext {
return queryContext;
}
- public TaskAttemptId getQueryId() {
- return queryId;
- }
-
public HashShuffleAppenderManager getHashShuffleAppenderManager() {
+ // Resolved lazily on the first call: use the worker's shared manager when a worker
+ // context is available, otherwise build a private one from the query configuration
+ // (presumably the @VisibleForTesting constructor path, which passes a null
+ // ExecutionBlockContext -- confirm workerContext is null there).
+ // If that construction throws IOException, the error is only logged, null is returned,
+ // and the next call tries to construct the manager again.
+ // NOTE(review): the null check and the assignment are not synchronized; if this getter
+ // can be reached from several threads at once, more than one manager could be created
+ // -- confirm the calling threads.
+ if(hashShuffleAppenderManager == null) {
+ if (workerContext != null) {
+ this.hashShuffleAppenderManager = workerContext.getHashShuffleAppenderManager();
+ } else {
+ try {
+ this.hashShuffleAppenderManager = new HashShuffleAppenderManager(queryContext.getConf());
+ } catch (IOException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+ }
return hashShuffleAppenderManager;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java
new file mode 100644
index 0000000..2576726
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.TajoProtos;
+
+/**
+ * A worker loop that executes tasks handed out by a {@link TaskExecutor}.
+ *
+ * <p>Each container runs on one thread of the executor's pool. It repeatedly takes the next
+ * queued task and drives it through init, fetch (when the task has a fetch phase), run and
+ * cleanup, until the executor reports that it has been stopped.
+ */
+public class TaskContainer implements Runnable {
+  private static final Log LOG = LogFactory.getLog(TaskContainer.class);
+
+  private final TaskExecutor executor;
+  private final int sequenceId;
+
+  /**
+   * @param sequenceId index of this container within the executor's pool (used in log output)
+   * @param executor the executor that supplies tasks and is told when each task stops
+   */
+  public TaskContainer(int sequenceId, TaskExecutor executor) {
+    this.sequenceId = sequenceId;
+    this.executor = executor;
+  }
+
+  /**
+   * Processes tasks until {@link TaskExecutor#isStopped()} returns true.
+   *
+   * <p>Every task taken from the executor is passed back to {@link TaskExecutor#stopTask}
+   * exactly once (in the {@code finally} block). This happens whether the task succeeded,
+   * failed, or was skipped because it had already been stopped. A task that throws is aborted
+   * and reported through {@code ExecutionBlockContext.fatalError}; the container itself keeps
+   * serving the queue.
+   */
+  @Override
+  public void run() {
+    while (!executor.isStopped()) {
+
+      Task task = null;
+      try {
+        task = executor.getNextTask();
+        if (task == null) {
+          // getNextTask() returns null only when its wait was interrupted; loop back and
+          // re-check whether the executor has been stopped.
+          continue;
+        }
+
+        task.getExecutionBlockContext().getWorkerContext().getWorkerSystemMetrics().counter("query", "task").inc();
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(sequenceId + TaskContainer.class.getSimpleName() +
+              " got task:" + task.getTaskContext().getTaskId());
+        }
+
+        TaskAttemptContext taskAttemptContext = task.getTaskContext();
+        if (taskAttemptContext.isStopped()) {
+          // Skip this task but keep serving the queue. Returning here would end this thread and
+          // permanently shrink the pool; the finally block still hands the task back.
+          continue;
+        }
+
+        task.init();
+
+        if (task.hasFetchPhase()) {
+          task.fetch(); // The fetch is performed in an asynchronous way.
+        }
+
+        if (!taskAttemptContext.isStopped()) {
+          task.run();
+        }
+
+        task.cleanup();
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e);
+        if (task != null) {
+          try {
+            task.abort();
+            task.getExecutionBlockContext().fatalError(task.getTaskContext().getTaskId(), describe(e));
+          } catch (Throwable t) {
+            LOG.fatal(t.getMessage(), t);
+          }
+        }
+      } finally {
+        if (task != null) {
+          executor.stopTask(task.getTaskContext().getTaskId());
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns a non-null description of {@code e}: its message, or its class name when the
+   * exception carries no message (e.g. many NullPointerExceptions).
+   */
+  private static String describe(Throwable e) {
+    return e.getMessage() != null ? e.getMessage() : e.getClass().getCanonicalName();
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java
new file mode 100644
index 0000000..299952e
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.TaskId;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.engine.query.TaskRequest;
+import org.apache.tajo.engine.query.TaskRequestImpl;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.resource.NodeResource;
+import org.apache.tajo.resource.NodeResources;
+import org.apache.tajo.worker.event.*;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Executes worker tasks on a fixed pool of {@link TaskContainer} threads.
+ *
+ * <p>The pool size is {@code WORKER_RESOURCE_AVAILABLE_CPU_CORES}. Each {@link TaskStartEvent}
+ * received from the task manager's dispatcher becomes a {@link Task} and is queued. Container
+ * threads take tasks from that queue.
+ *
+ * <p>The resource allocated to a queued task is tracked per attempt id. It is returned to the
+ * resource-manager event handler exactly once, through {@link #stopTask}. A separate pool,
+ * exposed by {@link #getFetcherExecutor()}, runs shuffle fetchers.
+ */
+public class TaskExecutor extends AbstractService implements EventHandler<TaskExecutorEvent> {
+  private static final Log LOG = LogFactory.getLog(TaskExecutor.class);
+
+  private final TaskManager taskManager;
+  private final EventHandler rmEventHandler;
+  /** Resource allocated to each queued or running task attempt; removed when the task stops. */
+  private final Map<TaskAttemptId, NodeResource> allocatedResourceMap;
+  private final BlockingQueue<Task> taskQueue;
+  /** Number of tasks that have been queued and not yet stopped. */
+  private final AtomicInteger runningTasks;
+  private ThreadPoolExecutor fetcherExecutor;
+  private ExecutorService threadPool;
+  private TajoConf tajoConf;
+  private volatile boolean isStopped;
+
+  public TaskExecutor(TaskManager taskManager, EventHandler rmEventHandler) {
+    super(TaskExecutor.class.getName());
+    this.taskManager = taskManager;
+    this.rmEventHandler = rmEventHandler;
+    this.allocatedResourceMap = Maps.newConcurrentMap();
+    this.runningTasks = new AtomicInteger();
+    this.taskQueue = new LinkedBlockingQueue<Task>();
+  }
+
+  /**
+   * Requires a {@link TajoConf} and registers this executor for
+   * {@link TaskExecutorEvent.EventType} events on the task manager's dispatcher.
+   *
+   * @throws IllegalArgumentException if {@code conf} is not a TajoConf
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    if (!(conf instanceof TajoConf)) {
+      throw new IllegalArgumentException("Configuration must be a TajoConf instance");
+    }
+
+    this.tajoConf = (TajoConf) conf;
+    this.taskManager.getDispatcher().register(TaskExecutorEvent.EventType.class, this);
+    super.serviceInit(conf);
+  }
+
+  /** Creates the container and fetcher pools and starts one {@link TaskContainer} per thread. */
+  @Override
+  protected void serviceStart() throws Exception {
+    int nThreads = this.tajoConf.getIntVar(ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES);
+    this.threadPool = Executors.newFixedThreadPool(nThreads,
+        new ThreadFactoryBuilder().setNameFormat("Task executor #%d").build());
+
+    //TODO move to tajoConf.getIntVar(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM);
+    int maxFetcherThreads = Runtime.getRuntime().availableProcessors() * 2;
+    // NOTE(review): a SynchronousQueue holds no backlog. Once maxFetcherThreads fetchers are
+    // busy, further submissions are rejected (RejectedExecutionException under the default
+    // AbortPolicy) -- confirm the fetch phase never submits more than that at once.
+    this.fetcherExecutor = new ThreadPoolExecutor(Math.min(nThreads, maxFetcherThreads),
+        maxFetcherThreads,
+        60L, TimeUnit.SECONDS,
+        new SynchronousQueue<Runnable>(true),
+        new ThreadFactoryBuilder().setNameFormat("Fetcher executor #%d").build());
+
+    for (int i = 0; i < nThreads; i++) {
+      threadPool.submit(new TaskContainer(i, this));
+    }
+
+    super.serviceStart();
+    LOG.info("Started TaskExecutor[" + nThreads + "], Fetcher executor[" + maxFetcherThreads + "]");
+  }
+
+  /**
+   * Marks the executor stopped and shuts both pools down.
+   *
+   * <p>The container pool uses {@code shutdownNow()} so that threads blocked in
+   * {@code taskQueue.take()} are interrupted. A plain {@code shutdown()} would leave them
+   * waiting forever, because no further task will arrive. The pools are null-checked because
+   * a service can be stopped without having been started.
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    isStopped = true;
+
+    if (threadPool != null) {
+      threadPool.shutdownNow();
+    }
+    if (fetcherExecutor != null) {
+      fetcherExecutor.shutdown();
+    }
+    super.serviceStop();
+  }
+
+  public boolean isStopped() {
+    return isStopped;
+  }
+
+  /** @return the number of tasks that have been queued and not yet stopped */
+  public int getRunningTasks() {
+    return runningTasks.get();
+  }
+
+  /**
+   * Blocks until a task is available.
+   *
+   * @return the next queued task, or {@code null} if the wait was interrupted
+   */
+  protected Task getNextTask() {
+    try {
+      return taskQueue.take();
+    } catch (InterruptedException e) {
+      // Interruption is expected when serviceStop() shuts the pool down, so only an interrupt
+      // while still running is reported. The interrupt status is deliberately not restored:
+      // container threads re-check isStopped() and call this method again. A restored flag
+      // would make every following take() fail immediately, causing a tight loop.
+      if (!isStopped) {
+        LOG.fatal(e);
+      }
+      return null;
+    }
+  }
+
+  /**
+   * Releases the resource allocated to {@code taskId} and decrements the running-task count.
+   *
+   * <p>This is a no-op, apart from a warning, if no resource is registered for the attempt. As
+   * a result, the count and the resource manager are never updated twice for one task, and no
+   * deallocation event is sent without a resource.
+   */
+  protected void stopTask(TaskAttemptId taskId) {
+    NodeResource allocated = allocatedResourceMap.remove(taskId);
+    if (allocated == null) {
+      LOG.warn("No allocated resource registered for task: " + taskId);
+      return;
+    }
+    runningTasks.decrementAndGet();
+    releaseResource(allocated);
+  }
+
+  /** Returns {@code resource} to the node resource manager. */
+  @SuppressWarnings("unchecked")
+  private void releaseResource(NodeResource resource) {
+    rmEventHandler.handle(new NodeResourceDeallocateEvent(resource));
+  }
+
+  protected ExecutorService getFetcherExecutor() {
+    return fetcherExecutor;
+  }
+
+  /**
+   * Creates a task for {@code taskRequest} and registers it in the block context.
+   *
+   * @return the new task, or {@code null} when the attempt is already registered (a duplicate,
+   *         which is reported through {@code fatalError})
+   * @throws IOException if the task cannot be constructed
+   */
+  protected Task createTask(ExecutionBlockContext executionBlockContext,
+                            TajoWorkerProtocol.TaskRequestProto taskRequest) throws IOException {
+    Task task = null;
+    TaskAttemptId taskAttemptId = new TaskAttemptId(taskRequest.getId());
+    if (executionBlockContext.getTasks().containsKey(taskAttemptId)) {
+      String errorMessage = "Duplicate Task Attempt: " + taskAttemptId;
+      LOG.error(errorMessage);
+      executionBlockContext.fatalError(taskAttemptId, errorMessage);
+    } else {
+      task = new TaskImpl(new TaskRequestImpl(taskRequest), executionBlockContext, getFetcherExecutor());
+      executionBlockContext.getTasks().put(task.getTaskContext().getTaskId(), task);
+    }
+    return task;
+  }
+
+  /**
+   * Turns a {@link TaskStartEvent} into a queued task. Other event types are ignored.
+   *
+   * <p>When no task can be queued, the event's resource is released directly. This covers a
+   * missing block context, a construction failure and a duplicate attempt. Any entry already
+   * registered under the same attempt id, and the running-task count, are left untouched.
+   */
+  @Override
+  public void handle(TaskExecutorEvent event) {
+    if (!(event instanceof TaskStartEvent)) {
+      return;
+    }
+
+    TaskStartEvent startEvent = (TaskStartEvent) event;
+    TaskAttemptId taskId = startEvent.getTaskId();
+    NodeResource allocatedResource = startEvent.getAllocatedResource();
+
+    ExecutionBlockContext context = taskManager.getExecutionBlockContext(
+        taskId.getTaskId().getExecutionBlockId());
+    if (context == null) {
+      LOG.error("No execution block context for task: " + taskId
+          + ", releasing resource: " + allocatedResource);
+      releaseResource(allocatedResource);
+      return;
+    }
+
+    Task task;
+    try {
+      task = createTask(context, startEvent.getTaskRequest());
+    } catch (IOException e) {
+      LOG.error("Failed to create task " + taskId + ": " + e.getMessage(), e);
+      releaseResource(allocatedResource);
+      return;
+    }
+
+    if (task == null) {
+      LOG.warn("Release duplicate task resource: " + allocatedResource);
+      releaseResource(allocatedResource);
+      return;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Arrival task: " + task.getTaskContext().getTaskId() +
+          ", allocated resource: " + allocatedResource);
+    }
+
+    // Register the resource and count the task before it becomes visible to container threads,
+    // so that a quickly finishing task's stopTask() can never run ahead of this bookkeeping.
+    allocatedResourceMap.put(taskId, allocatedResource);
+    runningTasks.incrementAndGet();
+    try {
+      taskQueue.put(task);
+    } catch (InterruptedException e) {
+      if (!isStopped) {
+        LOG.fatal(e.getMessage(), e);
+      }
+      // The task never reached a container: unregister it and release its resource.
+      context.getTasks().remove(task.getTaskContext().getTaskId());
+      stopTask(taskId);
+      Thread.currentThread().interrupt();
+      return;
+    }
+
+    context.getWorkerContext().getWorkerSystemMetrics()
+        .histogram("tasks", "running").update(runningTasks.get());
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
new file mode 100644
index 0000000..be3960b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
@@ -0,0 +1,838 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import io.netty.handler.codec.http.QueryStringDecoder;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TajoProtos.TaskAttemptState;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.physical.PhysicalExec;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.engine.query.TaskRequest;
+import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol.*;
+import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.plan.function.python.TajoScriptEngine;
+import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.serder.LogicalNodeDeserializer;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.pullserver.TajoPullServerService;
+import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.NetUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URI;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
+
+/**
+ * Worker-side {@link Task} implementation for a single task attempt.
+ *
+ * <p>The constructor creates the task working directory. {@link #init()} deserializes the plan,
+ * starts script engines and prepares fetchers. {@link #fetch()} submits the fetchers to the shared
+ * fetcher executor. {@link #run()} waits for the fetches, executes the physical plan and reports
+ * the outcome to the QueryMaster. {@link #cleanup()} records the task history. The caller that
+ * drives this order is not part of this class.
+ */
+public class TaskImpl implements Task {
+  private static final Log LOG = LogFactory.getLog(TaskImpl.class);
+  /** Maximum progress value attributed to the fetch phase (see {@link #adjustFetchProcess}). */
+  private static final float FETCHER_PROGRESS = 0.5f;
+
+  private final TajoConf systemConf;
+  private final QueryContext queryContext;
+  private final ExecutionBlockContext executionBlockContext;
+  private final TaskRequest request;
+  /** Table descriptors of the scanned tables in the plan, keyed by the scan's canonical name. */
+  private final Map<String, TableDesc> descs;
+  private final TableStats inputStats;
+  private final ExecutorService fetcherExecutor;
+  private final Path taskDir;
+
+  private final TaskAttemptContext context;
+  private final List<Fetcher> fetcherRunners;
+  private LogicalNode plan;
+  private PhysicalExec executor;
+
+  private boolean interQuery;
+  private Path inputTableBaseDir;
+
+  private long startTime;
+  private long endTime;
+
+  /** Chunks read directly from this host's shuffle output instead of being fetched remotely. */
+  private List<FileChunk> localChunks;
+  // TODO - to be refactored
+  private ShuffleType shuffleType = null;
+  private Schema finalSchema = null;
+
+  private TupleComparator sortComp = null;
+
+  /**
+   * Creates a pending task attempt.
+   *
+   * @param request the task request received for this attempt
+   * @param executionBlockContext the context of the execution block this task belongs to
+   * @param fetcherExecutor executor on which fetchers are submitted by {@link #fetch()}
+   * @throws IOException if the execution block base directory cannot be created
+   */
+  public TaskImpl(final TaskRequest request,
+                  final ExecutionBlockContext executionBlockContext,
+                  final ExecutorService fetcherExecutor) throws IOException {
+
+    this.request = request;
+    this.executionBlockContext = executionBlockContext;
+    this.systemConf = executionBlockContext.getConf();
+    this.queryContext = request.getQueryContext(systemConf);
+    this.inputStats = new TableStats();
+    this.fetcherRunners = Lists.newArrayList();
+    this.fetcherExecutor = fetcherExecutor;
+    this.descs = Maps.newHashMap();
+
+    Path baseDirPath = executionBlockContext.createBaseDir();
+    LOG.info("Task basedir is created (" + baseDirPath +")");
+    TaskAttemptId taskAttemptId = request.getId();
+
+    this.taskDir = StorageUtil.concatPath(baseDirPath,
+        taskAttemptId.getTaskId().getId() + "_" + taskAttemptId.getId());
+    this.context = new TaskAttemptContext(queryContext, executionBlockContext, taskAttemptId,
+        request.getFragments().toArray(new FragmentProto[request.getFragments().size()]), taskDir);
+    this.context.setDataChannel(request.getDataChannel());
+    this.context.setEnforcer(request.getEnforcer());
+    this.context.setState(TaskAttemptState.TA_PENDING);
+  }
+
+  /**
+   * Deserializes the logical plan and derives plan-dependent settings.
+   * It collects the table descriptors of the (partitioned) scans. For an inter-query task it
+   * sets up the shuffle type, and for a range shuffle also the sort key schema and comparator.
+   * Otherwise it sets the output file path in the staging directory.
+   */
+  public void initPlan() throws IOException {
+    plan = LogicalNodeDeserializer.deserialize(queryContext, context.getEvalContext(), request.getPlan());
+    LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN);
+    if (scanNode != null) {
+      for (LogicalNode node : scanNode) {
+        ScanNode scan = (ScanNode) node;
+        descs.put(scan.getCanonicalName(), scan.getTableDesc());
+      }
+    }
+
+    LogicalNode [] partitionScanNode = PlannerUtil.findAllNodes(plan, NodeType.PARTITIONS_SCAN);
+    if (partitionScanNode != null) {
+      for (LogicalNode node : partitionScanNode) {
+        PartitionedTableScanNode scan = (PartitionedTableScanNode) node;
+        descs.put(scan.getCanonicalName(), scan.getTableDesc());
+      }
+    }
+
+    interQuery = request.getProto().getInterQuery();
+    if (interQuery) {
+      context.setInterQuery();
+      this.shuffleType = context.getDataChannel().getShuffleType();
+
+      if (shuffleType == ShuffleType.RANGE_SHUFFLE) {
+        SortNode sortNode = PlannerUtil.findTopNode(plan, NodeType.SORT);
+        this.finalSchema = PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys());
+        this.sortComp = new BaseTupleComparator(finalSchema, sortNode.getSortKeys());
+      }
+    } else {
+      Path outFilePath = ((FileTablespace) TableSpaceManager.getFileStorageManager(systemConf))
+          .getAppenderFilePath(getId(), queryContext.getStagingDir());
+      LOG.info("Output File Path: " + outFilePath);
+      context.setOutputPath(outFilePath);
+    }
+
+    this.localChunks = Collections.synchronizedList(new ArrayList<FileChunk>());
+    LOG.info("==================================");
+    LOG.info("* Stage " + request.getId() + " is initialized");
+    LOG.info("* InterQuery: " + interQuery
+        + (interQuery ? ", Use " + this.shuffleType + " shuffle" : "") +
+        ", Fragments (num: " + request.getFragments().size() + ")" +
+        ", Fetches (total:" + request.getFetches().size() + ") :");
+
+    if (LOG.isDebugEnabled()) {
+      for (FetchImpl f : request.getFetches()) {
+        LOG.debug("Table Id: " + f.getName() + ", Simple URIs: " + f.getSimpleURIs());
+      }
+    }
+    LOG.info("* Local task dir: " + taskDir);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("* plan:\n");
+      LOG.debug(plan.toString());
+    }
+    LOG.info("==================================");
+  }
+
+  /** Starts every script engine registered in the task's eval context. */
+  private void startScriptExecutors() throws IOException {
+    for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) {
+      executor.start(systemConf);
+    }
+  }
+
+  /** Shuts down every script engine registered in the task's eval context. */
+  private void stopScriptExecutors() {
+    for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) {
+      executor.shutdown();
+    }
+  }
+
+  /**
+   * Initializes the plan and script engines. While the task is still pending, it also creates
+   * the local task and input directories and builds the fetchers for the requested fetches.
+   */
+  @Override
+  public void init() throws IOException {
+    LOG.info("Initializing: " + getId());
+
+    initPlan();
+    startScriptExecutors();
+
+    if (context.getState() == TaskAttemptState.TA_PENDING) {
+      // initialize a task temporal dir
+      FileSystem localFS = executionBlockContext.getLocalFS();
+      localFS.mkdirs(taskDir);
+
+      if (request.getFetches().size() > 0) {
+        inputTableBaseDir = localFS.makeQualified(
+            executionBlockContext.getLocalDirAllocator().getLocalPathForWrite(
+                getTaskAttemptDir(context.getTaskId()).toString(), systemConf));
+        localFS.mkdirs(inputTableBaseDir);
+        Path tableDir;
+        for (String inputTable : context.getInputTables()) {
+          tableDir = new Path(inputTableBaseDir, inputTable);
+          if (!localFS.exists(tableDir)) {
+            LOG.info("the directory is created " + tableDir.toUri());
+            localFS.mkdirs(tableDir);
+          }
+        }
+      }
+      // for localizing the intermediate data
+      fetcherRunners.addAll(getFetchRunners(context, request.getFetches()));
+    }
+  }
+
+  private TaskAttemptId getId() {
+    return context.getTaskId();
+  }
+
+  @Override
+  public String toString() {
+    return "TaskId: " + this.getId() + " Status: " + context.getState();
+  }
+
+  @Override
+  public boolean isStopped() {
+    return context.isStopped();
+  }
+
+  @Override
+  public TaskAttemptContext getTaskContext() {
+    return context;
+  }
+
+  @Override
+  public ExecutionBlockContext getExecutionBlockContext() {
+    return executionBlockContext;
+  }
+
+  @Override
+  public boolean hasFetchPhase() {
+    return fetcherRunners.size() > 0;
+  }
+
+  /** Submits one {@link FetchRunner} per prepared fetcher to the fetcher executor. */
+  @Override
+  public void fetch() {
+    for (Fetcher f : fetcherRunners) {
+      fetcherExecutor.submit(new FetchRunner(context, f));
+    }
+  }
+
+  /** Stops script engines, marks the attempt as killed and stops the task context. */
+  @Override
+  public void kill() {
+    stopScriptExecutors();
+    context.setState(TaskAttemptState.TA_KILLED);
+    context.stop();
+  }
+
+  /** Stops script engines, marks the attempt as failed and stops the task context. */
+  @Override
+  public void abort() {
+    stopScriptExecutors();
+    context.setState(TaskAttemptState.TA_FAILED);
+    context.stop();
+  }
+
+  /** Builds a status report with the current progress, state and input/result statistics. */
+  @Override
+  public TaskStatusProto getReport() {
+    TaskStatusProto.Builder builder = TaskStatusProto.newBuilder();
+    builder.setWorkerName(executionBlockContext.getWorkerContext().getConnectionInfo().getHostAndPeerRpcPort());
+    builder.setId(context.getTaskId().getProto())
+        .setProgress(context.getProgress())
+        .setState(context.getState());
+
+    builder.setInputStats(reloadInputStats());
+
+    if (context.getResultStats() != null) {
+      builder.setResultStats(context.getResultStats().getProto());
+    }
+    return builder.build();
+  }
+
+  @Override
+  public boolean isProgressChanged() {
+    return context.isProgressChanged();
+  }
+
+  /** Copies the executor's progress into the context unless stopped or already complete. */
+  @Override
+  public void updateProgress() {
+    if (context != null && context.isStopped()) {
+      return;
+    }
+
+    if (executor != null && context.getProgress() < 1.0f) {
+      context.setExecutorProgress(executor.getProgress());
+    }
+  }
+
+  /**
+   * Refreshes {@link #inputStats} from the running executor, if any, and returns it as a proto.
+   * Synchronized on {@code inputStats} because it is called from several code paths.
+   */
+  private CatalogProtos.TableStatsProto reloadInputStats() {
+    synchronized (inputStats) {
+      if (this.executor == null) {
+        return inputStats.getProto();
+      }
+
+      TableStats executorInputStats = this.executor.getInputStats();
+
+      if (executorInputStats != null) {
+        inputStats.setValues(executorInputStats);
+      }
+      return inputStats.getProto();
+    }
+  }
+
+  /**
+   * Builds the completion report sent to the QueryMaster on success. It contains the input and
+   * result statistics and one entry per shuffle output partition, with its volume when known.
+   */
+  private TaskCompletionReport getTaskCompletionReport() {
+    TaskCompletionReport.Builder builder = TaskCompletionReport.newBuilder();
+    builder.setId(context.getTaskId().getProto());
+
+    builder.setInputStats(reloadInputStats());
+
+    if (context.hasResultStats()) {
+      builder.setResultStats(context.getResultStats().getProto());
+    } else {
+      builder.setResultStats(new TableStats().getProto());
+    }
+
+    // Direct lookup per partition instead of scanning every volume entry for each partition.
+    Map<Integer, Long> outputVolumes = context.getPartitionOutputVolume();
+    Iterator<Entry<Integer, String>> it = context.getShuffleFileOutputs();
+    while (it.hasNext()) {
+      Entry<Integer, String> entry = it.next();
+      ShuffleFileOutput.Builder part = ShuffleFileOutput.newBuilder();
+      part.setPartId(entry.getKey());
+
+      // Set output volume
+      if (outputVolumes != null) {
+        Long volume = outputVolumes.get(entry.getKey());
+        if (volume != null) {
+          part.setVolume(volume);
+        }
+      }
+
+      builder.addShuffleFileOutputs(part.build());
+    }
+
+    return builder.build();
+  }
+
+  /**
+   * Blocks until all fetchers have counted down the fetch latch. It then replaces the assigned
+   * fragments of every non-broadcast input table with the locally fetched data.
+   */
+  private void waitForFetch() throws InterruptedException, IOException {
+    context.getFetchLatch().await();
+    LOG.info(context.getTaskId() + " All fetches are done!");
+    Collection<String> inputs = Lists.newArrayList(context.getInputTables());
+
+    // Get all broadcasted tables
+    Set<String> broadcastTableNames = new HashSet<String>();
+    List<EnforceProperty> broadcasts = context.getEnforcer().getEnforceProperties(EnforceType.BROADCAST);
+    if (broadcasts != null) {
+      for (EnforceProperty eachBroadcast : broadcasts) {
+        broadcastTableNames.add(eachBroadcast.getBroadcast().getTableName());
+      }
+    }
+
+    // localize the fetched data and skip the broadcast table
+    for (String inputTable: inputs) {
+      if (broadcastTableNames.contains(inputTable)) {
+        continue;
+      }
+      File tableDir = new File(context.getFetchIn(), inputTable);
+      FileFragment[] frags = localizeFetchedData(tableDir, inputTable, descs.get(inputTable).getMeta());
+      context.updateAssignedFragments(inputTable, frags);
+    }
+  }
+
+  /**
+   * Executes the task. It waits for the fetch phase if there is one, then runs the physical
+   * executor until it is exhausted or the task is stopped. Finally it reports success, kill or
+   * failure to the QueryMaster and updates the execution block's task counters.
+   */
+  @Override
+  public void run() throws Exception {
+    startTime = System.currentTimeMillis();
+    Throwable error = null;
+
+    try {
+      if (!context.isStopped()) {
+        context.setState(TajoProtos.TaskAttemptState.TA_RUNNING);
+        if (context.hasFetchPhase()) {
+          // If the fetch is still in progress, the query unit must wait for complete.
+          waitForFetch();
+          context.setFetcherProgress(FETCHER_PROGRESS);
+          updateProgress();
+        }
+
+        this.executor = executionBlockContext.getTQueryEngine().createPlan(context, plan);
+        this.executor.init();
+
+        // Pull tuples until the executor is exhausted or the task is stopped; the tuples
+        // themselves are not used here.
+        while (!context.isStopped() && executor.next() != null) {
+        }
+      }
+    } catch (Throwable e) {
+      error = e ;
+      LOG.error(e.getMessage(), e);
+      stopScriptExecutors();
+      context.stop();
+    } finally {
+      if (executor != null) {
+        try {
+          executor.close();
+          reloadInputStats();
+        } catch (IOException e) {
+          LOG.error(e, e);
+        }
+        this.executor = null;
+      }
+
+      executionBlockContext.completedTasksNum.incrementAndGet();
+      context.getHashShuffleAppenderManager().finalizeTask(getId());
+
+      QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = executionBlockContext.getStub();
+      if (context.isStopped()) {
+        context.setExecutorProgress(0.0f);
+
+        if (context.getState() == TaskAttemptState.TA_KILLED) {
+          queryMasterStub.statusUpdate(null, getReport(), NullCallback.get());
+          executionBlockContext.killedTasksNum.incrementAndGet();
+        } else {
+          context.setState(TaskAttemptState.TA_FAILED);
+          TaskFatalErrorReport.Builder errorBuilder =
+              TaskFatalErrorReport.newBuilder()
+                  .setId(getId().getProto());
+          if (error != null) {
+            if (error.getMessage() == null) {
+              errorBuilder.setErrorMessage(error.getClass().getCanonicalName());
+            } else {
+              errorBuilder.setErrorMessage(error.getMessage());
+            }
+            errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error));
+          }
+
+          queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get());
+          executionBlockContext.failedTasksNum.incrementAndGet();
+        }
+      } else {
+        // if successful
+        context.stop();
+        context.setProgress(1.0f);
+        context.setState(TaskAttemptState.TA_SUCCEEDED);
+        executionBlockContext.succeededTasksNum.incrementAndGet();
+
+        TaskCompletionReport report = getTaskCompletionReport();
+        queryMasterStub.done(null, report, NullCallback.get());
+      }
+      endTime = System.currentTimeMillis();
+      LOG.info(context.getTaskId() + " completed. " +
+          "Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() +
+          ", succeeded: " + executionBlockContext.succeededTasksNum.intValue()
+          + ", killed: " + executionBlockContext.killedTasksNum.intValue()
+          + ", failed: " + executionBlockContext.failedTasksNum.intValue());
+    }
+  }
+
+  /**
+   * Records the task history, unregisters the task from its execution block, closes a remaining
+   * executor and stops the script engines.
+   */
+  @Override
+  public void cleanup() {
+    TaskHistory taskHistory = createTaskHistory();
+    executionBlockContext.addTaskHistory(getId().getTaskId(), taskHistory);
+    executionBlockContext.getTasks().remove(getId());
+
+    // Clear but keep the list: hasFetchPhase(), fetcherFinished() and createTaskHistory()
+    // dereference it and may still be called after cleanup.
+    fetcherRunners.clear();
+    try {
+      if (executor != null) {
+        executor.close();
+        executor = null;
+      }
+    } catch (IOException e) {
+      LOG.fatal(e.getMessage(), e);
+    }
+
+    executionBlockContext.getWorkerContext().getTaskHistoryWriter().appendHistory(taskHistory);
+    stopScriptExecutors();
+  }
+
+  /**
+   * Builds a {@link TaskHistory} snapshot of this attempt. Per-fetcher histories are only added
+   * when {@code $DEBUG_ENABLED} is set. Any exception is logged and the (possibly partially
+   * filled or null) history is returned.
+   */
+  public TaskHistory createTaskHistory() {
+    TaskHistory taskHistory = null;
+    try {
+      taskHistory = new TaskHistory(context.getTaskId(), context.getState(), context.getProgress(),
+          startTime, endTime, reloadInputStats());
+
+      if (context.getOutputPath() != null) {
+        taskHistory.setOutputPath(context.getOutputPath().toString());
+      }
+
+      if (context.getWorkDir() != null) {
+        taskHistory.setWorkingPath(context.getWorkDir().toString());
+      }
+
+      if (context.getResultStats() != null) {
+        taskHistory.setOutputStats(context.getResultStats().getProto());
+      }
+
+      if (hasFetchPhase()) {
+        taskHistory.setTotalFetchCount(fetcherRunners.size());
+        int i = 0;
+        FetcherHistoryProto.Builder builder = FetcherHistoryProto.newBuilder();
+        for (Fetcher fetcher : fetcherRunners) {
+          // TODO store the fetcher histories
+          if (systemConf.getBoolVar(TajoConf.ConfVars.$DEBUG_ENABLED)) {
+            builder.setStartTime(fetcher.getStartTime());
+            builder.setFinishTime(fetcher.getFinishTime());
+            builder.setFileLength(fetcher.getFileLen());
+            builder.setMessageReceivedCount(fetcher.getMessageReceiveCount());
+            builder.setState(fetcher.getState());
+
+            taskHistory.addFetcherHistory(builder.build());
+          }
+          if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) i++;
+        }
+        taskHistory.setFinishedFetchCount(i);
+      }
+    } catch (Exception e) {
+      LOG.warn(e.getMessage(), e);
+    }
+
+    return taskHistory;
+  }
+
+  @Override
+  public int hashCode() {
+    return context.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof TaskImpl) {
+      TaskImpl other = (TaskImpl) obj;
+      return this.context.equals(other.context);
+    }
+    return false;
+  }
+
+  /**
+   * Turns the fetched files of one input table into fragments. It lists the non-empty files in
+   * {@code file} on the local file system and adds the locally served chunks tagged with
+   * {@code name}.
+   */
+  private FileFragment[] localizeFetchedData(File file, String name, TableMeta meta)
+      throws IOException {
+    Configuration c = new Configuration(systemConf);
+    c.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "file:///");
+    FileSystem fs = FileSystem.get(c);
+    Path tablePath = new Path(file.getAbsolutePath());
+
+    List<FileFragment> listTablets = new ArrayList<FileFragment>();
+    FileFragment tablet;
+
+    FileStatus[] fileLists = fs.listStatus(tablePath);
+    for (FileStatus f : fileLists) {
+      if (f.getLen() == 0) {
+        continue;
+      }
+      tablet = new FileFragment(name, f.getPath(), 0L, f.getLen());
+      listTablets.add(tablet);
+    }
+
+    // Special treatment for locally pseudo fetched chunks
+    synchronized (localChunks) {
+      for (FileChunk chunk : localChunks) {
+        if (name.equals(chunk.getEbId())) {
+          tablet = new FileFragment(name, new Path(chunk.getFile().getPath()), chunk.startOffset(), chunk.length());
+          listTablets.add(tablet);
+          LOG.info("One local chunk is added to listTablets");
+        }
+      }
+    }
+
+    FileFragment[] tablets = new FileFragment[listTablets.size()];
+    listTablets.toArray(tablets);
+
+    return tablets;
+  }
+
+  /**
+   * Runs one fetcher with exponential back-off retries (1s doubling up to 10s, at most
+   * SHUFFLE_FETCHER_READ_RETRY_MAX_NUM attempts). On success the fetch latch is counted down via
+   * fetcherFinished(). On failure or interruption the task is stopped and the latch is still
+   * counted down so the waiting task is released.
+   */
+  private class FetchRunner implements Runnable {
+    private final TaskAttemptContext ctx;
+    private final Fetcher fetcher;
+    private int maxRetryNum;
+
+    public FetchRunner(TaskAttemptContext ctx, Fetcher fetcher) {
+      this.ctx = ctx;
+      this.fetcher = fetcher;
+      this.maxRetryNum = systemConf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_RETRY_MAX_NUM);
+    }
+
+    @Override
+    public void run() {
+      int retryNum = 0;
+      int retryWaitTime = 1000; // milliseconds
+
+      try { // for releasing fetch latch
+        while (!context.isStopped() && retryNum < maxRetryNum) {
+          if (retryNum > 0) {
+            try {
+              Thread.sleep(retryWaitTime);
+              retryWaitTime = Math.min(10 * 1000, retryWaitTime * 2); // max 10 seconds
+            } catch (InterruptedException e) {
+              // Keep the interrupt status and stop retrying; the finally block below stops the
+              // task and releases the fetch latch.
+              Thread.currentThread().interrupt();
+              LOG.warn("Fetch retry is interrupted: " + fetcher.getURI(), e);
+              break;
+            }
+            LOG.warn("Retry on the fetch: " + fetcher.getURI() + " (" + retryNum + ")");
+          }
+          try {
+            FileChunk fetched = fetcher.get();
+            if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched != null
+                && fetched.getFile() != null) {
+              if (!fetched.fromRemote()) {
+                localChunks.add(fetched);
+                LOG.info("Add a new FileChunk to local chunk list");
+              }
+              break;
+            }
+          } catch (Throwable e) {
+            LOG.error("Fetch failed: " + fetcher.getURI(), e);
+          }
+          retryNum++;
+        }
+      } finally {
+        if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) {
+          fetcherFinished(ctx);
+        } else {
+          if (retryNum == maxRetryNum) {
+            LOG.error("ERROR: the maximum retry (" + retryNum + ") on the fetch exceeded (" + fetcher.getURI() + ")");
+          }
+          stopScriptExecutors();
+          context.stop(); // retry task
+          ctx.getFetchLatch().countDown();
+        }
+      }
+    }
+  }
+
+  /**
+   * Maps fetch completion to task progress: the finished fraction of fetchers scaled to
+   * {@link #FETCHER_PROGRESS}, or 0 when there are no fetchers.
+   */
+  @VisibleForTesting
+  public static float adjustFetchProcess(int totalFetcher, int remainFetcher) {
+    if (totalFetcher > 0) {
+      return ((totalFetcher - remainFetcher) / (float) totalFetcher) * FETCHER_PROGRESS;
+    } else {
+      return 0.0f;
+    }
+  }
+
+  /** Counts down the fetch latch for a successful fetcher and updates the fetch progress. */
+  private synchronized void fetcherFinished(TaskAttemptContext ctx) {
+    int fetcherSize = fetcherRunners.size();
+    if (fetcherSize == 0) {
+      return;
+    }
+
+    ctx.getFetchLatch().countDown();
+
+    int remainFetcher = (int) ctx.getFetchLatch().getCount();
+    if (remainFetcher == 0) {
+      context.setFetcherProgress(FETCHER_PROGRESS);
+    } else {
+      context.setFetcherProgress(adjustFetchProcess(fetcherSize, remainFetcher));
+    }
+  }
+
+  /**
+   * Creates one {@link Fetcher} per fetch URI and registers the fetch phase on {@code ctx}.
+   * A URI pointing at this host's pull server is served from the local shuffle output when
+   * possible, and is skipped entirely when the local lookup returns no chunk without an error.
+   * Otherwise the data is fetched remotely into {@code in_<n>} files under the table directory.
+   */
+  private List<Fetcher> getFetchRunners(TaskAttemptContext ctx,
+                                        List<FetchImpl> fetches) throws IOException {
+
+    if (fetches.size() > 0) {
+      Path inputDir = executionBlockContext.getLocalDirAllocator().
+          getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf);
+
+      int i = 0;
+      List<Fetcher> runnerList = Lists.newArrayList();
+
+      for (FetchImpl f : fetches) {
+        File storeDir = new File(inputDir.toString(), f.getName());
+        if (!storeDir.exists()) {
+          if (!storeDir.mkdirs()) throw new IOException("Failed to create " + storeDir);
+        }
+
+        for (URI uri : f.getURIs()) {
+          File defaultStoreFile = new File(storeDir, "in_" + i);
+          InetAddress address = InetAddress.getByName(uri.getHost());
+          // Declared per URI so that a chunk from a previous iteration can never be reused.
+          FileChunk storeChunk = null;
+
+          WorkerConnectionInfo conn = executionBlockContext.getWorkerContext().getConnectionInfo();
+          if (NetUtils.isLocalAddress(address) && conn.getPullServerPort() == uri.getPort()) {
+            boolean hasError = false;
+            try {
+              LOG.info("Try to get local file chunk at local host");
+              storeChunk = getLocalStoredFileChunk(uri, systemConf);
+            } catch (Throwable t) {
+              // Fall back to a remote fetch below.
+              LOG.warn("Failed to get local file chunk, fall back to remote fetch: " + uri, t);
+              hasError = true;
+            }
+
+            // When a range request is out of range, storeChunk will be NULL. This case is normal state.
+            // So, we should skip and don't need to create storeChunk.
+            if (storeChunk == null && !hasError) {
+              continue;
+            }
+
+            if (storeChunk != null && storeChunk.getFile() != null && storeChunk.startOffset() > -1
+                && !hasError) {
+              storeChunk.setFromRemote(false);
+            } else {
+              storeChunk = new FileChunk(defaultStoreFile, 0, -1);
+              storeChunk.setFromRemote(true);
+            }
+          } else {
+            storeChunk = new FileChunk(defaultStoreFile, 0, -1);
+            storeChunk.setFromRemote(true);
+          }
+
+          // If we decide that intermediate data should be really fetched from a remote host, storeChunk
+          // represents a complete file. Otherwise, storeChunk may represent a complete file or only a part of it
+          storeChunk.setEbId(f.getName());
+          Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk);
+          LOG.info("Create a new Fetcher with storeChunk:" + storeChunk.toString());
+          runnerList.add(fetcher);
+          i++;
+        }
+      }
+      ctx.addFetchPhase(runnerList.size(), new File(inputDir.toString()));
+      return runnerList;
+    } else {
+      return Lists.newArrayList();
+    }
+  }
+
+  /**
+   * Resolves a pull-server fetch URI against this worker's local shuffle output.
+   * Range shuffles ("r") use the task output directory and the start/end keys. Hash and
+   * scattered hash shuffles ("h"/"s") use the partition file, with the optional offset/length.
+   *
+   * @return the chunk to read, or null when the URI is invalid, the data does not exist
+   *         locally, or the requested range is empty
+   */
+  private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException {
+    // Parse the URI
+    LOG.info("getLocalStoredFileChunk starts");
+    final Map<String, List<String>> params = new QueryStringDecoder(fetchURI.toString()).parameters();
+    final List<String> types = params.get("type");
+    final List<String> qids = params.get("qid");
+    final List<String> taskIdList = params.get("ta");
+    final List<String> stageIds = params.get("sid");
+    final List<String> partIds = params.get("p");
+    final List<String> offsetList = params.get("offset");
+    final List<String> lengthList = params.get("length");
+
+    if (types == null || stageIds == null || qids == null || partIds == null) {
+      LOG.error("Invalid URI - Required queryId, type, stage Id, and part id");
+      return null;
+    }
+
+    // Each of these parameters must carry exactly one value.
+    if (qids.size() != 1 || types.size() != 1 || stageIds.size() != 1) {
+      LOG.error("Invalid URI - Required qids, type, taskIds, stage Id, and part id");
+      return null;
+    }
+
+    String queryId = qids.get(0);
+    String shuffleType = types.get(0);
+    String sid = stageIds.get(0);
+    String partId = partIds.get(0);
+
+    if (shuffleType.equals("r") && taskIdList == null) {
+      LOG.error("Invalid URI - For range shuffle, taskId is required");
+      return null;
+    }
+    List<String> taskIds = splitMaps(taskIdList);
+
+    FileChunk chunk;
+    long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L;
+    long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L;
+
+    LOG.info("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId
+        + ", taskIds=" + taskIdList);
+
+    // The working directory of Tajo worker for each query, including stage
+    String queryBaseDir = queryId + "/output" + "/" + sid + "/";
+
+    // If the stage requires a range shuffle
+    if (shuffleType.equals("r")) {
+      String ta = taskIds.get(0);
+      if (!executionBlockContext.getLocalDirAllocator().ifExists(queryBaseDir + ta + "/output/", conf)) {
+        LOG.warn("Range shuffle - file not exist");
+        return null;
+      }
+      Path path = executionBlockContext.getLocalFS().makeQualified(
+          executionBlockContext.getLocalDirAllocator().getLocalPathToRead(queryBaseDir + ta + "/output/", conf));
+      String startKey = params.get("start").get(0);
+      String endKey = params.get("end").get(0);
+      boolean last = params.get("final") != null;
+
+      try {
+        chunk = TajoPullServerService.getFileChunks(path, startKey, endKey, last);
+      } catch (Throwable t) {
+        LOG.error("getFileChunks() throws exception", t);
+        return null;
+      }
+
+      // If the stage requires a hash shuffle or a scattered hash shuffle
+    } else if (shuffleType.equals("h") || shuffleType.equals("s")) {
+      int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
+      String partPath = queryBaseDir + "hash-shuffle/" + partParentId + "/" + partId;
+      if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath, conf)) {
+        LOG.warn("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath);
+        return null;
+      }
+      Path path = executionBlockContext.getLocalFS().makeQualified(
+          executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath, conf));
+      File file = new File(path.toUri());
+      long startPos = (offset >= 0 && length >= 0) ? offset : 0;
+      long readLen = (offset >= 0 && length >= 0) ? length : file.length();
+
+      if (startPos >= file.length()) {
+        LOG.error("Start pos[" + startPos + "] great than file length [" + file.length() + "]");
+        return null;
+      }
+      chunk = new FileChunk(file, startPos, readLen);
+
+    } else {
+      LOG.error("Unknown shuffle type");
+      return null;
+    }
+
+    return chunk;
+  }
+
+  /** Flattens comma-separated values into a single list; returns null for a null input. */
+  private List<String> splitMaps(List<String> mapq) {
+    if (null == mapq) {
+      return null;
+    }
+    final List<String> ret = new ArrayList<String>();
+    for (String s : mapq) {
+      Collections.addAll(ret, s.split(","));
+    }
+    return ret;
+  }
+
+  /** Returns the input directory of a task attempt: {@code <ebInputDir>/<taskId>/<attemptId>}. */
+  public static Path getTaskAttemptDir(TaskAttemptId quid) {
+    Path workDir =
+        StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getTaskId().getExecutionBlockId()),
+            String.valueOf(quid.getTaskId().getId()),
+            String.valueOf(quid.getId()));
+    return workDir;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
new file mode 100644
index 0000000..7990a72
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.TajoIdProtos;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.TaskId;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.worker.event.*;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * A TaskManager is responsible for managing executionBlock resource and tasks.
+ *
+ * <p>It keeps one {@link ExecutionBlockContext} per execution block. A context is created on an
+ * {@link ExecutionBlockStartEvent}, and is removed and stopped on an
+ * {@link ExecutionBlockStopEvent}. The context map is mutated from {@link #handle} and is read
+ * through the lookup methods, which callers may invoke from other threads, so it is a
+ * concurrent map.
+ */
+public class TaskManager extends AbstractService implements EventHandler<TaskManagerEvent> {
+  private static final Log LOG = LogFactory.getLog(TaskManager.class);
+
+  private final TajoWorker.WorkerContext workerContext;
+  private final Map<ExecutionBlockId, ExecutionBlockContext> executionBlockContextMap;
+  private final Dispatcher dispatcher;
+  private final EventHandler rmEventHandler;
+
+  private TajoConf tajoConf;
+
+  /**
+   * @param dispatcher dispatcher on which this manager registers for {@link TaskManagerEvent}s
+   * @param workerContext the context of the hosting worker
+   * @param rmEventHandler handler that receives {@link NodeStatusEvent}s (e.g. report flushes)
+   */
+  public TaskManager(Dispatcher dispatcher, TajoWorker.WorkerContext workerContext, EventHandler rmEventHandler) {
+    super(TaskManager.class.getName());
+
+    this.dispatcher = dispatcher;
+    this.workerContext = workerContext;
+    this.executionBlockContextMap = Maps.newConcurrentMap();
+    this.rmEventHandler = rmEventHandler;
+  }
+
+  /** Requires a {@link TajoConf} and registers this manager for task manager events. */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    if (!(conf instanceof TajoConf)) {
+      throw new IllegalArgumentException("Configuration must be a TajoConf instance");
+    }
+
+    this.tajoConf = (TajoConf) conf;
+    dispatcher.register(TaskManagerEvent.EventType.class, this);
+    super.serviceInit(conf);
+  }
+
+  /** Stops every known execution block context and forgets them. */
+  @Override
+  protected void serviceStop() throws Exception {
+
+    for (ExecutionBlockContext context : executionBlockContextMap.values()) {
+      context.stop();
+    }
+    executionBlockContextMap.clear();
+    super.serviceStop();
+  }
+
+  protected Dispatcher getDispatcher() {
+    return dispatcher;
+  }
+
+  protected TajoWorker.WorkerContext getWorkerContext() {
+    return workerContext;
+  }
+
+  /**
+   * Creates and initializes the context for a newly started execution block.
+   *
+   * @throws RuntimeException wrapping any failure during creation or initialization
+   */
+  protected ExecutionBlockContext createExecutionBlock(TajoWorkerProtocol.RunExecutionBlockRequestProto request) {
+    try {
+      ExecutionBlockContext context = new ExecutionBlockContext(getWorkerContext(), null, request);
+
+      context.init();
+      return context;
+    } catch (Throwable e) {
+      LOG.fatal(e.getMessage(), e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Stops an execution block. It releases its broadcast cache, sends the shuffle report and
+   * flushes task histories. The context is then always stopped and the intermediate input/output
+   * directories of every execution block in {@code cleanupList} are cleaned up.
+   * A null {@code context} is ignored.
+   */
+  protected void stopExecutionBlock(ExecutionBlockContext context,
+                                    TajoWorkerProtocol.ExecutionBlockListProto cleanupList) {
+
+    if (context != null) {
+      try {
+        context.getSharedResource().releaseBroadcastCache(context.getExecutionBlockId());
+        context.sendShuffleReport();
+        getWorkerContext().getTaskHistoryWriter().flushTaskHistories();
+      } catch (Exception e) {
+        LOG.fatal(e.getMessage(), e);
+        throw new RuntimeException(e);
+      } finally {
+        context.stop();
+
+        /* cleanup intermediate files */
+        for (TajoIdProtos.ExecutionBlockIdProto ebId : cleanupList.getExecutionBlockIdList()) {
+          String inputDir = ExecutionBlockContext.getBaseInputDir(new ExecutionBlockId(ebId)).toString();
+          workerContext.cleanup(inputDir);
+          String outputDir = ExecutionBlockContext.getBaseOutputDir(new ExecutionBlockId(ebId)).toString();
+          workerContext.cleanup(outputDir);
+        }
+      }
+      LOG.info("Stopped execution block:" + context.getExecutionBlockId());
+    }
+  }
+
+  /**
+   * Starts an execution block context (once per block id) or, on a stop event, asks the resource
+   * manager handler to flush reports and stops and removes the block's context.
+   */
+  @Override
+  public void handle(TaskManagerEvent event) {
+    LOG.info("======================== Processing " + event.getExecutionBlockId() + " of type " + event.getType());
+
+    if (event instanceof ExecutionBlockStartEvent) {
+
+      //receive event from NodeResourceManager
+      if (!executionBlockContextMap.containsKey(event.getExecutionBlockId())) {
+        ExecutionBlockContext context = createExecutionBlock(((ExecutionBlockStartEvent) event).getRequestProto());
+        executionBlockContextMap.put(context.getExecutionBlockId(), context);
+      } else {
+        LOG.warn("Already initialized ExecutionBlock: " + event.getExecutionBlockId());
+      }
+    } else if (event instanceof ExecutionBlockStopEvent) {
+      //receive event from QueryMaster
+      rmEventHandler.handle(new NodeStatusEvent(NodeStatusEvent.EventType.FLUSH_REPORTS));
+      stopExecutionBlock(executionBlockContextMap.remove(event.getExecutionBlockId()),
+          ((ExecutionBlockStopEvent) event).getCleanupList());
+    }
+  }
+
+  /** Returns the context of a running execution block, or null if it is not known here. */
+  protected ExecutionBlockContext getExecutionBlockContext(ExecutionBlockId executionBlockId) {
+    return executionBlockContextMap.get(executionBlockId);
+  }
+
+  /** Returns the task for the given attempt, or null if its execution block is not known here. */
+  public Task getTaskByTaskAttemptId(TaskAttemptId taskAttemptId) {
+    ExecutionBlockContext context = executionBlockContextMap.get(taskAttemptId.getTaskId().getExecutionBlockId());
+    if (context != null) {
+      return context.getTask(taskAttemptId);
+    }
+    return null;
+  }
+
+  /** Returns the in-memory task histories of an execution block (empty if it is not known). */
+  public List<TaskHistory> getTaskHistories(ExecutionBlockId executionblockId) throws IOException {
+    List<TaskHistory> histories = new ArrayList<TaskHistory>();
+    ExecutionBlockContext context = executionBlockContextMap.get(executionblockId);
+    if (context != null) {
+      histories.addAll(context.getTaskHistories().values());
+    }
+    //TODO get List<TaskHistory> from HistoryReader
+    return histories;
+  }
+
+  /** Returns the in-memory history of a task, or null if it is not available. */
+  public TaskHistory getTaskHistory(TaskId taskId) {
+    TaskHistory history = null;
+    ExecutionBlockContext context = executionBlockContextMap.get(taskId.getExecutionBlockId());
+    if (context != null) {
+      history = context.getTaskHistories().get(taskId);
+    }
+    //TODO get TaskHistory from HistoryReader
+    return history;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 774f358..207b47e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -34,10 +34,8 @@ import org.apache.tajo.master.container.TajoContainerId;
import org.apache.tajo.master.container.TajoContainerIdPBImpl;
import org.apache.tajo.master.container.TajoConverterUtils;
import org.apache.tajo.rpc.CallFuture;
-import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.NullCallback;
-import java.net.ConnectException;
import java.util.concurrent.*;
import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
@@ -45,6 +43,7 @@ import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
/**
* The driver class for Tajo Task processing.
*/
+@Deprecated
public class TaskRunner extends AbstractService {
/** class logger */
private static final Log LOG = LogFactory.getLog(TaskRunner.class);
@@ -256,7 +255,7 @@ public class TaskRunner extends AbstractService {
LOG.info("Initializing: " + taskAttemptId);
Task task = null;
try {
- task = new Task(getId(), getTaskBaseDir(), taskAttemptId, executionBlockContext,
+ task = new LegacyTaskImpl(getId(), getTaskBaseDir(), taskAttemptId, executionBlockContext,
new TaskRequestImpl(taskRequest));
getContext().getTasks().put(taskAttemptId, task);
@@ -269,10 +268,11 @@ public class TaskRunner extends AbstractService {
} catch (Throwable t) {
LOG.error(t.getMessage(), t);
fatalError(qmClientService, taskAttemptId, t.getMessage());
+ } finally {
if(task != null) {
- task.cleanupTask();
+ task.cleanup();
}
- } finally {
+
callFuture = null;
taskRequest = null;
}