You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2014/09/15 10:44:01 UTC
[1/2] TAJO-1015: Add executionblock event in worker. (jinho)
Repository: tajo
Updated Branches:
refs/heads/master 64416dea6 -> 15450e868
http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
index 8009ce3..5eb66b8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -18,38 +18,45 @@
package org.apache.tajo.worker;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.QueryMasterProtocol;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
-import org.apache.tajo.storage.HashShuffleAppenderManager;
-import org.apache.tajo.util.Pair;
+import org.apache.tajo.engine.utils.TupleCache;
+import org.apache.tajo.worker.event.TaskRunnerEvent;
+import org.apache.tajo.worker.event.TaskRunnerStartEvent;
+import org.apache.tajo.worker.event.TaskRunnerStopEvent;
+import java.io.IOException;
+import java.net.InetSocketAddress;
import java.util.*;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-public class TaskRunnerManager extends CompositeService {
+public class TaskRunnerManager extends CompositeService implements EventHandler<TaskRunnerEvent> {
private static final Log LOG = LogFactory.getLog(TaskRunnerManager.class);
- private final Map<String, TaskRunner> taskRunnerMap = new HashMap<String, TaskRunner>();
- private final Map<String, TaskRunnerHistory> taskRunnerHistoryMap = Maps.newConcurrentMap();
+ private final ConcurrentMap<ExecutionBlockId, ExecutionBlockContext> executionBlockContextMap = Maps.newConcurrentMap();
+ private final ConcurrentMap<String, TaskRunner> taskRunnerMap = Maps.newConcurrentMap();
+ private final ConcurrentMap<String, TaskRunnerHistory> taskRunnerHistoryMap = Maps.newConcurrentMap();
private TajoWorker.WorkerContext workerContext;
private TajoConf tajoConf;
private AtomicBoolean stop = new AtomicBoolean(false);
private FinishedTaskCleanThread finishedTaskCleanThread;
+ private Dispatcher dispatcher;
- public TaskRunnerManager(TajoWorker.WorkerContext workerContext) {
+ public TaskRunnerManager(TajoWorker.WorkerContext workerContext, Dispatcher dispatcher) {
super(TaskRunnerManager.class.getName());
this.workerContext = workerContext;
+ this.dispatcher = dispatcher;
}
public TajoWorker.WorkerContext getWorkerContext() {
@@ -58,7 +65,9 @@ public class TaskRunnerManager extends CompositeService {
@Override
public void init(Configuration conf) {
+ Preconditions.checkArgument(conf instanceof TajoConf);
tajoConf = (TajoConf)conf;
+ dispatcher.register(TaskRunnerEvent.EventType.class, this);
super.init(tajoConf);
}
@@ -82,6 +91,9 @@ public class TaskRunnerManager extends CompositeService {
}
}
}
+ for(ExecutionBlockContext context: executionBlockContextMap.values()) {
+ context.stop();
+ }
if(finishedTaskCleanThread != null) {
finishedTaskCleanThread.interrupted();
@@ -92,148 +104,35 @@ public class TaskRunnerManager extends CompositeService {
}
}
- public void stopTask(String id) {
+ public void stopTaskRunner(String id) {
LOG.info("Stop Task:" + id);
- synchronized(taskRunnerMap) {
- TaskRunner taskRunner = taskRunnerMap.remove(id);
- if (taskRunner != null) {
- synchronized(taskRunnerCompleteCounter) {
- ExecutionBlockId ebId = taskRunner.getContext().getExecutionBlockId();
- AtomicInteger ebSuccessedTaskNums = successedTaskNums.get(ebId);
- if (ebSuccessedTaskNums == null) {
- ebSuccessedTaskNums = new AtomicInteger(taskRunner.getContext().succeededTasksNum.get());
- successedTaskNums.put(ebId, ebSuccessedTaskNums);
- } else {
- ebSuccessedTaskNums.addAndGet(taskRunner.getContext().succeededTasksNum.get());
- }
-
- Pair<AtomicInteger, AtomicInteger> counter = taskRunnerCompleteCounter.get(ebId);
-
- if (counter != null) {
- if (counter.getSecond().decrementAndGet() <= 0) {
- LOG.info(ebId + "'s all tasks are completed.");
- try {
- closeExecutionBlock(ebId, ebSuccessedTaskNums.get(), taskRunner);
- } catch (Exception e) {
- LOG.error(ebId + ", closing error:" + e.getMessage(), e);
- }
- successedTaskNums.remove(ebId);
- taskRunnerCompleteCounter.remove(ebId);
- }
- }
- }
- }
- }
+ TaskRunner taskRunner = taskRunnerMap.remove(id);
+ taskRunner.stop();
if(workerContext.isYarnContainerMode()) {
stop();
}
}
- private void closeExecutionBlock(ExecutionBlockId ebId, int succeededTasks, TaskRunner lastTaskRunner) throws Exception {
- TajoWorkerProtocol.ExecutionBlockReport.Builder reporterBuilder =
- TajoWorkerProtocol.ExecutionBlockReport.newBuilder();
- reporterBuilder.setEbId(ebId.getProto());
- reporterBuilder.setReportSuccess(true);
- reporterBuilder.setSucceededTasks(succeededTasks);
- try {
- List<TajoWorkerProtocol.IntermediateEntryProto> intermediateEntries =
- new ArrayList<TajoWorkerProtocol.IntermediateEntryProto>();
- List<HashShuffleAppenderManager.HashShuffleIntermediate> shuffles =
- workerContext.getHashShuffleAppenderManager().close(ebId);
- if (shuffles == null) {
- reporterBuilder.addAllIntermediateEntries(intermediateEntries);
- lastTaskRunner.sendExecutionBlockReport(reporterBuilder.build());
- return;
- }
-
- TajoWorkerProtocol.IntermediateEntryProto.Builder intermediateBuilder =
- TajoWorkerProtocol.IntermediateEntryProto.newBuilder();
- TajoWorkerProtocol.IntermediateEntryProto.PageProto.Builder pageBuilder =
- TajoWorkerProtocol.IntermediateEntryProto.PageProto.newBuilder();
- TajoWorkerProtocol.FailureIntermediateProto.Builder failureBuilder =
- TajoWorkerProtocol.FailureIntermediateProto.newBuilder();
-
- for (HashShuffleAppenderManager.HashShuffleIntermediate eachShuffle: shuffles) {
- List<TajoWorkerProtocol.IntermediateEntryProto.PageProto> pages =
- new ArrayList<TajoWorkerProtocol.IntermediateEntryProto.PageProto>();
- List<TajoWorkerProtocol.FailureIntermediateProto> failureIntermediateItems =
- new ArrayList<TajoWorkerProtocol.FailureIntermediateProto>();
-
- for (Pair<Long, Integer> eachPage: eachShuffle.getPages()) {
- pageBuilder.clear();
- pageBuilder.setPos(eachPage.getFirst());
- pageBuilder.setLength(eachPage.getSecond());
- pages.add(pageBuilder.build());
- }
-
- for(Pair<Long, Pair<Integer, Integer>> eachFailure: eachShuffle.getFailureTskTupleIndexes()) {
- failureBuilder.clear();
- failureBuilder.setPagePos(eachFailure.getFirst());
- failureBuilder.setStartRowNum(eachFailure.getSecond().getFirst());
- failureBuilder.setEndRowNum(eachFailure.getSecond().getSecond());
- failureIntermediateItems.add(failureBuilder.build());
- }
-
- intermediateBuilder.clear();
-
- intermediateBuilder.setEbId(ebId.getProto())
- .setHost(workerContext.getTajoWorkerManagerService().getBindAddr().getHostName() + ":" +
- workerContext.getPullServerPort())
- .setTaskId(-1)
- .setAttemptId(-1)
- .setPartId(eachShuffle.getPartId())
- .setVolume(eachShuffle.getVolume())
- .addAllPages(pages)
- .addAllFailures(failureIntermediateItems);
-
- intermediateEntries.add(intermediateBuilder.build());
- }
-
- // send intermediateEntries to QueryMaster
- reporterBuilder.addAllIntermediateEntries(intermediateEntries);
-
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- reporterBuilder.setReportSuccess(false);
- if (e.getMessage() == null) {
- reporterBuilder.setReportErrorMessage(e.getClass().getSimpleName());
- } else {
- reporterBuilder.setReportErrorMessage(e.getMessage());
- }
- }
- lastTaskRunner.sendExecutionBlockReport(reporterBuilder.build());
- }
-
public Collection<TaskRunner> getTaskRunners() {
- synchronized(taskRunnerMap) {
- return Collections.unmodifiableCollection(taskRunnerMap.values());
- }
+ return Collections.unmodifiableCollection(taskRunnerMap.values());
}
public Collection<TaskRunnerHistory> getExecutionBlockHistories() {
- synchronized(taskRunnerHistoryMap) {
- return Collections.unmodifiableCollection(taskRunnerHistoryMap.values());
- }
+ return Collections.unmodifiableCollection(taskRunnerHistoryMap.values());
}
public TaskRunnerHistory getExcutionBlockHistoryByTaskRunnerId(String taskRunnerId) {
- synchronized(taskRunnerHistoryMap) {
- return taskRunnerHistoryMap.get(taskRunnerId);
- }
+ return taskRunnerHistoryMap.get(taskRunnerId);
}
public TaskRunner getTaskRunner(String taskRunnerId) {
- synchronized(taskRunnerMap) {
- return taskRunnerMap.get(taskRunnerId);
- }
+ return taskRunnerMap.get(taskRunnerId);
}
- public Task getTaskByQueryUnitAttemptId(QueryUnitAttemptId quAttemptId) {
- synchronized(taskRunnerMap) {
- for (TaskRunner eachTaskRunner: taskRunnerMap.values()) {
- Task task = eachTaskRunner.getContext().getTask(quAttemptId);
- if (task != null) return task;
- }
+ public Task getTaskByQueryUnitAttemptId(QueryUnitAttemptId queryUnitAttemptId) {
+ ExecutionBlockContext context = executionBlockContextMap.get(queryUnitAttemptId.getQueryUnitId().getExecutionBlockId());
+ if (context != null) {
+ return context.getTask(queryUnitAttemptId);
}
return null;
}
@@ -250,53 +149,61 @@ public class TaskRunnerManager extends CompositeService {
}
public int getNumTasks() {
- synchronized(taskRunnerMap) {
- return taskRunnerMap.size();
- }
+ return taskRunnerMap.size();
}
- //<# tasks, # running tasks>
- Map<ExecutionBlockId, Pair<AtomicInteger, AtomicInteger>> taskRunnerCompleteCounter =
- new HashMap<ExecutionBlockId, Pair<AtomicInteger, AtomicInteger>>();
-
- Map<ExecutionBlockId, AtomicInteger> successedTaskNums = new HashMap<ExecutionBlockId, AtomicInteger>();
-
- public void startTask(final String[] params) {
- //TODO change to use event dispatcher
- Thread t = new Thread() {
- public void run() {
+ @Override
+ public void handle(TaskRunnerEvent event) {
+ LOG.info("======================== Processing " + event.getExecutionBlockId() + " of type " + event.getType());
+ if (event instanceof TaskRunnerStartEvent) {
+ TaskRunnerStartEvent startEvent = (TaskRunnerStartEvent) event;
+ ExecutionBlockContext context = executionBlockContextMap.get(event.getExecutionBlockId());
+ String[] params = startEvent.getParams();
+ if(context == null){
try {
- TajoConf systemConf = new TajoConf(tajoConf);
- TaskRunner taskRunner = new TaskRunner(TaskRunnerManager.this, systemConf, params);
- LOG.info("Start TaskRunner:" + taskRunner.getId());
- synchronized(taskRunnerMap) {
- taskRunnerMap.put(taskRunner.getId(), taskRunner);
- }
+ // QueryMaster's address
+ String host = params[4];
+ int port = Integer.parseInt(params[5]);
+
+ context = new ExecutionBlockContext(this, startEvent, new InetSocketAddress(host, port));
+ } catch (Throwable e) {
+ LOG.fatal(e.getMessage(), e);
+ throw new RuntimeException(e);
+ }
+ executionBlockContextMap.put(event.getExecutionBlockId(), context);
+ }
- synchronized (taskRunnerHistoryMap){
- taskRunnerHistoryMap.put(taskRunner.getId(), taskRunner.getContext().getExcutionBlockHistory());
- }
+ TaskRunner taskRunner = new TaskRunner(context, params);
+ LOG.info("Start TaskRunner:" + taskRunner.getId());
+ taskRunnerMap.put(taskRunner.getId(), taskRunner);
+ taskRunnerHistoryMap.put(taskRunner.getId(), taskRunner.getHistory());
- synchronized(taskRunnerCompleteCounter) {
- ExecutionBlockId ebId = taskRunner.getContext().getExecutionBlockId();
- Pair<AtomicInteger, AtomicInteger> counter = taskRunnerCompleteCounter.get(ebId);
- if (counter == null) {
- counter = new Pair(new AtomicInteger(0), new AtomicInteger(0));
- taskRunnerCompleteCounter.put(ebId, counter);
- }
- counter.getFirst().incrementAndGet();
- counter.getSecond().incrementAndGet();
- }
- taskRunner.init(systemConf);
- taskRunner.start();
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- throw new RuntimeException(e.getMessage(), e);
+ taskRunner.init(context.getConf());
+ taskRunner.start();
+
+ } else if (event instanceof TaskRunnerStopEvent) {
+ ExecutionBlockContext executionBlockContext = executionBlockContextMap.remove(event.getExecutionBlockId());
+ if(executionBlockContext != null){
+ TupleCache.getInstance().removeBroadcastCache(event.getExecutionBlockId());
+ executionBlockContext.reportExecutionBlock(event.getExecutionBlockId());
+ executionBlockContext.stop();
+ try {
+ workerContext.getHashShuffleAppenderManager().close(event.getExecutionBlockId());
+ } catch (IOException e) {
+ LOG.fatal(e.getMessage(), e);
+ throw new RuntimeException(e);
}
}
- };
+ LOG.info("Stopped execution block:" + event.getExecutionBlockId());
+ }
+ }
+
+ public EventHandler getEventHandler(){
+ return dispatcher.getEventHandler();
+ }
- t.start();
+ public TajoConf getTajoConf() {
+ return tajoConf;
}
class FinishedTaskCleanThread extends Thread {
http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java
new file mode 100644
index 0000000..aac8973
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.ExecutionBlockId;
+
+public class TaskRunnerEvent extends AbstractEvent<TaskRunnerEvent.EventType> {
+ public enum EventType {
+ START,
+ STOP
+ }
+
+ protected final ExecutionBlockId executionBlockId;
+
+ public TaskRunnerEvent(EventType eventType,
+ ExecutionBlockId executionBlockId) {
+ super(eventType);
+ this.executionBlockId = executionBlockId;
+ }
+
+ public ExecutionBlockId getExecutionBlockId() {
+ return executionBlockId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
new file mode 100644
index 0000000..8c9fa51
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.engine.query.QueryContext;
+
+public class TaskRunnerStartEvent extends TaskRunnerEvent {
+
+ private final QueryContext queryContext;
+ private final String[] params;
+ private final String plan;
+
+ public TaskRunnerStartEvent(String[] params,
+ ExecutionBlockId executionBlockId,
+ QueryContext context,
+ String plan) {
+ super(EventType.START, executionBlockId);
+ this.params = params;
+ this.queryContext = context;
+ this.plan = plan;
+ }
+
+ public String[] getParams(){
+ return this.params;
+ }
+
+ public QueryContext getQueryContext() {
+ return queryContext;
+ }
+
+ public String getPlan() {
+ return plan;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java
new file mode 100644
index 0000000..c8ec20d
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+import org.apache.tajo.ExecutionBlockId;
+
+public class TaskRunnerStopEvent extends TaskRunnerEvent {
+
+ public TaskRunnerStopEvent(ExecutionBlockId executionBlockId) {
+ super(EventType.STOP, executionBlockId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/proto/QueryMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/QueryMasterProtocol.proto b/tajo-core/src/main/proto/QueryMasterProtocol.proto
index 0119a88..06d2a42 100644
--- a/tajo-core/src/main/proto/QueryMasterProtocol.proto
+++ b/tajo-core/src/main/proto/QueryMasterProtocol.proto
@@ -32,7 +32,7 @@ service QueryMasterProtocolService {
//from Worker
rpc getTask(GetTaskRequestProto) returns (QueryUnitRequestProto);
rpc statusUpdate (TaskStatusProto) returns (BoolProto);
- rpc ping (QueryUnitAttemptIdProto) returns (BoolProto);
+ rpc ping (ExecutionBlockIdProto) returns (BoolProto);
rpc fatalError(TaskFatalErrorReport) returns (BoolProto);
rpc done (TaskCompletionReport) returns (BoolProto);
rpc doneExecutionBlock(ExecutionBlockReport) returns (BoolProto);
http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
index e100c48..dff2733 100644
--- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
@@ -201,7 +201,7 @@ message DataChannelProto {
}
message RunExecutionBlockRequestProto {
- required string executionBlockId = 1;
+ required ExecutionBlockIdProto executionBlockId = 1;
required string queryMasterHost = 2;
required int32 queryMasterPort = 3;
required string nodeId = 4;
@@ -220,7 +220,8 @@ service TajoWorkerProtocolService {
rpc ping (QueryUnitAttemptIdProto) returns (BoolProto);
// from QueryMaster(Worker)
- rpc executeExecutionBlock(RunExecutionBlockRequestProto) returns (BoolProto);
+ rpc startExecutionBlock(RunExecutionBlockRequestProto) returns (BoolProto);
+ rpc stopExecutionBlock(ExecutionBlockIdProto) returns (BoolProto);
rpc killTaskAttempt(QueryUnitAttemptIdProto) returns (BoolProto);
rpc cleanup(QueryIdProto) returns (BoolProto);
rpc cleanupExecutionBlocks(ExecutionBlockListProto) returns (BoolProto);
http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
index 95c06bb..b755e02 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
@@ -106,6 +106,8 @@ public class TestFetcher {
@Test
public void testAdjustFetchProcess() {
+ assertEquals(0.0f, Task.adjustFetchProcess(0, 0), 0);
+ assertEquals(0.0f, Task.adjustFetchProcess(10, 10), 0);
assertEquals(0.05f, Task.adjustFetchProcess(10, 9), 0);
assertEquals(0.1f, Task.adjustFetchProcess(10, 8), 0);
assertEquals(0.25f, Task.adjustFetchProcess(10, 5), 0);
[2/2] git commit: TAJO-1015: Add executionblock event in worker. (jinho)
Posted by jh...@apache.org.
TAJO-1015: Add executionblock event in worker. (jinho)
Closes #124
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/15450e86
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/15450e86
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/15450e86
Branch: refs/heads/master
Commit: 15450e868ae6a985deb988e679d19e1364c95526
Parents: 64416de
Author: jhkim <jh...@apache.org>
Authored: Mon Sep 15 17:42:38 2014 +0900
Committer: jhkim <jh...@apache.org>
Committed: Mon Sep 15 17:42:38 2014 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../java/org/apache/tajo/conf/TajoConf.java | 3 +
.../apache/tajo/master/TajoContainerProxy.java | 4 +-
.../querymaster/QueryMasterManagerService.java | 5 +-
.../tajo/master/querymaster/SubQuery.java | 2 +-
.../tajo/worker/ExecutionBlockContext.java | 437 +++++++++++++++++++
.../worker/ExecutionBlockSharedResource.java | 36 +-
.../tajo/worker/TajoResourceAllocator.java | 41 +-
.../java/org/apache/tajo/worker/TajoWorker.java | 82 ++--
.../tajo/worker/TajoWorkerManagerService.java | 46 +-
.../main/java/org/apache/tajo/worker/Task.java | 285 ++++--------
.../apache/tajo/worker/TaskAttemptContext.java | 47 +-
.../java/org/apache/tajo/worker/TaskRunner.java | 277 ++----------
.../apache/tajo/worker/TaskRunnerManager.java | 253 ++++-------
.../tajo/worker/event/TaskRunnerEvent.java | 41 ++
.../tajo/worker/event/TaskRunnerStartEvent.java | 51 +++
.../tajo/worker/event/TaskRunnerStopEvent.java | 28 ++
.../src/main/proto/QueryMasterProtocol.proto | 2 +-
.../src/main/proto/TajoWorkerProtocol.proto | 5 +-
.../org/apache/tajo/worker/TestFetcher.java | 2 +
20 files changed, 924 insertions(+), 725 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 6def7f8..61fcec0 100644
--- a/CHANGES
+++ b/CHANGES
@@ -431,6 +431,8 @@ Release 0.9.0 - unreleased
SUB TASKS
+ TAJO-1015: Add executionblock event in worker. (jinho)
+
TAJO-783: Remove yarn-related code from tajo-core. (hyunsik)
Release 0.8.0
http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 9aead24..a089b54 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -262,6 +262,9 @@ public class TajoConf extends Configuration {
// Client RPC
RPC_CLIENT_WORKER_THREAD_NUM("tajo.rpc.client.worker-thread-num", 4),
+ SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM("tajo.shuffle.rpc.client.worker-thread-num",
+ Runtime.getRuntime().availableProcessors()),
+
//Client service RPC Server
MASTER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM("tajo.master.service.rpc.server.worker-thread-num",
Runtime.getRuntime().availableProcessors() * 1),
http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index f3e4b72..c317ba5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -101,7 +101,7 @@ public class TajoContainerProxy extends ContainerProxy {
TajoWorkerProtocol.RunExecutionBlockRequestProto request =
TajoWorkerProtocol.RunExecutionBlockRequestProto.newBuilder()
- .setExecutionBlockId(executionBlockId.toString())
+ .setExecutionBlockId(executionBlockId.getProto())
.setQueryMasterHost(myAddr.getHostName())
.setQueryMasterPort(myAddr.getPort())
.setNodeId(container.getNodeId().toString())
@@ -111,7 +111,7 @@ public class TajoContainerProxy extends ContainerProxy {
.setPlanJson(planJson)
.build();
- tajoWorkerRpcClient.executeExecutionBlock(null, request, NullCallback.get());
+ tajoWorkerRpcClient.startExecutionBlock(null, request, NullCallback.get());
} catch (Exception e) {
LOG.error(e.getMessage(), e);
} finally {
http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
index 4f3c2ab..862dfef 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -40,8 +40,6 @@ import org.apache.tajo.util.NetUtils;
import org.apache.tajo.worker.TajoWorker;
import java.net.InetSocketAddress;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
public class QueryMasterManagerService extends CompositeService
implements QueryMasterProtocol.QueryMasterProtocolService.Interface {
@@ -177,7 +175,7 @@ public class QueryMasterManagerService extends CompositeService
@Override
public void ping(RpcController controller,
- TajoIdProtos.QueryUnitAttemptIdProto attemptId,
+ TajoIdProtos.ExecutionBlockIdProto requestProto,
RpcCallback<PrimitiveProtos.BoolProto> done) {
done.run(TajoWorker.TRUE_PROTO);
}
@@ -225,6 +223,7 @@ public class QueryMasterManagerService extends CompositeService
ExecutionBlockId ebId = new ExecutionBlockId(request.getEbId());
queryMasterTask.getQuery().getSubQuery(ebId).receiveExecutionBlockReport(request);
}
+ done.run(TajoWorker.TRUE_PROTO);
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 850ffe8..92ef9f9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -348,7 +348,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
}
if (totalProgress > 0.0f) {
- return (float) Math.floor((totalProgress / (float) tempTasks.size()) * 1000.0f) / 1000.0f;
+ return (float) Math.floor((totalProgress / (float) Math.max(tempTasks.size(), 1)) * 1000.0f) / 1000.0f;
} else {
return 0.0f;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
new file mode 100644
index 0000000..306ab66
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -0,0 +1,437 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.RpcChannelFactory;
+import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.storage.HashShuffleAppenderManager;
+import org.apache.tajo.storage.StorageUtil;
+import org.apache.tajo.util.Pair;
+import org.apache.tajo.worker.event.TaskRunnerStartEvent;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
+
+public class ExecutionBlockContext {
+ /** class logger */
+ private static final Log LOG = LogFactory.getLog(ExecutionBlockContext.class);
+
+ private TaskRunnerManager manager;
+ public AtomicInteger completedTasksNum = new AtomicInteger();
+ public AtomicInteger succeededTasksNum = new AtomicInteger();
+ public AtomicInteger killedTasksNum = new AtomicInteger();
+ public AtomicInteger failedTasksNum = new AtomicInteger();
+
+ private ClientSocketChannelFactory channelFactory;
+ // for temporal or intermediate files
+ private FileSystem localFS;
+ // for input files
+ private FileSystem defaultFS;
+ private ExecutionBlockId executionBlockId;
+ private QueryContext queryContext;
+ private String plan;
+
+ private ExecutionBlockSharedResource resource;
+
+ private TajoQueryEngine queryEngine;
+ private RpcConnectionPool connPool;
+ private InetSocketAddress qmMasterAddr;
+ private TajoConf systemConf;
+ // for the doAs block
+ private UserGroupInformation taskOwner;
+
+ private Reporter reporter;
+
+ private AtomicBoolean stop = new AtomicBoolean();
+
+ // It keeps all of the query unit attempts while a TaskRunner is running.
+ private final ConcurrentMap<QueryUnitAttemptId, Task> tasks = Maps.newConcurrentMap();
+
+ private final ConcurrentMap<String, TaskRunnerHistory> histories = Maps.newConcurrentMap();
+
+ public ExecutionBlockContext(TaskRunnerManager manager, TaskRunnerStartEvent event, InetSocketAddress queryMaster)
+ throws Throwable {
+ this.manager = manager;
+ this.executionBlockId = event.getExecutionBlockId();
+ this.connPool = RpcConnectionPool.getPool(manager.getTajoConf());
+ this.qmMasterAddr = queryMaster;
+ this.systemConf = manager.getTajoConf();
+ this.reporter = new Reporter();
+ this.defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf);
+ this.localFS = FileSystem.getLocal(systemConf);
+
+ // Setup QueryEngine according to the query plan
+ // Here, we can setup row-based query engine or columnar query engine.
+ this.queryEngine = new TajoQueryEngine(systemConf);
+ this.queryContext = event.getQueryContext();
+ this.plan = event.getPlan();
+ this.resource = new ExecutionBlockSharedResource();
+
+ init();
+ }
+
+ public void init() throws Throwable {
+
+ LOG.info("Tajo Root Dir: " + systemConf.getVar(TajoConf.ConfVars.ROOT_DIR));
+ LOG.info("Worker Local Dir: " + systemConf.getVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR));
+
+ LOG.info("QueryMaster Address:" + qmMasterAddr);
+
+ UserGroupInformation.setConfiguration(systemConf);
+ // TODO - 'load credential' should be implemented
+ // Getting taskOwner
+ UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(systemConf.getVar(TajoConf.ConfVars.USERNAME));
+
+    // keep the task owner and start the progress reporter (file systems are set up in the constructor)
+ this.taskOwner = taskOwner;
+ this.reporter.startReporter();
+
+    // resource initialization
+ try{
+ this.resource.initialize(queryContext, plan);
+ } catch (Throwable e) {
+ getQueryMasterStub().killQuery(null, executionBlockId.getQueryId().getProto(), NullCallback.get());
+ throw e;
+ }
+ }
+
+ public ExecutionBlockSharedResource getSharedResource() {
+ return resource;
+ }
+
+ public QueryMasterProtocol.QueryMasterProtocolService.Interface getQueryMasterStub() throws Exception {
+ NettyClientBase clientBase = null;
+ try {
+ clientBase = connPool.getConnection(qmMasterAddr, QueryMasterProtocol.class, true);
+ return clientBase.getStub();
+ } finally {
+ connPool.releaseConnection(clientBase);
+ }
+ }
+
+ public void stop(){
+ if(stop.getAndSet(true)){
+ return;
+ }
+
+ try {
+ reporter.stop();
+ } catch (InterruptedException e) {
+ LOG.error(e);
+ }
+
+ // If ExecutionBlock is stopped, all running or pending tasks will be marked as failed.
+ for (Task task : tasks.values()) {
+ if (task.getStatus() == TajoProtos.TaskAttemptState.TA_PENDING ||
+ task.getStatus() == TajoProtos.TaskAttemptState.TA_RUNNING) {
+ task.setState(TajoProtos.TaskAttemptState.TA_FAILED);
+ try{
+ task.abort();
+ } catch (Throwable e){
+ LOG.error(e);
+ }
+ }
+ }
+ tasks.clear();
+
+ resource.release();
+
+ try {
+ releaseShuffleChannelFactory();
+ } catch (Throwable e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ public TajoConf getConf() {
+ return manager.getTajoConf();
+ }
+
+ public FileSystem getLocalFS() {
+ return localFS;
+ }
+
+ public FileSystem getDefaultFS() {
+ return defaultFS;
+ }
+
+ public LocalDirAllocator getLocalDirAllocator() {
+ return manager.getWorkerContext().getLocalDirAllocator();
+ }
+
+ public TajoQueryEngine getTQueryEngine() {
+ return queryEngine;
+ }
+
+ // for the local temporal dir
+ public Path createBaseDir() throws IOException {
+ // the base dir for an output dir
+ String baseDir = getBaseOutputDir(executionBlockId).toString();
+ Path baseDirPath = localFS.makeQualified(getLocalDirAllocator().getLocalPathForWrite(baseDir, systemConf));
+ return baseDirPath;
+ }
+
+ public static Path getBaseOutputDir(ExecutionBlockId executionBlockId) {
+ Path workDir =
+ StorageUtil.concatPath(
+ executionBlockId.getQueryId().toString(),
+ "output",
+ String.valueOf(executionBlockId.getId()));
+ return workDir;
+ }
+
+ public static Path getBaseInputDir(ExecutionBlockId executionBlockId) {
+ Path workDir =
+ StorageUtil.concatPath(
+ executionBlockId.getQueryId().toString(),
+ "in",
+ executionBlockId.toString());
+ return workDir;
+ }
+
+ public ExecutionBlockId getExecutionBlockId() {
+ return executionBlockId;
+ }
+
+ public Map<QueryUnitAttemptId, Task> getTasks() {
+ return tasks;
+ }
+
+ public Task getTask(QueryUnitAttemptId queryUnitAttemptId){
+ return tasks.get(queryUnitAttemptId);
+ }
+
+ public void stopTaskRunner(String id){
+ manager.stopTaskRunner(id);
+ }
+
+ public TaskRunner getTaskRunner(String taskRunnerId){
+ return manager.getTaskRunner(taskRunnerId);
+ }
+
+ public void addTaskHistory(String taskRunnerId, QueryUnitAttemptId quAttemptId, TaskHistory taskHistory) {
+ histories.get(taskRunnerId).addTaskHistory(quAttemptId, taskHistory);
+ }
+
+ public TaskRunnerHistory createTaskRunnerHistory(TaskRunner runner){
+ histories.putIfAbsent(runner.getId(), new TaskRunnerHistory(runner.getContainerId(), executionBlockId));
+ return histories.get(runner.getId());
+ }
+
+ public TajoWorker.WorkerContext getWorkerContext(){
+ return manager.getWorkerContext();
+ }
+
+ protected ClientSocketChannelFactory getShuffleChannelFactory(){
+ if(channelFactory == null) {
+ int workerNum = getConf().getIntVar(TajoConf.ConfVars.SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM);
+ channelFactory = RpcChannelFactory.createClientChannelFactory("Fetcher", workerNum);
+ }
+ return channelFactory;
+ }
+
+ protected void releaseShuffleChannelFactory(){
+ if(channelFactory != null) {
+ channelFactory.shutdown();
+ channelFactory.releaseExternalResources();
+ channelFactory = null;
+ }
+ }
+
+ private void sendExecutionBlockReport(ExecutionBlockReport reporter) throws Exception {
+ getQueryMasterStub().doneExecutionBlock(null, reporter, NullCallback.get());
+ }
+
+ protected void reportExecutionBlock(ExecutionBlockId ebId) {
+ ExecutionBlockReport.Builder reporterBuilder = ExecutionBlockReport.newBuilder();
+ reporterBuilder.setEbId(ebId.getProto());
+ reporterBuilder.setReportSuccess(true);
+ reporterBuilder.setSucceededTasks(succeededTasksNum.get());
+ try {
+ List<IntermediateEntryProto> intermediateEntries = Lists.newArrayList();
+ List<HashShuffleAppenderManager.HashShuffleIntermediate> shuffles =
+ getWorkerContext().getHashShuffleAppenderManager().close(ebId);
+ if (shuffles == null) {
+ reporterBuilder.addAllIntermediateEntries(intermediateEntries);
+ sendExecutionBlockReport(reporterBuilder.build());
+ return;
+ }
+
+ IntermediateEntryProto.Builder intermediateBuilder = IntermediateEntryProto.newBuilder();
+ IntermediateEntryProto.PageProto.Builder pageBuilder = IntermediateEntryProto.PageProto.newBuilder();
+ FailureIntermediateProto.Builder failureBuilder = FailureIntermediateProto.newBuilder();
+
+ for (HashShuffleAppenderManager.HashShuffleIntermediate eachShuffle: shuffles) {
+ List<IntermediateEntryProto.PageProto> pages = Lists.newArrayList();
+ List<FailureIntermediateProto> failureIntermediateItems = Lists.newArrayList();
+
+ for (Pair<Long, Integer> eachPage: eachShuffle.getPages()) {
+ pageBuilder.clear();
+ pageBuilder.setPos(eachPage.getFirst());
+ pageBuilder.setLength(eachPage.getSecond());
+ pages.add(pageBuilder.build());
+ }
+
+ for(Pair<Long, Pair<Integer, Integer>> eachFailure: eachShuffle.getFailureTskTupleIndexes()) {
+ failureBuilder.clear();
+ failureBuilder.setPagePos(eachFailure.getFirst());
+ failureBuilder.setStartRowNum(eachFailure.getSecond().getFirst());
+ failureBuilder.setEndRowNum(eachFailure.getSecond().getSecond());
+ failureIntermediateItems.add(failureBuilder.build());
+ }
+ intermediateBuilder.clear();
+
+ intermediateBuilder.setEbId(ebId.getProto())
+ .setHost(getWorkerContext().getTajoWorkerManagerService().getBindAddr().getHostName() + ":" +
+ getWorkerContext().getPullServerPort())
+ .setTaskId(-1)
+ .setAttemptId(-1)
+ .setPartId(eachShuffle.getPartId())
+ .setVolume(eachShuffle.getVolume())
+ .addAllPages(pages)
+ .addAllFailures(failureIntermediateItems);
+ intermediateEntries.add(intermediateBuilder.build());
+ }
+
+      // attach intermediateEntries to the report; it is sent to the QueryMaster below
+ reporterBuilder.addAllIntermediateEntries(intermediateEntries);
+
+ } catch (Throwable e) {
+ LOG.error(e.getMessage(), e);
+ reporterBuilder.setReportSuccess(false);
+ if (e.getMessage() == null) {
+ reporterBuilder.setReportErrorMessage(e.getClass().getSimpleName());
+ } else {
+ reporterBuilder.setReportErrorMessage(e.getMessage());
+ }
+ }
+ try {
+ sendExecutionBlockReport(reporterBuilder.build());
+ } catch (Throwable e) {
+ // can't send report to query master
+ LOG.fatal(e.getMessage(), e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected class Reporter {
+ private Thread reporterThread;
+ private AtomicBoolean reporterStop = new AtomicBoolean();
+ private static final int PROGRESS_INTERVAL = 1000;
+ private static final int MAX_RETRIES = 10;
+
+ public Reporter() {
+ this.reporterThread = new Thread(createReporterThread());
+ this.reporterThread.setName("Task reporter");
+ }
+
+ public void startReporter(){
+ this.reporterThread.start();
+ }
+
+ Runnable createReporterThread() {
+
+ return new Runnable() {
+ int remainingRetries = MAX_RETRIES;
+ QueryMasterProtocol.QueryMasterProtocolService.Interface masterStub;
+ @Override
+ public void run() {
+ while (!reporterStop.get() && !Thread.interrupted()) {
+ try {
+ masterStub = getQueryMasterStub();
+
+ if(tasks.size() == 0){
+ masterStub.ping(null, getExecutionBlockId().getProto(), NullCallback.get());
+ } else {
+ for (Task task : new ArrayList<Task>(tasks.values())){
+
+ if (task.isRunning() && task.isProgressChanged()) {
+ task.updateProgress();
+ masterStub.statusUpdate(null, task.getReport(), NullCallback.get());
+ task.getContext().setProgressChanged(false);
+ } else {
+ task.updateProgress();
+ }
+ }
+ }
+ } catch (Throwable t) {
+ LOG.error(t.getMessage(), t);
+ remainingRetries -=1;
+ if (remainingRetries == 0) {
+ ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0);
+ LOG.warn("Last retry, exiting ");
+ throw new RuntimeException(t);
+ }
+ } finally {
+ if (remainingRetries > 0 && !reporterStop.get()) {
+ synchronized (reporterThread) {
+ try {
+ reporterThread.wait(PROGRESS_INTERVAL);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ }
+ }
+ }
+ };
+ }
+
+ public void stop() throws InterruptedException {
+ if (reporterStop.getAndSet(true)) {
+ return;
+ }
+
+ if (reporterThread != null) {
+        // Intent of the lock is to not send an interrupt in the middle of an
+ // umbilical.ping or umbilical.statusUpdate
+ synchronized (reporterThread) {
+        // Wake the reporter if it is waiting between reports; it exits once it observes reporterStop.
+ reporterThread.notifyAll();
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
index a70fbfd..e77e265 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
@@ -32,14 +32,12 @@ import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.util.Pair;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
public class ExecutionBlockSharedResource {
private static Log LOG = LogFactory.getLog(ExecutionBlockSharedResource.class);
private AtomicBoolean initializing = new AtomicBoolean(false);
private volatile Boolean resourceInitSuccess = new Boolean(false);
- private CountDownLatch initializedResourceLatch = new CountDownLatch(1);
// Query
private QueryContext context;
@@ -50,27 +48,18 @@ public class ExecutionBlockSharedResource {
private LogicalNode plan;
private boolean codeGenEnabled = false;
- public void initialize(final QueryContext context, final String planJson) throws InterruptedException {
+ public void initialize(final QueryContext context, final String planJson) {
if (!initializing.getAndSet(true)) {
- Thread thread = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- ExecutionBlockSharedResource.this.context = context;
- initPlan(planJson);
- initCodeGeneration();
- resourceInitSuccess = true;
- } catch (Throwable t) {
- LOG.error(t);
- LOG.error(ExceptionUtils.getStackTrace(t));
- } finally {
- initializedResourceLatch.countDown();
- }
- }
- });
- thread.run();
- thread.join();
+ try {
+ ExecutionBlockSharedResource.this.context = context;
+ initPlan(planJson);
+ initCodeGeneration();
+ resourceInitSuccess = true;
+ } catch (Throwable t) {
+ LOG.error(t);
+ LOG.error(ExceptionUtils.getStackTrace(t));
+ }
if (!resourceInitSuccess) {
throw new RuntimeException("Resource cannot be initialized");
@@ -91,11 +80,6 @@ public class ExecutionBlockSharedResource {
}
}
- public boolean awaitInitializedResource() throws InterruptedException {
- initializedResourceLatch.await();
- return resourceInitSuccess;
- }
-
public LogicalNode getPlan() {
return this.plan;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index aaff69c..2cc8f0c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -18,6 +18,7 @@
package org.apache.tajo.worker;
+import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -31,6 +32,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.*;
import org.apache.tajo.master.event.ContainerAllocationEvent;
import org.apache.tajo.master.event.ContainerAllocatorEventType;
@@ -44,14 +46,13 @@ import org.apache.tajo.master.rm.Worker;
import org.apache.tajo.master.rm.WorkerResource;
import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.util.ApplicationIdUtils;
import org.apache.tajo.util.HAServiceUtil;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
+import java.net.InetSocketAddress;
+import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -144,6 +145,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
launchTaskRunners(launchEvent);
} else if (event.getType() == TaskRunnerGroupEvent.EventType.CONTAINER_REMOTE_CLEANUP) {
stopContainers(event.getContainers());
+ stopExecutionBlock(event.getExecutionBlockId(), event.getContainers());
}
}
}
@@ -158,6 +160,37 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
}
}
+ public void stopExecutionBlock(final ExecutionBlockId executionBlockId, Collection<Container> containers) {
+ Set<NodeId> workers = Sets.newHashSet();
+ for (Container container : containers){
+ workers.add(container.getNodeId());
+ }
+
+ for (final NodeId worker : workers) {
+ executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ stopExecutionBlock(executionBlockId, worker);
+ }
+ });
+ }
+ }
+
+ private void stopExecutionBlock(ExecutionBlockId executionBlockId, NodeId worker) {
+ NettyClientBase tajoWorkerRpc = null;
+ try {
+ InetSocketAddress addr = new InetSocketAddress(worker.getHost(), worker.getPort());
+ tajoWorkerRpc = RpcConnectionPool.getPool(tajoConf).getConnection(addr, TajoWorkerProtocol.class, true);
+ TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub();
+
+ tajoWorkerRpcClient.stopExecutionBlock(null, executionBlockId.getProto(), NullCallback.get());
+ } catch (Throwable e) {
+ LOG.error(e.getMessage(), e);
+ } finally {
+ RpcConnectionPool.getPool(tajoConf).releaseConnection(tajoWorkerRpc);
+ }
+ }
+
protected static class LaunchRunner implements Runnable {
private final ContainerProxy proxy;
private final ContainerId id;
http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 584c60e..a8d661b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -27,15 +27,12 @@ import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.shell.PathData;
import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.util.RackResolver;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryId;
import org.apache.tajo.TajoConstants;
-import org.apache.tajo.TajoProtos;
import org.apache.tajo.catalog.CatalogClient;
import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.ha.TajoMasterInfo;
import org.apache.tajo.master.querymaster.QueryMaster;
@@ -45,8 +42,11 @@ import org.apache.tajo.pullserver.TajoPullServerService;
import org.apache.tajo.rpc.RpcChannelFactory;
import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
-import org.apache.tajo.util.*;
import org.apache.tajo.storage.HashShuffleAppenderManager;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.HAServiceUtil;
+import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.StringUtils;
import org.apache.tajo.util.metrics.TajoSystemMetrics;
import org.apache.tajo.webapp.StaticHttpServer;
@@ -57,7 +57,6 @@ import java.lang.management.ThreadMXBean;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -125,6 +124,10 @@ public class TajoWorker extends CompositeService {
private HashShuffleAppenderManager hashShuffleAppenderManager;
+ private AsyncDispatcher dispatcher;
+
+ private LocalDirAllocator lDirAllocator;
+
public TajoWorker() throws Exception {
super(TajoWorker.class.getName());
}
@@ -166,7 +169,7 @@ public class TajoWorker extends CompositeService {
}
@Override
- public void init(Configuration conf) {
+ public void serviceInit(Configuration conf) throws Exception {
Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook()));
this.systemConf = (TajoConf)conf;
@@ -174,6 +177,7 @@ public class TajoWorker extends CompositeService {
this.connPool = RpcConnectionPool.getPool(systemConf);
this.workerContext = new WorkerContext();
+ this.lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
String resourceManagerClassName = systemConf.getVar(ConfVars.RESOURCE_MANAGER_CLASS);
@@ -192,6 +196,9 @@ public class TajoWorker extends CompositeService {
systemConf.setIntVar(ConfVars.PULLSERVER_PORT, 0);
}
+ this.dispatcher = new AsyncDispatcher();
+ addIfService(dispatcher);
+
// querymaster worker
tajoWorkerClientService = new TajoWorkerClientService(workerContext, clientPort);
addService(tajoWorkerClientService);
@@ -200,7 +207,7 @@ public class TajoWorker extends CompositeService {
addService(queryMasterManagerService);
// taskrunner worker
- taskRunnerManager = new TaskRunnerManager(workerContext);
+ taskRunnerManager = new TaskRunnerManager(workerContext, dispatcher);
addService(taskRunnerManager);
tajoWorkerManagerService = new TajoWorkerManagerService(workerContext, peerRpcPort);
@@ -240,30 +247,19 @@ public class TajoWorker extends CompositeService {
",yarnContainer=" + yarnContainerMode + ", clientPort=" + clientPort +
", peerRpcPort=" + peerRpcPort + ":" + qmManagerPort + ",httpPort" + httpPort);
- super.init(conf);
+ super.serviceInit(conf);
tajoMasterInfo = new TajoMasterInfo();
- if(yarnContainerMode && queryMasterMode) {
- tajoMasterInfo.setTajoMasterAddress(NetUtils.createSocketAddr(cmdArgs[2]));
- connectToCatalog();
-
- QueryId queryId = TajoIdUtils.parseQueryId(cmdArgs[1]);
- queryMasterManagerService.getQueryMaster().reportQueryStatusToQueryMaster(
- queryId, TajoProtos.QueryState.QUERY_MASTER_LAUNCHED);
- } else if(yarnContainerMode && taskRunnerMode) { //TaskRunner mode
- taskRunnerManager.startTask(cmdArgs);
+ if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
+ tajoMasterInfo.setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf));
+ tajoMasterInfo.setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf));
} else {
- if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
- tajoMasterInfo.setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf));
- tajoMasterInfo.setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf));
- } else {
- tajoMasterInfo.setTajoMasterAddress(NetUtils.createSocketAddr(systemConf.getVar(ConfVars
- .TAJO_MASTER_UMBILICAL_RPC_ADDRESS)));
- tajoMasterInfo.setWorkerResourceTrackerAddr(NetUtils.createSocketAddr(systemConf.getVar(ConfVars
- .RESOURCE_TRACKER_RPC_ADDRESS)));
- }
- connectToCatalog();
+ tajoMasterInfo.setTajoMasterAddress(NetUtils.createSocketAddr(systemConf.getVar(ConfVars
+ .TAJO_MASTER_UMBILICAL_RPC_ADDRESS)));
+ tajoMasterInfo.setWorkerResourceTrackerAddr(NetUtils.createSocketAddr(systemConf.getVar(ConfVars
+ .RESOURCE_TRACKER_RPC_ADDRESS)));
}
+ connectToCatalog();
workerHeartbeatThread = new WorkerHeartbeatService(workerContext);
workerHeartbeatThread.init(conf);
@@ -309,13 +305,13 @@ public class TajoWorker extends CompositeService {
}
@Override
- public void start() {
- super.start();
+ public void serviceStart() throws Exception {
initWorkerMetrics();
+ super.serviceStart();
}
@Override
- public void stop() {
+ public void serviceStop() throws Exception {
if(stopped.getAndSet(true)) {
return;
}
@@ -349,14 +345,11 @@ public class TajoWorker extends CompositeService {
}
if(deletionService != null) deletionService.stop();
- super.stop();
+ super.serviceStop();
LOG.info("TajoWorker main thread exiting");
}
public class WorkerContext {
- private ConcurrentHashMap<ExecutionBlockId, ExecutionBlockSharedResource> sharedResourceMap =
- new ConcurrentHashMap<ExecutionBlockId, ExecutionBlockSharedResource>();
-
public QueryMaster getQueryMaster() {
if (queryMasterManagerService == null) {
return null;
@@ -407,23 +400,8 @@ public class TajoWorker extends CompositeService {
}
}
- public void initSharedResource(QueryContext queryContext, ExecutionBlockId blockId, String planJson)
- throws InterruptedException {
-
- if (!sharedResourceMap.containsKey(blockId)) {
- ExecutionBlockSharedResource resource = new ExecutionBlockSharedResource();
- if (sharedResourceMap.putIfAbsent(blockId, resource) == null) {
- resource.initialize(queryContext, planJson);
- }
- }
- }
-
- public ExecutionBlockSharedResource getSharedResource(ExecutionBlockId blockId) {
- return sharedResourceMap.get(blockId);
- }
-
- public void releaseSharedResource(ExecutionBlockId blockId) {
- sharedResourceMap.remove(blockId).release();
+ public LocalDirAllocator getLocalDirAllocator(){
+ return lDirAllocator;
}
protected void cleanup(String strPath) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index fa116c3..472ce1b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -25,14 +25,18 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
-import org.apache.tajo.*;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TajoIdProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.rpc.AsyncRpcServer;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.util.NetUtils;
-import org.apache.tajo.util.TajoIdUtils;
+import org.apache.tajo.worker.event.TaskRunnerStartEvent;
+import org.apache.tajo.worker.event.TaskRunnerStopEvent;
import java.net.InetSocketAddress;
@@ -112,19 +116,16 @@ public class TajoWorkerManagerService extends CompositeService
}
@Override
- public void executeExecutionBlock(RpcController controller,
+ public void startExecutionBlock(RpcController controller,
TajoWorkerProtocol.RunExecutionBlockRequestProto request,
RpcCallback<PrimitiveProtos.BoolProto> done) {
workerContext.getWorkerSystemMetrics().counter("query", "executedExecutionBlocksNum").inc();
try {
- workerContext.initSharedResource(
- new QueryContext(workerContext.getConf(), request.getQueryContext()),
- TajoIdUtils.createExecutionBlockId(request.getExecutionBlockId()), request.getPlanJson());
String[] params = new String[7];
params[0] = "standby"; //mode(never used)
- params[1] = request.getExecutionBlockId();
+ params[1] = request.getExecutionBlockId().toString();
// NodeId has a form of hostname:port.
params[2] = request.getNodeId();
params[3] = request.getContainerId();
@@ -133,7 +134,14 @@ public class TajoWorkerManagerService extends CompositeService
params[4] = request.getQueryMasterHost();
params[5] = String.valueOf(request.getQueryMasterPort());
params[6] = request.getQueryOutputPath();
- workerContext.getTaskRunnerManager().startTask(params);
+
+ ExecutionBlockId executionBlockId = new ExecutionBlockId(request.getExecutionBlockId());
+ workerContext.getTaskRunnerManager().getEventHandler().handle(new TaskRunnerStartEvent(
+ params
+ , executionBlockId,
+ new QueryContext(workerContext.getConf(), request.getQueryContext()),
+ request.getPlanJson()
+ ));
done.run(TajoWorker.TRUE_PROTO);
} catch (Throwable t) {
LOG.error(t.getMessage(), t);
@@ -142,6 +150,21 @@ public class TajoWorkerManagerService extends CompositeService
}
@Override
+ public void stopExecutionBlock(RpcController controller,
+ TajoIdProtos.ExecutionBlockIdProto requestProto,
+ RpcCallback<PrimitiveProtos.BoolProto> done) {
+ try {
+ workerContext.getTaskRunnerManager().getEventHandler().handle(new TaskRunnerStopEvent(
+ new ExecutionBlockId(requestProto)
+ ));
+ done.run(TajoWorker.TRUE_PROTO);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ done.run(TajoWorker.FALSE_PROTO);
+ }
+ }
+
+ @Override
public void killTaskAttempt(RpcController controller, TajoIdProtos.QueryUnitAttemptIdProto request,
RpcCallback<PrimitiveProtos.BoolProto> done) {
Task task = workerContext.getTaskRunnerManager().getTaskByQueryUnitAttemptId(new QueryUnitAttemptId(request));
@@ -162,13 +185,10 @@ public class TajoWorkerManagerService extends CompositeService
TajoWorkerProtocol.ExecutionBlockListProto ebIds,
RpcCallback<PrimitiveProtos.BoolProto> done) {
for (TajoIdProtos.ExecutionBlockIdProto executionBlockIdProto : ebIds.getExecutionBlockIdList()) {
- String inputDir = TaskRunner.getBaseInputDir(new ExecutionBlockId(executionBlockIdProto)).toString();
+ String inputDir = ExecutionBlockContext.getBaseInputDir(new ExecutionBlockId(executionBlockIdProto)).toString();
workerContext.cleanup(inputDir);
- String outputDir = TaskRunner.getBaseOutputDir(new ExecutionBlockId(executionBlockIdProto)).toString();
+ String outputDir = ExecutionBlockContext.getBaseOutputDir(new ExecutionBlockId(executionBlockIdProto)).toString();
workerContext.cleanup(outputDir);
-
- // Release shared resources
- workerContext.releaseSharedResource(new ExecutionBlockId(executionBlockIdProto));
}
done.run(TajoWorker.TRUE_PROTO);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index d0665ae..7b4cbe1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -25,8 +25,10 @@ import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.TajoProtos;
@@ -43,11 +45,10 @@ import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.engine.planner.physical.PhysicalExec;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.engine.query.QueryUnitRequest;
-import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
+import org.apache.tajo.ipc.QueryMasterProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol.*;
import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.rpc.RpcChannelFactory;
import org.apache.tajo.storage.StorageUtil;
import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.fragment.FileFragment;
@@ -59,7 +60,7 @@ import java.net.URI;
import java.text.NumberFormat;
import java.util.*;
import java.util.Map.Entry;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.ExecutorService;
import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
@@ -69,11 +70,9 @@ public class Task {
private final TajoConf systemConf;
private final QueryContext queryContext;
- private final FileSystem localFS;
- private TaskRunner.TaskRunnerContext taskRunnerContext;
- private final QueryMasterProtocolService.Interface masterProxy;
- private final LocalDirAllocator lDirAllocator;
+ private final ExecutionBlockContext executionBlockContext;
private final QueryUnitAttemptId taskId;
+ private final String taskRunnerId;
private final Path taskDir;
private final QueryUnitRequest request;
@@ -85,7 +84,6 @@ public class Task {
private boolean interQuery;
private boolean killed = false;
private boolean aborted = false;
- private final Reporter reporter;
private Path inputTableBaseDir;
private long startTime;
@@ -97,7 +95,6 @@ public class Task {
private ShuffleType shuffleType = null;
private Schema finalSchema = null;
private TupleComparator sortComp = null;
- private ClientSocketChannelFactory channelFactory = null;
static final String OUTPUT_FILE_PREFIX="part-";
static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SUBQUERY =
@@ -132,45 +129,27 @@ public class Task {
}
};
- public Task(QueryUnitAttemptId taskId,
- final TaskRunner.TaskRunnerContext worker,
- final QueryMasterProtocolService.Interface masterProxy,
+ public Task(String taskRunnerId,
+ Path baseDir,
+ QueryUnitAttemptId taskId,
+ final ExecutionBlockContext executionBlockContext,
final QueryUnitRequest request) throws IOException {
+ this.taskRunnerId = taskRunnerId;
this.request = request;
this.taskId = taskId;
- this.systemConf = worker.getConf();
+ this.systemConf = executionBlockContext.getConf();
this.queryContext = request.getQueryContext();
- this.taskRunnerContext = worker;
- this.masterProxy = masterProxy;
- this.localFS = worker.getLocalFS();
- this.lDirAllocator = worker.getLocalDirAllocator();
- this.taskDir = StorageUtil.concatPath(taskRunnerContext.getBaseDir(),
+ this.executionBlockContext = executionBlockContext;
+ this.taskDir = StorageUtil.concatPath(baseDir,
taskId.getQueryUnitId().getId() + "_" + taskId.getId());
- this.context = new TaskAttemptContext(queryContext, worker.getWorkerContext(), taskId,
+ this.context = new TaskAttemptContext(queryContext, executionBlockContext, taskId,
request.getFragments().toArray(new FragmentProto[request.getFragments().size()]), taskDir);
this.context.setDataChannel(request.getDataChannel());
this.context.setEnforcer(request.getEnforcer());
this.inputStats = new TableStats();
- this.reporter = new Reporter(taskId, masterProxy);
- this.reporter.startCommunicationThread();
-
-
- // resource intiailization
- boolean resourceInitialized = false;
- try {
- resourceInitialized = context.getSharedResource().awaitInitializedResource();
- } catch (InterruptedException e) {
- LOG.error("Failed Resource Initialization", e);
- } finally {
- if (!resourceInitialized) {
- setState(TaskAttemptState.TA_FAILED);
- return;
- }
- }
-
plan = CoreGsonHelper.fromJson(request.getSerializedData(), LogicalNode.class);
LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN);
if (scanNode != null) {
@@ -234,11 +213,12 @@ public class Task {
public void init() throws IOException {
if (context.getState() == TaskAttemptState.TA_PENDING) {
// initialize a task temporal dir
+ FileSystem localFS = executionBlockContext.getLocalFS();
localFS.mkdirs(taskDir);
if (request.getFetches().size() > 0) {
inputTableBaseDir = localFS.makeQualified(
- lDirAllocator.getLocalPathForWrite(
+ executionBlockContext.getLocalDirAllocator().getLocalPathForWrite(
getTaskAttemptDir(context.getTaskId()).toString(), systemConf));
localFS.mkdirs(inputTableBaseDir);
Path tableDir;
@@ -296,8 +276,9 @@ public class Task {
}
public void fetch() {
+ ExecutorService executorService = executionBlockContext.getTaskRunner(taskRunnerId).getFetchLauncher();
for (Fetcher f : fetcherRunners) {
- taskRunnerContext.getFetchLauncher().submit(new FetchRunner(context, f));
+ executorService.submit(new FetchRunner(context, f));
}
}
@@ -305,25 +286,18 @@ public class Task {
killed = true;
context.stop();
context.setState(TaskAttemptState.TA_KILLED);
- releaseChannelFactory();
}
public void abort() {
aborted = true;
context.stop();
- releaseChannelFactory();
}
public void cleanUp() {
// remove itself from worker
if (context.getState() == TaskAttemptState.TA_SUCCEEDED) {
- try {
- localFS.delete(context.getWorkDir(), true);
- synchronized (taskRunnerContext.getTasks()) {
- taskRunnerContext.getTasks().remove(this.getId());
- }
- } catch (IOException e) {
- LOG.error(e.getMessage(), e);
+ synchronized (executionBlockContext.getTasks()) {
+ executionBlockContext.getTasks().remove(this.getId());
}
} else {
LOG.error("QueryUnitAttemptId: " + context.getTaskId() + " status: " + context.getState());
@@ -332,7 +306,7 @@ public class Task {
public TaskStatusProto getReport() {
TaskStatusProto.Builder builder = TaskStatusProto.newBuilder();
- builder.setWorkerName(taskRunnerContext.getNodeId());
+ builder.setWorkerName(executionBlockContext.getTaskRunner(taskRunnerId).getNodeId().toString());
builder.setId(context.getTaskId().getProto())
.setProgress(context.getProgress())
.setState(context.getState());
@@ -345,6 +319,23 @@ public class Task {
return builder.build();
}
+ public boolean isRunning(){
+ return context.getState() == TaskAttemptState.TA_RUNNING;
+ }
+ public boolean isProgressChanged() {
+ return context.isProgressChanged();
+ }
+
+ public void updateProgress() {
+ if(killed || aborted){
+ return;
+ }
+
+ if (executor != null && context.getProgress() < 1.0f) {
+ context.setExecutorProgress(executor.getProgress());
+ }
+ }
+
private CatalogProtos.TableStatsProto reloadInputStats() {
synchronized(inputStats) {
if (this.executor == null) {
@@ -419,12 +410,11 @@ public class Task {
FileFragment[] frags = localizeFetchedData(tableDir, inputTable, descs.get(inputTable).getMeta());
context.updateAssignedFragments(inputTable, frags);
}
- releaseChannelFactory();
}
- public void run() {
+ public void run() throws Exception {
startTime = System.currentTimeMillis();
- Exception error = null;
+ Throwable error = null;
try {
context.setState(TaskAttemptState.TA_RUNNING);
@@ -433,16 +423,17 @@ public class Task {
// complete.
waitForFetch();
context.setFetcherProgress(FETCHER_PROGRESS);
- context.setProgress(FETCHER_PROGRESS);
+ context.setProgressChanged(true);
+ updateProgress();
}
- this.executor = taskRunnerContext.getTQueryEngine().
+ this.executor = executionBlockContext.getTQueryEngine().
createPlan(context, plan);
this.executor.init();
- while(!killed && executor.next() != null) {
+ while(!killed && !aborted && executor.next() != null) {
}
- } catch (Exception e) {
+ } catch (Throwable e) {
error = e ;
LOG.error(e.getMessage(), e);
aborted = true;
@@ -452,22 +443,20 @@ public class Task {
executor.close();
reloadInputStats();
} catch (IOException e) {
- e.printStackTrace();
+ LOG.error(e);
}
this.executor = null;
}
- context.setProgress(1.0f);
- taskRunnerContext.completedTasksNum.incrementAndGet();
+ executionBlockContext.completedTasksNum.incrementAndGet();
context.getHashShuffleAppenderManager().finalizeTask(taskId);
-
+ QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = executionBlockContext.getQueryMasterStub();
if (killed || aborted) {
context.setExecutorProgress(0.0f);
- context.setProgress(0.0f);
if(killed) {
context.setState(TaskAttemptState.TA_KILLED);
- masterProxy.statusUpdate(null, getReport(), NullCallback.get());
- taskRunnerContext.killedTasksNum.incrementAndGet();
+ queryMasterStub.statusUpdate(null, getReport(), NullCallback.get());
+ executionBlockContext.killedTasksNum.incrementAndGet();
} else {
context.setState(TaskAttemptState.TA_FAILED);
TaskFatalErrorReport.Builder errorBuilder =
@@ -482,47 +471,31 @@ public class Task {
errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error));
}
- masterProxy.fatalError(null, errorBuilder.build(), NullCallback.get());
- taskRunnerContext.failedTasksNum.incrementAndGet();
- }
-
- // stopping the status report
- try {
- reporter.stopCommunicationThread();
- } catch (InterruptedException e) {
- LOG.warn(e);
+ queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get());
+ executionBlockContext.failedTasksNum.incrementAndGet();
}
-
} else {
// if successful
context.setProgress(1.0f);
context.setState(TaskAttemptState.TA_SUCCEEDED);
-
- // stopping the status report
- try {
- reporter.stopCommunicationThread();
- } catch (InterruptedException e) {
- LOG.warn(e);
- }
+ executionBlockContext.succeededTasksNum.incrementAndGet();
TaskCompletionReport report = getTaskCompletionReport();
- masterProxy.done(null, report, NullCallback.get());
- taskRunnerContext.succeededTasksNum.incrementAndGet();
+ queryMasterStub.done(null, report, NullCallback.get());
}
finishTime = System.currentTimeMillis();
LOG.info(context.getTaskId() + " completed. " +
- "Worker's task counter - total:" + taskRunnerContext.completedTasksNum.intValue() +
- ", succeeded: " + taskRunnerContext.succeededTasksNum.intValue()
- + ", killed: " + taskRunnerContext.killedTasksNum.intValue()
- + ", failed: " + taskRunnerContext.failedTasksNum.intValue());
+ "Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() +
+ ", succeeded: " + executionBlockContext.succeededTasksNum.intValue()
+ + ", killed: " + executionBlockContext.killedTasksNum.intValue()
+ + ", failed: " + executionBlockContext.failedTasksNum.intValue());
cleanupTask();
}
}
public void cleanupTask() {
- taskRunnerContext.addTaskHistory(getId(), createTaskHistory());
- taskRunnerContext.getTasks().remove(getId());
- taskRunnerContext = null;
+ executionBlockContext.addTaskHistory(taskRunnerId, getId(), createTaskHistory());
+ executionBlockContext.getTasks().remove(getId());
fetcherRunners.clear();
fetcherRunners = null;
@@ -532,11 +505,8 @@ public class Task {
executor = null;
}
} catch (IOException e) {
- e.printStackTrace();
+ LOG.fatal(e.getMessage(), e);
}
- plan = null;
- context = null;
- releaseChannelFactory();
}
public TaskHistory createTaskHistory() {
@@ -577,7 +547,7 @@ public class Task {
taskHistory.setFinishedFetchCount(i);
}
} catch (Exception e) {
- e.printStackTrace();
+ LOG.warn(e.getMessage(), e);
}
return taskHistory;
@@ -673,7 +643,11 @@ public class Task {
@VisibleForTesting
public static float adjustFetchProcess(int totalFetcher, int remainFetcher) {
- return ((float)(totalFetcher - remainFetcher)) / (float)totalFetcher * FETCHER_PROGRESS;
+ if (totalFetcher > 0) {
+ return ((totalFetcher - remainFetcher) / (float) totalFetcher) * FETCHER_PROGRESS;
+ } else {
+ return 0.0f;
+ }
}
private synchronized void fetcherFinished(TaskAttemptContext ctx) {
@@ -681,24 +655,15 @@ public class Task {
if(fetcherSize == 0) {
return;
}
- try {
- int numRunningFetcher = (int)(ctx.getFetchLatch().getCount()) - 1;
- if (numRunningFetcher == 0) {
- context.setProgress(FETCHER_PROGRESS);
- } else {
- context.setProgress(adjustFetchProcess(fetcherSize, numRunningFetcher));
- }
- } finally {
- ctx.getFetchLatch().countDown();
- }
- }
+ ctx.getFetchLatch().countDown();
- private void releaseChannelFactory(){
- if(channelFactory != null) {
- channelFactory.shutdown();
- channelFactory.releaseExternalResources();
- channelFactory = null;
+ int remainFetcher = (int) ctx.getFetchLatch().getCount();
+ if (remainFetcher == 0) {
+ context.setFetcherProgress(FETCHER_PROGRESS);
+ } else {
+ context.setFetcherProgress(adjustFetchProcess(fetcherSize, remainFetcher));
+ context.setProgressChanged(true);
}
}
@@ -706,15 +671,9 @@ public class Task {
List<FetchImpl> fetches) throws IOException {
if (fetches.size() > 0) {
-
- releaseChannelFactory();
-
-
- int workerNum = ctx.getConf().getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM);
- channelFactory = RpcChannelFactory.createClientChannelFactory("Fetcher", workerNum);
- Path inputDir = lDirAllocator.
- getLocalPathToRead(
- getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf);
+ ClientSocketChannelFactory channelFactory = executionBlockContext.getShuffleChannelFactory();
+ Path inputDir = executionBlockContext.getLocalDirAllocator().
+ getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf);
File storeDir;
int i = 0;
@@ -739,91 +698,9 @@ public class Task {
}
}
- protected class Reporter {
- private QueryMasterProtocolService.Interface masterStub;
- private Thread pingThread;
- private AtomicBoolean stop = new AtomicBoolean(false);
- private static final int PROGRESS_INTERVAL = 3000;
- private static final int MAX_RETRIES = 3;
- private QueryUnitAttemptId taskId;
-
- public Reporter(QueryUnitAttemptId taskId, QueryMasterProtocolService.Interface masterStub) {
- this.taskId = taskId;
- this.masterStub = masterStub;
- }
-
- Runnable createReporterThread() {
-
- return new Runnable() {
- int remainingRetries = MAX_RETRIES;
- @Override
- public void run() {
- while (!stop.get() && !context.isStopped()) {
- try {
- if(executor != null && context.getProgress() < 1.0f) {
- float progress = executor.getProgress();
- context.setExecutorProgress(progress);
- }
- } catch (Throwable t) {
- LOG.error("Get progress error: " + t.getMessage(), t);
- }
-
- try {
- if (context.isPorgressChanged()) {
- masterStub.statusUpdate(null, getReport(), NullCallback.get());
- } else {
- masterStub.ping(null, taskId.getProto(), NullCallback.get());
- }
- } catch (Throwable t) {
- LOG.error(t.getMessage(), t);
- remainingRetries -=1;
- if (remainingRetries == 0) {
- ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0);
- LOG.warn("Last retry, exiting ");
- throw new RuntimeException(t);
- }
- } finally {
- if (!context.isStopped() && remainingRetries > 0) {
- synchronized (pingThread) {
- try {
- pingThread.wait(PROGRESS_INTERVAL);
- } catch (InterruptedException e) {
- }
- }
- }
- }
- }
- }
- };
- }
-
- public void startCommunicationThread() {
- if (pingThread == null) {
- pingThread = new Thread(createReporterThread());
- pingThread.setName("communication thread");
- pingThread.start();
- }
- }
-
- public void stopCommunicationThread() throws InterruptedException {
- if(stop.getAndSet(true)){
- return;
- }
-
- if (pingThread != null) {
- // Intent of the lock is to not send an interupt in the middle of an
- // umbilical.ping or umbilical.statusUpdate
- synchronized(pingThread) {
- //Interrupt if sleeping. Otherwise wait for the RPC call to return.
- pingThread.notifyAll();
- }
- }
- }
- }
-
public static Path getTaskAttemptDir(QueryUnitAttemptId quid) {
Path workDir =
- StorageUtil.concatPath(TaskRunner.getBaseInputDir(quid.getQueryUnitId().getExecutionBlockId()),
+ StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getQueryUnitId().getExecutionBlockId()),
String.valueOf(quid.getQueryUnitId().getId()),
String.valueOf(quid.getId()));
return workDir;
http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index d27fd6d..422ec2b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -83,15 +83,15 @@ public class TaskAttemptContext {
private Map<Integer, Long> partitionOutputVolume;
private HashShuffleAppenderManager hashShuffleAppenderManager;
- public TaskAttemptContext(QueryContext queryContext, final WorkerContext workerContext,
+ public TaskAttemptContext(QueryContext queryContext, final ExecutionBlockContext executionBlockContext,
final QueryUnitAttemptId queryId,
final FragmentProto[] fragments,
final Path workDir) {
this.queryContext = queryContext;
- if (workerContext != null) { // For unit tests
- this.workerContext = workerContext;
- this.sharedResource = workerContext.getSharedResource(queryId.getQueryUnitId().getExecutionBlockId());
+ if (executionBlockContext != null) { // For unit tests
+ this.workerContext = executionBlockContext.getWorkerContext();
+ this.sharedResource = executionBlockContext.getSharedResource();
}
this.queryId = queryId;
@@ -315,22 +315,45 @@ public class TaskAttemptContext {
public float getProgress() {
return this.progress;
}
-
+
public void setProgress(float progress) {
float previousProgress = this.progress;
- this.progress = progress;
- progressChanged.set(previousProgress != progress);
+
+ if (Float.isNaN(progress) || Float.isInfinite(progress)) {
+ this.progress = 0.0f;
+ } else {
+ this.progress = progress;
+ }
+
+ if (previousProgress != progress) {
+ setProgressChanged(true);
+ }
}
- public boolean isPorgressChanged() {
+ public boolean isProgressChanged() {
return progressChanged.get();
}
+
+ public void setProgressChanged(boolean changed){
+ progressChanged.set(changed);
+ }
+
public void setExecutorProgress(float executorProgress) {
- float adjustProgress = executorProgress * (1 - fetcherProgress);
- setProgress(fetcherProgress + adjustProgress);
+ if(Float.isNaN(executorProgress) || Float.isInfinite(executorProgress)){
+ executorProgress = 0.0f;
+ }
+
+ if (hasFetchPhase()) {
+ setProgress(fetcherProgress + (executorProgress * 0.5f));
+ } else {
+ setProgress(executorProgress);
+ }
}
public void setFetcherProgress(float fetcherProgress) {
+ if(Float.isNaN(fetcherProgress) || Float.isInfinite(fetcherProgress)){
+ fetcherProgress = 0.0f;
+ }
this.fetcherProgress = fetcherProgress;
}
@@ -375,10 +398,6 @@ public class TaskAttemptContext {
return queryContext;
}
- public WorkerContext getWorkContext() {
- return workerContext;
- }
-
public QueryUnitAttemptId getQueryId() {
return queryId;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 32cd4f5..ea8ed82 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -18,42 +18,26 @@
package org.apache.tajo.worker;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryId;
import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.TajoProtos.TaskAttemptState;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.engine.query.QueryUnitRequestImpl;
-import org.apache.tajo.engine.utils.TupleCache;
-import org.apache.tajo.ipc.QueryMasterProtocol;
import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.rpc.CallFuture;
-import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.rpc.RpcConnectionPool;
-import org.apache.tajo.storage.StorageUtil;
-import org.apache.tajo.util.TajoIdUtils;
-import org.apache.tajo.worker.TajoWorker.WorkerContext;
-import java.net.InetSocketAddress;
-import java.util.Map;
import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
@@ -67,142 +51,78 @@ public class TaskRunner extends AbstractService {
private TajoConf systemConf;
private volatile boolean stopped = false;
+ private Path baseDirPath;
- private ExecutionBlockId executionBlockId;
- private QueryId queryId;
private NodeId nodeId;
private ContainerId containerId;
- // Cluster Management
- //private TajoWorkerProtocol.TajoWorkerProtocolService.Interface master;
-
- // for temporal or intermediate files
- private FileSystem localFS;
- // for input files
- private FileSystem defaultFS;
-
- private TajoQueryEngine queryEngine;
-
// for Fetcher
private ExecutorService fetchLauncher;
- // It keeps all of the query unit attempts while a TaskRunner is running.
- private final Map<QueryUnitAttemptId, Task> tasks = new ConcurrentHashMap<QueryUnitAttemptId, Task>();
-
- private LocalDirAllocator lDirAllocator;
// A thread to receive each assigned query unit and execute the query unit
private Thread taskLauncher;
// Contains the object references related for TaskRunner
- private TaskRunnerContext taskRunnerContext;
- // for the doAs block
- private UserGroupInformation taskOwner;
-
- // for the local temporal dir
- private String baseDir;
- private Path baseDirPath;
-
- private TaskRunnerManager taskRunnerManager;
+ private ExecutionBlockContext executionBlockContext;
private long finishTime;
- private RpcConnectionPool connPool;
-
- private InetSocketAddress qmMasterAddr;
-
private TaskRunnerHistory history;
- public TaskRunner(TaskRunnerManager taskRunnerManager, TajoConf conf, String[] args) {
+ public TaskRunner(ExecutionBlockContext executionBlockContext, String[] args) {
super(TaskRunner.class.getName());
- this.taskRunnerManager = taskRunnerManager;
- this.connPool = RpcConnectionPool.getPool(conf);
+ ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+ ThreadFactory fetcherFactory = builder.setNameFormat("Fetcher executor #%d").build();
+ this.systemConf = executionBlockContext.getConf();
this.fetchLauncher = Executors.newFixedThreadPool(
- conf.getIntVar(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM));
+ systemConf.getIntVar(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM), fetcherFactory);
try {
- final ExecutionBlockId executionBlockId = TajoIdUtils.createExecutionBlockId(args[1]);
-
- LOG.info("Tajo Root Dir: " + conf.getVar(ConfVars.ROOT_DIR));
- LOG.info("Worker Local Dir: " + conf.getVar(ConfVars.WORKER_TEMPORAL_DIR));
-
- UserGroupInformation.setConfiguration(conf);
-
// QueryBlockId from String
// NodeId has a form of hostname:port.
- NodeId nodeId = ConverterUtils.toNodeId(args[2]);
+ this.nodeId = ConverterUtils.toNodeId(args[2]);
this.containerId = ConverterUtils.toContainerId(args[3]);
// QueryMaster's address
- String host = args[4];
- int port = Integer.parseInt(args[5]);
- this.qmMasterAddr = NetUtils.createSocketAddrForHost(host, port);
-
- LOG.info("QueryMaster Address:" + qmMasterAddr);
- // TODO - 'load credential' should be implemented
- // Getting taskOwner
- UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(conf.getVar(ConfVars.USERNAME));
- //taskOwner.addToken(token);
-
- // initialize MasterWorkerProtocol as an actual task owner.
-// this.client =
-// taskOwner.doAs(new PrivilegedExceptionAction<AsyncRpcClient>() {
-// @Override
-// public AsyncRpcClient run() throws Exception {
-// return new AsyncRpcClient(TajoWorkerProtocol.class, masterAddr);
-// }
-// });
-// this.master = client.getStub();
-
- this.executionBlockId = executionBlockId;
- this.queryId = executionBlockId.getQueryId();
- this.nodeId = nodeId;
- this.taskOwner = taskOwner;
-
- this.taskRunnerContext = new TaskRunnerContext();
- this.history = new TaskRunnerHistory(containerId, executionBlockId);
+ //String host = args[4];
+ //int port = Integer.parseInt(args[5]);
+
+ this.executionBlockContext = executionBlockContext;
+ this.history = executionBlockContext.createTaskRunnerHistory(this);
this.history.setState(getServiceState());
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
}
- protected void sendExecutionBlockReport(TajoWorkerProtocol.ExecutionBlockReport reporter) throws Exception {
- QueryMasterProtocol.QueryMasterProtocolService.Interface qmClientService = null;
- NettyClientBase qmClient = null;
- try {
- qmClient = connPool.getConnection(qmMasterAddr, QueryMasterProtocol.class, true);
- qmClientService = qmClient.getStub();
- qmClientService.doneExecutionBlock(null, reporter, NullCallback.get());
- } finally {
- connPool.releaseConnection(qmClient);
- }
+ // TODO this is expensive. we should change to unique id
+ public String getId() {
+ return getId(getContext().getExecutionBlockId(), containerId);
}
- public String getId() {
- return getId(executionBlockId, containerId);
+ public NodeId getNodeId(){
+ return nodeId;
+ }
+
+ public ContainerId getContainerId(){
+ return containerId;
}
public static String getId(ExecutionBlockId executionBlockId, ContainerId containerId) {
return executionBlockId + "," + containerId;
}
- public static Path getBaseOutputDir(ExecutionBlockId executionBlockId){
- Path workDir =
- StorageUtil.concatPath(
- executionBlockId.getQueryId().toString(),
- "output",
- String.valueOf(executionBlockId.getId()));
- return workDir;
+ public TaskRunnerHistory getHistory(){
+ return history;
}
- public static Path getBaseInputDir(ExecutionBlockId executionBlockId) {
- Path workDir =
- StorageUtil.concatPath(
- executionBlockId.getQueryId().toString(),
- "in",
- executionBlockId.toString());
- return workDir;
+ public Path getTaskBaseDir(){
+ return baseDirPath;
+ }
+
+ public ExecutorService getFetchLauncher() {
+ return fetchLauncher;
}
@Override
@@ -210,27 +130,13 @@ public class TaskRunner extends AbstractService {
this.systemConf = (TajoConf)conf;
try {
- // initialize DFS and LocalFileSystems
- defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(conf);
- localFS = FileSystem.getLocal(conf);
-
// the base dir for an output dir
- baseDir = getBaseOutputDir(executionBlockId).toString();
-
- // initialize LocalDirAllocator
- lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
-
- baseDirPath = localFS.makeQualified(lDirAllocator.getLocalPathForWrite(baseDir, conf));
- LOG.info("TaskRunner basedir is created (" + baseDir +")");
-
- // Setup QueryEngine according to the query plan
- // Here, we can setup row-based query engine or columnar query engine.
- this.queryEngine = new TajoQueryEngine(systemConf);
+ baseDirPath = getContext().createBaseDir();
+ LOG.info("TaskRunner basedir is created (" + baseDirPath +")");
} catch (Throwable t) {
t.printStackTrace();
LOG.error(t);
}
-
super.init(conf);
this.history.setState(getServiceState());
}
@@ -253,23 +159,10 @@ public class TaskRunner extends AbstractService {
// If this flag become true, taskLauncher will be terminated.
this.stopped = true;
- // If TaskRunner is stopped, all running or pending tasks will be marked as failed.
- for (Task task : tasks.values()) {
- if (task.getStatus() == TaskAttemptState.TA_PENDING ||
- task.getStatus() == TaskAttemptState.TA_RUNNING) {
- task.setState(TaskAttemptState.TA_FAILED);
- task.abort();
- }
- }
-
- tasks.clear();
fetchLauncher.shutdown();
fetchLauncher = null;
- this.queryEngine = null;
-
- TupleCache.getInstance().removeBroadcastCache(executionBlockId);
- LOG.info("Stop TaskRunner: " + executionBlockId);
+ LOG.info("Stop TaskRunner: " + getId());
synchronized (this) {
notifyAll();
}
@@ -281,71 +174,8 @@ public class TaskRunner extends AbstractService {
return finishTime;
}
- public class TaskRunnerContext {
- public AtomicInteger completedTasksNum = new AtomicInteger();
- public AtomicInteger succeededTasksNum = new AtomicInteger();
- public AtomicInteger killedTasksNum = new AtomicInteger();
- public AtomicInteger failedTasksNum = new AtomicInteger();
-
- public TajoConf getConf() {
- return systemConf;
- }
-
- public String getNodeId() {
- return nodeId.toString();
- }
-
- public FileSystem getLocalFS() {
- return localFS;
- }
-
- public FileSystem getDefaultFS() {
- return defaultFS;
- }
-
- public LocalDirAllocator getLocalDirAllocator() {
- return lDirAllocator;
- }
-
- public TajoQueryEngine getTQueryEngine() {
- return queryEngine;
- }
-
- public Map<QueryUnitAttemptId, Task> getTasks() {
- return tasks;
- }
-
- public Task getTask(QueryUnitAttemptId taskId) {
- return tasks.get(taskId);
- }
-
- public ExecutorService getFetchLauncher() {
- return fetchLauncher;
- }
-
- public Path getBaseDir() {
- return baseDirPath;
- }
-
- public ExecutionBlockId getExecutionBlockId() {
- return executionBlockId;
- }
-
- public void addTaskHistory(QueryUnitAttemptId quAttemptId, TaskHistory taskHistory) {
- history.addTaskHistory(quAttemptId, taskHistory);
- }
-
- public TaskRunnerHistory getExcutionBlockHistory(){
- return history;
- }
-
- public WorkerContext getWorkerContext() {
- return taskRunnerManager.getWorkerContext();
- }
- }
-
- public TaskRunnerContext getContext() {
- return taskRunnerContext;
+ public ExecutionBlockContext getContext() {
+ return executionBlockContext;
}
static void fatalError(QueryMasterProtocolService.Interface qmClientService,
@@ -372,17 +202,15 @@ public class TaskRunner extends AbstractService {
QueryUnitRequestProto taskRequest = null;
while(!stopped) {
- NettyClientBase qmClient = null;
- QueryMasterProtocolService.Interface qmClientService = null;
+ QueryMasterProtocolService.Interface qmClientService;
try {
- qmClient = connPool.getConnection(qmMasterAddr, QueryMasterProtocol.class, true);
- qmClientService = qmClient.getStub();
+ qmClientService = getContext().getQueryMasterStub();
if (callFuture == null) {
callFuture = new CallFuture<QueryUnitRequestProto>();
LOG.info("Request GetTask: " + getId());
GetTaskRequestProto request = GetTaskRequestProto.newBuilder()
- .setExecutionBlockId(executionBlockId.getProto())
+ .setExecutionBlockId(getExecutionBlockId().getProto())
.setContainerId(((ContainerIdPBImpl) containerId).getProto())
.build();
@@ -414,17 +242,14 @@ public class TaskRunner extends AbstractService {
if (taskRequest.getShouldDie()) {
LOG.info("Received ShouldDie flag:" + getId());
stop();
- if(taskRunnerManager != null) {
- //notify to TaskRunnerManager
- taskRunnerManager.stopTask(getId());
- taskRunnerManager= null;
- }
+ //notify to TaskRunnerManager
+ getContext().stopTaskRunner(getId());
} else {
- taskRunnerManager.getWorkerContext().getWorkerSystemMetrics().counter("query", "task").inc();
+ getContext().getWorkerContext().getWorkerSystemMetrics().counter("query", "task").inc();
LOG.info("Accumulated Received Task: " + (++receivedNum));
QueryUnitAttemptId taskAttemptId = new QueryUnitAttemptId(taskRequest.getId());
- if (tasks.containsKey(taskAttemptId)) {
+ if (getContext().getTasks().containsKey(taskAttemptId)) {
LOG.error("Duplicate Task Attempt: " + taskAttemptId);
fatalError(qmClientService, taskAttemptId, "Duplicate Task Attempt: " + taskAttemptId);
continue;
@@ -433,9 +258,9 @@ public class TaskRunner extends AbstractService {
LOG.info("Initializing: " + taskAttemptId);
Task task;
try {
- task = new Task(taskAttemptId, taskRunnerContext, qmClientService,
+ task = new Task(getId(), getTaskBaseDir(), taskAttemptId, executionBlockContext,
new QueryUnitRequestImpl(taskRequest));
- tasks.put(taskAttemptId, task);
+ getContext().getTasks().put(taskAttemptId, task);
task.init();
if (task.hasFetchPhase()) {
@@ -453,9 +278,7 @@ public class TaskRunner extends AbstractService {
}
}
} catch (Throwable t) {
- t.printStackTrace();
- } finally {
- connPool.releaseConnection(qmClient);
+ LOG.fatal(t.getMessage(), t);
}
}
}
@@ -463,12 +286,6 @@ public class TaskRunner extends AbstractService {
taskLauncher.start();
} catch (Throwable t) {
LOG.fatal("Unhandled exception. Starting shutdown.", t);
- } finally {
- for (Task t : tasks.values()) {
- if (t.getStatus() != TaskAttemptState.TA_SUCCEEDED) {
- t.abort();
- }
- }
}
}
@@ -480,6 +297,6 @@ public class TaskRunner extends AbstractService {
}
public ExecutionBlockId getExecutionBlockId() {
- return this.executionBlockId;
+ return getContext().getExecutionBlockId();
}
}