You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2013/10/25 07:22:41 UTC
[2/2] git commit: TAJO-275: Separating QueryMaster and TaskRunner
roles in worker. (Keuntae Park via jihoon)
TAJO-275: Separating QueryMaster and TaskRunner roles in worker. (Keuntae Park via jihoon)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/5470bcea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/5470bcea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/5470bcea
Branch: refs/heads/master
Commit: 5470bcea1ae3ec4c627f7f6fc7efa2c1c19e06cd
Parents: fdf0979
Author: Jihoon Son <ji...@apache.org>
Authored: Fri Oct 25 14:21:18 2013 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Fri Oct 25 14:21:18 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 2 +
pom.xml | 1 +
.../java/org/apache/tajo/conf/TajoConf.java | 2 +
tajo-core/tajo-core-backend/pom.xml | 1 +
.../org/apache/tajo/master/GlobalEngine.java | 19 +-
.../apache/tajo/master/TajoContainerProxy.java | 6 +-
.../apache/tajo/master/TajoMasterService.java | 10 +-
.../master/querymaster/QueryInProgress.java | 20 +-
.../tajo/master/querymaster/QueryInfo.java | 2 +-
.../master/querymaster/QueryJobManager.java | 7 +-
.../tajo/master/querymaster/QueryMaster.java | 14 +-
.../querymaster/QueryMasterManagerService.java | 208 +++++++++++++++++
.../master/rm/TajoWorkerResourceManager.java | 115 +++++----
.../apache/tajo/master/rm/WorkerResource.java | 66 ++++--
.../tajo/master/rm/WorkerResourceManager.java | 3 +
.../tajo/master/rm/YarnTajoResourceManager.java | 4 +
.../tajo/worker/TajoResourceAllocator.java | 6 +-
.../java/org/apache/tajo/worker/TajoWorker.java | 233 ++++++++++++++-----
.../tajo/worker/TajoWorkerManagerService.java | 96 --------
.../main/java/org/apache/tajo/worker/Task.java | 14 +-
.../java/org/apache/tajo/worker/TaskRunner.java | 12 +-
.../apache/tajo/worker/TaskRunnerManager.java | 4 +-
.../src/main/proto/QueryMasterProtocol.proto | 41 ++++
.../src/main/proto/TajoMasterProtocol.proto | 39 ++--
.../src/main/proto/TajoWorkerProtocol.proto | 8 -
.../src/main/resources/tajo-default.xml | 13 ++
.../main/resources/webapps/admin/cluster.jsp | 112 +++++++--
.../src/main/resources/webapps/admin/index.jsp | 87 +++++--
.../src/main/resources/webapps/admin/query.jsp | 29 ++-
.../src/main/resources/webapps/worker/env.jsp | 2 +-
.../src/main/resources/webapps/worker/index.jsp | 71 +++---
.../resources/webapps/worker/querydetail.jsp | 2 +-
.../main/resources/webapps/worker/queryplan.jsp | 2 +-
.../resources/webapps/worker/querytasks.jsp | 2 +-
.../org/apache/tajo/TajoTestingCluster.java | 4 +-
tajo-dist/src/main/bin/start-tajo.sh | 3 +-
tajo-dist/src/main/bin/stop-tajo.sh | 3 +-
tajo-dist/src/main/bin/tajo | 19 +-
tajo-dist/src/main/bin/tajo-config.sh | 4 +-
tajo-dist/src/main/conf/tajo-env.sh | 6 +
40 files changed, 905 insertions(+), 387 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5b6a248..7defd7b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -59,6 +59,8 @@ Release 0.2.0 - unreleased
IMPROVEMENTS
+ TAJO-275: Separating QueryMaster and TaskRunner roles in worker. (Keuntae Park via jihoon)
+
TAJO-270: Boolean datum compatible to apache hive. (jinho)
TAJO-261: Rearrange default port numbers and config names. (hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1815958..7038e40 100644
--- a/pom.xml
+++ b/pom.xml
@@ -306,6 +306,7 @@
<excludes>
<exclude>CHANGES.txt</exclude>
<exclude>**/workers</exclude>
+ <exclude>**/querymasters</exclude>
<exclude>**/*.sql</exclude>
<exclude>**/*.hiveql</exclude>
<exclude>**/*.schema</exclude>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 475877b..360d7f0 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -83,8 +83,10 @@ public class TajoConf extends YarnConfiguration {
// Tajo Worker Service Addresses
WORKER_INFO_ADDRESS("tajo.worker.info-http.address", "0.0.0.0:28080"),
+ WORKER_QM_INFO_ADDRESS("tajo.worker.qm-info-http.address", "0.0.0.0:28081"),
WORKER_PEER_RPC_ADDRESS("tajo.worker.peer-rpc.address", "0.0.0.0:28091"),
WORKER_CLIENT_RPC_ADDRESS("tajo.worker.client-rpc.address", "0.0.0.0:28092"),
+ WORKER_QM_RPC_ADDRESS("tajo.worker.qm-rpc.address", "0.0.0.0:28093"),
// Tajo Worker Temporal Directories
WORKER_TEMPORAL_DIR("tajo.worker.tmpdir.locations", "/tmp/tajo-${user.name}/tmpdir"),
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/pom.xml b/tajo-core/tajo-core-backend/pom.xml
index f888d1e..f674f84 100644
--- a/tajo-core/tajo-core-backend/pom.xml
+++ b/tajo-core/tajo-core-backend/pom.xml
@@ -129,6 +129,7 @@
<argument>--java_out=target/generated-sources/proto</argument>
<argument>src/main/proto/tajo_protos.proto</argument>
<argument>src/main/proto/ClientProtos.proto</argument>
+ <argument>src/main/proto/QueryMasterProtocol.proto</argument>
<argument>src/main/proto/QueryMasterClientProtocol.proto</argument>
<argument>src/main/proto/TajoMasterProtocol.proto</argument>
<argument>src/main/proto/TajoWorkerProtocol.proto</argument>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
index c3eb5f7..a92b1a0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -149,13 +149,20 @@ public class GlobalEngine extends AbstractService {
queryInfo = queryJobManager.createNewQueryJob(queryContext, sql, rootNode);
- responseBuilder.setQueryId(queryInfo.getQueryId().getProto());
- responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
- responseBuilder.setState(queryInfo.getQueryState());
- if(queryInfo.getQueryMasterHost() != null) {
- responseBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost());
+ if(queryInfo == null) {
+ responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+ responseBuilder.setResultCode(ClientProtos.ResultCode.ERROR);
+ responseBuilder.setState(TajoProtos.QueryState.QUERY_ERROR);
+ responseBuilder.setErrorMessage("Fail starting QueryMaster.");
+ } else {
+ responseBuilder.setQueryId(queryInfo.getQueryId().getProto());
+ responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+ responseBuilder.setState(queryInfo.getQueryState());
+ if(queryInfo.getQueryMasterHost() != null) {
+ responseBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost());
+ }
+ responseBuilder.setQueryMasterPort(queryInfo.getQueryMasterClientPort());
}
- responseBuilder.setQueryMasterPort(queryInfo.getQueryMasterClientPort());
}
GetQueryStatusResponse response = responseBuilder.build();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index 846343d..be9d047 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -65,7 +65,7 @@ public class TajoContainerProxy extends ContainerProxy {
AsyncRpcClient tajoWorkerRpc = null;
try {
InetSocketAddress myAddr= context.getQueryMasterContext().getWorkerContext()
- .getTajoWorkerManagerService().getBindAddr();
+ .getQueryMasterManagerService().getBindAddr();
InetSocketAddress addr = new InetSocketAddress(container.getNodeId().getHost(), container.getNodeId().getPort());
tajoWorkerRpc = new AsyncRpcClient(TajoWorkerProtocol.class, addr);
@@ -151,7 +151,9 @@ public class TajoContainerProxy extends ContainerProxy {
for(WorkerResource eahWorkerResource: workerResources) {
workerResourceProtos.add(TajoMasterProtocol.WorkerResourceProto.newBuilder()
- .setWorkerHostAndPort(eahWorkerResource.getId())
+ .setHost(eahWorkerResource.getAllocatedHost())
+ .setQueryMasterPort(eahWorkerResource.getQueryMasterPort())
+ .setPeerRpcPort(eahWorkerResource.getPeerRpcPort())
.setExecutionBlockId(executionBlockId.getProto())
.setMemoryMBSlots(eahWorkerResource.getMemoryMBSlots())
.setDiskSlots(eahWorkerResource.getDiskSlots())
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
index 04b562c..cf193de 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
@@ -93,7 +93,8 @@ public class TajoMasterService extends AbstractService {
RpcController controller,
TajoMasterProtocol.TajoHeartbeat request, RpcCallback<TajoMasterProtocol.TajoHeartbeatResponse> done) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Received QueryHeartbeat:" + request.getTajoWorkerHost() + ":" + request.getTajoWorkerPort());
+ LOG.debug("Received QueryHeartbeat:" + request.getTajoWorkerHost() + ":" +
+ request.getTajoQueryMasterPort() + ":" + request.getPeerRpcPort());
}
TajoMasterProtocol.TajoHeartbeatResponse.ResponseCommand command = null;
@@ -137,9 +138,10 @@ public class TajoMasterService extends AbstractService {
List<TajoMasterProtocol.WorkerResourceProto> workerResources = request.getWorkerResourcesList();
for(TajoMasterProtocol.WorkerResourceProto eachWorkerResource: workerResources) {
WorkerResource workerResource = new WorkerResource();
- String[] tokens = eachWorkerResource.getWorkerHostAndPort().split(":");
- workerResource.setAllocatedHost(tokens[0]);
- workerResource.setPeerRpcPort(Integer.parseInt(tokens[1]));
+ workerResource.setAllocatedHost(eachWorkerResource.getHost());
+
+ workerResource.setPeerRpcPort(eachWorkerResource.getPeerRpcPort());
+ workerResource.setQueryMasterPort(eachWorkerResource.getQueryMasterPort());
workerResource.setMemoryMBSlots(eachWorkerResource.getMemoryMBSlots());
workerResource.setDiskSlots(eachWorkerResource.getDiskSlots());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index 0098a7c..0eb9120 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -28,8 +28,10 @@ import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.tajo.QueryId;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.engine.planner.logical.LogicalRootNode;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.ipc.QueryMasterProtocol.*;
import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.TajoAsyncDispatcher;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.master.rm.WorkerResource;
@@ -62,7 +64,7 @@ public class QueryInProgress extends CompositeService {
private AsyncRpcClient queryMasterRpc;
- private TajoWorkerProtocol.TajoWorkerProtocolService queryMasterRpcClient;
+ private QueryMasterProtocolService queryMasterRpcClient;
public QueryInProgress(
TajoMaster.MasterContext masterContext,
@@ -139,18 +141,22 @@ public class QueryInProgress extends CompositeService {
return dispatcher.getEventHandler();
}
- public void startQueryMaster() {
+ public boolean startQueryMaster() {
try {
LOG.info("Initializing QueryInProgress for QueryID=" + queryId);
WorkerResourceManager resourceManager = masterContext.getResourceManager();
WorkerResource queryMasterResource = resourceManager.allocateQueryMaster(this);
- if(queryMasterResource != null) {
- queryInfo.setQueryMasterResource(queryMasterResource);
+ if(queryMasterResource == null) {
+ return false;
}
+ queryInfo.setQueryMasterResource(queryMasterResource);
getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START, queryInfo));
+
+ return true;
} catch (Exception e) {
catchException(e);
+ return false;
}
}
@@ -169,7 +175,7 @@ public class QueryInProgress extends CompositeService {
}
}
- public TajoWorkerProtocol.TajoWorkerProtocolService getQueryMasterRpcClient() {
+ public QueryMasterProtocolService getQueryMasterRpcClient() {
return queryMasterRpcClient;
}
@@ -180,7 +186,7 @@ public class QueryInProgress extends CompositeService {
queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort());
LOG.info("Connect to QueryMaster:" + addr);
//TODO Get Connection from pool
- queryMasterRpc = new AsyncRpcClient(TajoWorkerProtocol.class, addr);
+ queryMasterRpc = new AsyncRpcClient(QueryMasterProtocol.class, addr);
queryMasterRpcClient = queryMasterRpc.getStub();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
index f89a017..058352f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
@@ -66,7 +66,7 @@ public class QueryInfo {
if(queryMasterResource == null) {
return 0;
}
- return queryMasterResource.getPeerRpcPort();
+ return queryMasterResource.getQueryMasterPort();
}
public int getQueryMasterClientPort() {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
index f5f029f..41df7c2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
@@ -107,7 +107,9 @@ public class QueryJobManager extends CompositeService {
queryInProgress.init(getConfig());
queryInProgress.start();
- queryInProgress.startQueryMaster();
+ if(!queryInProgress.startQueryMaster()) {
+ return null;
+ }
return queryInProgress.getQueryInfo();
}
@@ -171,7 +173,8 @@ public class QueryJobManager extends CompositeService {
if(queryHeartbeat.getTajoWorkerHost() != null) {
WorkerResource queryMasterResource = new WorkerResource();
queryMasterResource.setAllocatedHost(queryHeartbeat.getTajoWorkerHost());
- queryMasterResource.setPeerRpcPort(queryHeartbeat.getTajoWorkerPort());
+ queryMasterResource.setPeerRpcPort(queryHeartbeat.getPeerRpcPort());
+ queryMasterResource.setQueryMasterPort(queryHeartbeat.getTajoQueryMasterPort());
queryMasterResource.setClientPort(queryHeartbeat.getTajoWorkerClientPort());
queryMasterResource.setPullServerPort(queryHeartbeat.getTajoWorkerPullServerPort());
queryInfo.setQueryMasterResource(queryMasterResource);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index 2860b17..1198f3e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -153,7 +153,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
super.stop();
LOG.info("QueryMaster stop");
- if(!queryMasterContext.getWorkerContext().isStandbyMode()) {
+ if(queryMasterContext.getWorkerContext().isYarnContainerMode()) {
queryMasterContext.getWorkerContext().stopWorker(true);
}
}
@@ -162,8 +162,9 @@ public class QueryMaster extends CompositeService implements EventHandler {
LOG.info("Send QueryMaster Ready to QueryJobManager:" + queryId);
try {
TajoHeartbeat.Builder queryHeartbeatBuilder = TajoHeartbeat.newBuilder()
- .setTajoWorkerHost(workerContext.getTajoWorkerManagerService().getBindAddr().getHostName())
- .setTajoWorkerPort(workerContext.getTajoWorkerManagerService().getBindAddr().getPort())
+ .setTajoWorkerHost(workerContext.getQueryMasterManagerService().getBindAddr().getHostName())
+ .setPeerRpcPort(workerContext.getPeerRpcPort())
+ .setTajoQueryMasterPort(workerContext.getQueryMasterManagerService().getBindAddr().getPort())
.setTajoWorkerClientPort(workerContext.getTajoWorkerClientService().getBindAddr().getPort())
.setState(state)
.setQueryId(queryId.getProto());
@@ -280,7 +281,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
} else {
LOG.warn("No query info:" + queryId);
}
- if(!workerContext.isStandbyMode()) {
+ if(workerContext.isYarnContainerMode()) {
stop();
}
}
@@ -288,8 +289,9 @@ public class QueryMaster extends CompositeService implements EventHandler {
private TajoHeartbeat buildTajoHeartBeat(QueryMasterTask queryMasterTask) {
TajoHeartbeat queryHeartbeat = TajoHeartbeat.newBuilder()
- .setTajoWorkerHost(workerContext.getTajoWorkerManagerService().getBindAddr().getHostName())
- .setTajoWorkerPort(workerContext.getTajoWorkerManagerService().getBindAddr().getPort())
+ .setTajoWorkerHost(workerContext.getQueryMasterManagerService().getBindAddr().getHostName())
+ .setTajoQueryMasterPort(workerContext.getQueryMasterManagerService().getBindAddr().getPort())
+ .setPeerRpcPort(workerContext.getPeerRpcPort())
.setTajoWorkerClientPort(workerContext.getTajoWorkerClientService().getBindAddr().getPort())
.setState(queryMasterTask.getState())
.setQueryId(queryMasterTask.getQueryId().getProto())
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
new file mode 100644
index 0000000..4149749
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TajoIdProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.TaskSchedulerImpl;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.rpc.AsyncRpcServer;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.worker.TajoWorker;
+
+import java.net.InetSocketAddress;
+
+/** RPC service implementing QueryMasterProtocol; owns this worker's QueryMaster and forwards requests to it. */
+public class QueryMasterManagerService extends CompositeService
+    implements QueryMasterProtocol.QueryMasterProtocolService.Interface {
+  private static final Log LOG = LogFactory.getLog(QueryMasterManagerService.class.getName());
+
+  private AsyncRpcServer rpcServer;
+  private InetSocketAddress bindAddr;
+  private String addr;
+  private int port;
+
+  private QueryMaster queryMaster;
+
+  private TajoWorker.WorkerContext workerContext;
+
+  public QueryMasterManagerService(TajoWorker.WorkerContext workerContext, int port) {
+    super(QueryMasterManagerService.class.getName());
+    this.workerContext = workerContext;
+    this.port = port;
+  }
+
+  public QueryMaster getQueryMaster() {
+    return queryMaster;
+  }
+
+  /** Starts the RPC server on 0.0.0.0:port and adds the QueryMaster child service; setup errors are only logged. */
+  @Override
+  public void init(Configuration conf) {
+    try {
+      // Setup RPC server
+      InetSocketAddress initIsa =
+          new InetSocketAddress("0.0.0.0", port);
+      if (initIsa.getAddress() == null) {
+        throw new IllegalArgumentException("Failed resolve of " + initIsa);
+      }
+
+      this.rpcServer = new AsyncRpcServer(QueryMasterProtocol.class, this, initIsa);
+      this.rpcServer.start();
+
+      this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());
+      this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
+      // Replace the requested port with the one actually bound (they may differ, e.g. if 0 was requested).
+      this.port = bindAddr.getPort();
+
+      queryMaster = new QueryMaster(workerContext);
+      addService(queryMaster);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    }
+    // Record the bound QueryMaster RPC address in the configuration.
+    LOG.info("QueryMasterManagerService is bind to " + addr);
+    ((TajoConf)conf).setVar(TajoConf.ConfVars.WORKER_QM_RPC_ADDRESS, addr);
+    super.init(conf);
+  }
+
+  @Override
+  public void stop() {
+    if(rpcServer != null) {
+      rpcServer.shutdown();
+    }
+    LOG.info("QueryMasterManagerService stopped");
+    super.stop();
+  }
+
+  public InetSocketAddress getBindAddr() {
+    return bindAddr;
+  }
+
+  public String getHostAndPort() {
+    return bindAddr.getHostName() + ":" + bindAddr.getPort();
+  }
+
+  /** Routes a TaskRunner's task request to its QueryMasterTask; replies stopTaskRunnerReq if the query is gone. */
+  @Override
+  public void getTask(RpcController controller, TajoWorkerProtocol.GetTaskRequestProto request,
+                      RpcCallback<TajoWorkerProtocol.QueryUnitRequestProto> done) {
+    try {
+      ExecutionBlockId ebId = new ExecutionBlockId(request.getExecutionBlockId());
+      QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(ebId.getQueryId());
+      // Must be checked before makeContainerId(), which dereferences queryMasterTask.
+      if(queryMasterTask == null || queryMasterTask.isStopped()) {
+        LOG.debug("getTask: ebId:" + ebId + ", but query is finished.");
+        done.run(TaskSchedulerImpl.stopTaskRunnerReq);
+      } else {
+        ContainerId cid =
+            queryMasterTask.getQueryTaskContext().getResourceAllocator().makeContainerId(request.getContainerId());
+        LOG.debug("getTask:" + cid + ", ebId:" + ebId);
+        queryMasterTask.handleTaskRequestEvent(new TaskRequestEvent(cid, ebId, done));
+      }
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
+  /** Forwards a task attempt's status to its query's QueryMasterTask; replies TRUE, or FALSE on any error. */
+  @Override
+  public void statusUpdate(RpcController controller, TajoWorkerProtocol.TaskStatusProto request,
+                           RpcCallback<PrimitiveProtos.BoolProto> done) {
+    try {
+      QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
+          new QueryId(request.getId().getQueryUnitId().getExecutionBlockId().getQueryId()));
+      queryMasterTask.getEventHandler().handle(
+          new TaskAttemptStatusUpdateEvent(new QueryUnitAttemptId(request.getId()), request));
+      done.run(TajoWorker.TRUE_PROTO);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      done.run(TajoWorker.FALSE_PROTO);
+    }
+  }
+
+  @Override
+  public void ping(RpcController controller,
+                   TajoIdProtos.QueryUnitAttemptIdProto attemptId,
+                   RpcCallback<PrimitiveProtos.BoolProto> done) {
+    done.run(TajoWorker.TRUE_PROTO);
+  }
+
+  /** Forwards a task's fatal-error report as a TaskFatalErrorEvent; replies TRUE, or FALSE on any error. */
+  @Override
+  public void fatalError(RpcController controller, TajoWorkerProtocol.TaskFatalErrorReport report,
+                         RpcCallback<PrimitiveProtos.BoolProto> done) {
+    try {
+      QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
+          new QueryId(report.getId().getQueryUnitId().getExecutionBlockId().getQueryId()));
+      queryMasterTask.getEventHandler().handle(new TaskFatalErrorEvent(report));
+      done.run(TajoWorker.TRUE_PROTO);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      done.run(TajoWorker.FALSE_PROTO);
+    }
+  }
+
+  /** Forwards a task's completion report as a TaskCompletionEvent; replies TRUE, or FALSE on any error. */
+  @Override
+  public void done(RpcController controller, TajoWorkerProtocol.TaskCompletionReport report,
+                   RpcCallback<PrimitiveProtos.BoolProto> done) {
+    try {
+      QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
+          new QueryId(report.getId().getQueryUnitId().getExecutionBlockId().getQueryId()));
+      queryMasterTask.getEventHandler().handle(new TaskCompletionEvent(report));
+      done.run(TajoWorker.TRUE_PROTO);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      done.run(TajoWorker.FALSE_PROTO);
+    }
+  }
+
+  /** Dispatches a QueryStartEvent for the requested query to the QueryMaster; replies TRUE, or FALSE on any error. */
+  @Override
+  public void executeQuery(RpcController controller,
+                           TajoWorkerProtocol.QueryExecutionRequestProto request,
+                           RpcCallback<PrimitiveProtos.BoolProto> done) {
+    try {
+      QueryId queryId = new QueryId(request.getQueryId());
+      LOG.info("Receive executeQuery request:" + queryId);
+      queryMaster.handle(new QueryStartEvent(queryId,
+          new QueryContext(request.getQueryContext()), request.getSql().getValue(),
+          request.getLogicalPlanJson().getValue()));
+      done.run(TajoWorker.TRUE_PROTO);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      done.run(TajoWorker.FALSE_PROTO);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index c992e43..136f6ea 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -25,11 +25,11 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryId;
import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.master.querymaster.QueryInProgress;
import org.apache.tajo.master.querymaster.QueryJobEvent;
+import org.apache.tajo.worker.TajoWorker;
import java.io.IOException;
import java.util.*;
@@ -42,10 +42,18 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
private TajoMaster.MasterContext masterContext;
+ //all workers(include querymaster)
private Map<String, WorkerResource> allWorkerResourceMap = new HashMap<String, WorkerResource>();
- private Set<String> liveWorkerResources = new HashSet<String>();
+
+ //all workers(include querymaster)
private Set<String> deadWorkerResources = new HashSet<String>();
+ //worker only
+ private Set<String> liveWorkerResources = new HashSet<String>();
+
+ //querymaster only
+ private Set<String> liveQueryMasterWorkerResources = new HashSet<String>();
+
private Map<QueryId, WorkerResource> queryMasterMap = new HashMap<QueryId, WorkerResource>();
private final Object workerResourceLock = new Object();
@@ -60,15 +68,9 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
private AtomicBoolean stopped = new AtomicBoolean(false);
- private int queryMasterMemoryMB;
-
- private int queryMasterDiskSlot;
-
public TajoWorkerResourceManager(TajoMaster.MasterContext masterContext) {
this.masterContext = masterContext;
this.queryIdSeed = String.valueOf(System.currentTimeMillis());
- this.queryMasterMemoryMB = masterContext.getConf().getIntVar(TajoConf.ConfVars.YARN_RM_QUERY_MASTER_MEMORY_MB);
- this.queryMasterDiskSlot = masterContext.getConf().getIntVar(TajoConf.ConfVars.YARN_RM_QUERY_MASTER_DISKS);
requestQueue = new LinkedBlockingDeque<WorkerResourceRequest>();
reAllocationList = new ArrayList<WorkerResourceRequest>();
@@ -81,6 +83,10 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
return Collections.unmodifiableMap(allWorkerResourceMap);
}
+ public Collection<String> getQueryMasters() {
+ return Collections.unmodifiableSet(liveQueryMasterWorkerResources);
+ }
+
public int getNumClusterSlots() {
int numSlots = 0;
synchronized(workerResourceLock) {
@@ -105,18 +111,28 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
+ /**
+ * Assigns a QueryMaster for the given query: picks the live QueryMaster-mode worker that
+ * currently hosts the fewest QueryMaster tasks, increments its task count and records it in
+ * queryMasterMap. Returns null when no live QueryMaster-mode worker can be resolved.
+ */
@Override
public WorkerResource allocateQueryMaster(QueryInProgress queryInProgress) {
- List<WorkerResource> workerResources = chooseWorkers(true, queryMasterMemoryMB, queryMasterDiskSlot, 1);
- if(workerResources.size() == 0) {
- //TODO if resource available, assign worker.
- LOG.warn("No available resource for querymaster:" + queryInProgress.getQueryId());
- return null;
- }
- WorkerResource queryMasterWorker = workerResources.get(0);
synchronized(workerResourceLock) {
+ if(liveQueryMasterWorkerResources.size() == 0) {
+ LOG.warn("No available resource for querymaster:" + queryInProgress.getQueryId());
+ return null;
+ }
+ //least-loaded selection; strict '<' keeps the first minimum seen, and HashSet iteration
+ //order is unspecified, so ties resolve arbitrarily
+ WorkerResource queryMasterWorker = null;
+ int minTasks = Integer.MAX_VALUE;
+ for(String eachQueryMaster: liveQueryMasterWorkerResources) {
+ WorkerResource queryMaster = allWorkerResourceMap.get(eachQueryMaster);
+ if(queryMaster != null && queryMaster.getNumQueryMasterTasks() < minTasks) {
+ queryMasterWorker = queryMaster;
+ minTasks = queryMaster.getNumQueryMasterTasks();
+ }
+ }
+ //reached only if no live key had an entry in allWorkerResourceMap
+ if(queryMasterWorker == null) {
+ return null;
+ }
+ //count the new task while still holding workerResourceLock so the next allocation sees it
+ queryMasterWorker.addNumQueryMasterTask();
queryMasterMap.put(queryInProgress.getQueryId(), queryMasterWorker);
+ LOG.info(queryInProgress.getQueryId() + "'s QueryMaster is " + queryMasterWorker);
+ return queryMasterWorker;
}
- LOG.info(queryInProgress.getQueryId() + "'s QueryMaster is " + queryMasterWorker);
- return queryMasterWorker;
}
@Override
@@ -132,7 +148,7 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
//add queue
TajoMasterProtocol.WorkerResourceAllocationRequest request =
TajoMasterProtocol.WorkerResourceAllocationRequest.newBuilder()
- .setMemoryMBSlots(queryMasterMemoryMB)
+ .setMemoryMBSlots(1)
.setDiskSlots(1)
.setExecutionBlockId(QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0).getProto())
.setNumWorks(1)
@@ -207,7 +223,7 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
", liveWorkers=" + liveWorkerResources.size());
}
- List<WorkerResource> workerResources = chooseWorkers(false,
+ List<WorkerResource> workerResources = chooseWorkers(
resourceRequest.request.getMemoryMBSlots(),
resourceRequest.request.getDiskSlots(),
resourceRequest.request.getNumWorks());
@@ -223,7 +239,9 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
for(WorkerResource eachWorker: workerResources) {
workerHosts.add(TajoMasterProtocol.WorkerAllocatedResource.newBuilder()
- .setWorkerHostAndPort(eachWorker.getAllocatedHost() + ":" + eachWorker.getPeerRpcPort())
+ .setWorkerHost(eachWorker.getAllocatedHost())
+ .setQueryMasterPort(eachWorker.getQueryMasterPort())
+ .setPeerRpcPort(eachWorker.getPeerRpcPort())
.setWorkerPullServerPort(eachWorker.getPullServerPort())
.build());
}
@@ -252,8 +270,7 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
}
}
- private List<WorkerResource> chooseWorkers(boolean queryMaster,
- int requiredMemoryMBSlots, int requiredDiskSlots,
+ private List<WorkerResource> chooseWorkers(int requiredMemoryMBSlots, int requiredDiskSlots,
int numWorkerSlots) {
List<WorkerResource> selectedWorkers = new ArrayList<WorkerResource>();
@@ -275,15 +292,8 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
} else {
WorkerResource workerResource = allWorkerResourceMap.get(eachWorker);
if(workerResource.getAvailableMemoryMBSlots() >= requiredMemoryMBSlots) {
- //TODO check disk slot
- // && workerResource.getAvailableDiskSlots() >= requiredDiskSlots) {
- if(queryMaster && workerResource.isQueryMasterAllocated()) {
- insufficientWorkers.add(eachWorker);
- continue;
- }
workerResource.addUsedMemoryMBSlots(requiredMemoryMBSlots);
//workerResource.addUsedDiskSlots(requiredDiskSlots);
- workerResource.setQueryMasterAllocated(queryMaster);
selectedWorkers.add(workerResource);
selectedCount++;
} else {
@@ -336,28 +346,33 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
return;
} else {
queryMasterWorkerResource = queryMasterMap.remove(queryId);
+ queryMasterWorkerResource.releaseQueryMasterTask();
}
}
- WorkerResource workerResource = new WorkerResource();
- workerResource.copyId(queryMasterWorkerResource);
- workerResource.setMemoryMBSlots(queryMasterMemoryMB);
- workerResource.setDiskSlots(queryMasterDiskSlot);
- workerResource.setCpuCoreSlots(0);
- workerResource.setQueryMasterAllocated(queryMasterWorkerResource.isQueryMasterAllocated());
- releaseWorkerResource(queryId, workerResource);
LOG.info("release QueryMaster resource:" + queryId + "," + queryMasterWorkerResource);
}
public void workerHeartbeat(TajoMasterProtocol.TajoHeartbeat request) {
synchronized(workerResourceLock) {
- String hostAndPort = request.getTajoWorkerHost() + ":" + request.getTajoWorkerPort();
- if(allWorkerResourceMap.containsKey(hostAndPort)) {
- if(deadWorkerResources.contains(hostAndPort)) {
- deadWorkerResources.remove(hostAndPort);
- liveWorkerResources.add(hostAndPort);
+ String workerKey = request.getTajoWorkerHost() + ":" + request.getTajoQueryMasterPort() + ":"
+ + request.getPeerRpcPort();
+ boolean queryMasterMode = request.getServerStatus().getQueryMasterMode().getValue();
+ boolean taskRunnerMode = request.getServerStatus().getTaskRunnerMode().getValue();
+
+ if(allWorkerResourceMap.containsKey(workerKey)) {
+ WorkerResource workerResource = allWorkerResourceMap.get(workerKey);
+
+ if(deadWorkerResources.contains(workerKey)) {
+ deadWorkerResources.remove(workerKey);
+ if(queryMasterMode) {
+ liveQueryMasterWorkerResources.add(workerKey);
+ workerResource.setNumRunningTasks(0);
+ }
+ if(taskRunnerMode) {
+ liveWorkerResources.add(workerKey);
+ }
}
- WorkerResource workerResource = allWorkerResourceMap.get(hostAndPort);
workerResource.setLastHeartbeat(System.currentTimeMillis());
workerResource.setWorkerStatus(WorkerStatus.LIVE);
workerResource.setNumRunningTasks(request.getServerStatus().getRunningTaskNum());
@@ -365,12 +380,14 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
workerResource.setFreeHeap(request.getServerStatus().getJvmHeap().getFreeHeap());
workerResource.setTotalHeap(request.getServerStatus().getJvmHeap().getTotalHeap());
} else {
+ //initial connection
WorkerResource workerResource = new WorkerResource();
workerResource.setAllocatedHost(request.getTajoWorkerHost());
+ workerResource.setQueryMasterMode(queryMasterMode);
+ workerResource.setTaskRunnerMode(taskRunnerMode);
- int[] ports = new int[] { request.getTajoWorkerPort(), request.getTajoWorkerClientPort() };
-
- workerResource.setPeerRpcPort(request.getTajoWorkerPort());
+ workerResource.setQueryMasterPort(request.getTajoQueryMasterPort());
+ workerResource.setPeerRpcPort(request.getPeerRpcPort());
workerResource.setClientPort(request.getTajoWorkerClientPort());
workerResource.setPullServerPort(request.getTajoWorkerPullServerPort());
workerResource.setHttpPort(request.getTajoWorkerHttpPort());
@@ -392,7 +409,13 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
}
allWorkerResourceMap.put(workerResource.getId(), workerResource);
- liveWorkerResources.add(hostAndPort);
+ if(queryMasterMode) {
+ liveQueryMasterWorkerResources.add(workerKey);
+ }
+
+ if(taskRunnerMode) {
+ liveWorkerResources.add(workerKey);
+ }
LOG.info("TajoWorker:" + workerResource + " added in live TajoWorker list");
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
index fad129a..bd7db6d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
@@ -20,7 +20,9 @@ package org.apache.tajo.master.rm;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.worker.TajoWorker;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -29,6 +31,7 @@ public class WorkerResource {
private String allocatedHost;
private int peerRpcPort;
+ private int queryMasterPort;
private int clientPort;
private int pullServerPort;
private int httpPort;
@@ -47,8 +50,6 @@ public class WorkerResource {
private int numRunningTasks;
- private boolean queryMasterAllocated;
-
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock rlock = lock.readLock();
private final Lock wlock = lock.writeLock();
@@ -57,13 +58,14 @@ public class WorkerResource {
private long lastHeartbeat;
- public String getId() {
- return allocatedHost + ":" + peerRpcPort;
- }
+ private boolean queryMasterMode;
- public void copyId(WorkerResource workerResource) {
- peerRpcPort = workerResource.getPeerRpcPort();
- allocatedHost = workerResource.getAllocatedHost();
+ private boolean taskRunnerMode;
+
+ private AtomicInteger numQueryMasterTasks = new AtomicInteger(0);
+
+ //Worker key "host:queryMasterPort:peerRpcPort". TajoWorkerResourceManager stores workers under
+ //this id but looks them up with a key it builds from heartbeat fields, so both formats must match.
+ public String getId() {
+ return allocatedHost + ":" + queryMasterPort + ":" + peerRpcPort;
}
public String getAllocatedHost() {
@@ -140,7 +142,7 @@ public class WorkerResource {
}
public String portsToStr() {
- return peerRpcPort + "," + clientPort + "," + pullServerPort;
+ //order: queryMasterPort,peerRpcPort,clientPort,pullServerPort
+ return queryMasterPort + "," + peerRpcPort + "," + clientPort + "," + pullServerPort;
}
public void setLastHeartbeat(long heartbeatTime) {
@@ -194,19 +196,23 @@ public class WorkerResource {
return lastHeartbeat;
}
- public boolean isQueryMasterAllocated() {
- return queryMasterAllocated;
+ //set from the worker's heartbeat flag when the resource manager first registers this worker
+ public boolean isQueryMasterMode() {
+ return queryMasterMode;
}
- public void setQueryMasterAllocated(boolean queryMasterAllocated) {
- this.queryMasterAllocated = queryMasterAllocated;
+ public void setQueryMasterMode(boolean queryMasterMode) {
+ this.queryMasterMode = queryMasterMode;
}
- public void releaseResource(WorkerResource workerResource) {
- if(workerResource.isQueryMasterAllocated()) {
- queryMasterAllocated = false;
- }
+ //like queryMasterMode, set from the worker's heartbeat flag on first registration
+ public boolean isTaskRunnerMode() {
+ return taskRunnerMode;
+ }
+ public void setTaskRunnerMode(boolean taskRunnerMode) {
+ this.taskRunnerMode = taskRunnerMode;
+ }
+
+ public void releaseResource(WorkerResource workerResource) {
try {
wlock.lock();
usedMemoryMBSlots = usedMemoryMBSlots - workerResource.getMemoryMBSlots();
@@ -222,7 +228,7 @@ public class WorkerResource {
public int getSlots() {
//TODO what is slot? 512MB = 1slot?
- return getMemoryMBSlots() / 512;
+ //integer division: less than 512MB of memory counts as 0 slots
+ return getMemoryMBSlots()/512;
}
public int getAvaliableSlots() {
@@ -243,6 +249,14 @@ public class WorkerResource {
this.peerRpcPort = peerRpcPort;
}
+ //RPC port of the worker's QueryMasterManagerService as reported in its heartbeat (0 when the
+ //worker has none); also part of getId()
+ public int getQueryMasterPort() {
+ return queryMasterPort;
+ }
+
+ public void setQueryMasterPort(int queryMasterPort) {
+ this.queryMasterPort = queryMasterPort;
+ }
+
public int getClientPort() {
return clientPort;
}
@@ -298,4 +312,20 @@ public class WorkerResource {
public void setNumRunningTasks(int numRunningTasks) {
this.numRunningTasks = numRunningTasks;
}
+
+ //Number of queries whose QueryMaster the resource manager has assigned to this worker and not yet
+ //released; used to pick the least-loaded QueryMaster.
+ public int getNumQueryMasterTasks() {
+ return numQueryMasterTasks.get();
+ }
+
+ public void setNumQueryMasterTasks(int numQueryMasterTasks) {
+ this.numQueryMasterTasks.set(numQueryMasterTasks);
+ }
+
+ //called when a query's QueryMaster is assigned to this worker
+ public void addNumQueryMasterTask() {
+ numQueryMasterTasks.getAndIncrement();
+ }
+
+ //called when a query's QueryMaster resource is released
+ //NOTE(review): not floored at zero; an unmatched release would make the count negative and
+ //bias least-loaded selection toward this worker
+ public void releaseQueryMasterTask() {
+ numQueryMasterTasks.getAndDecrement();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
index 93772ec..2e66f98 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
@@ -25,6 +25,7 @@ import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.querymaster.QueryInProgress;
import java.io.IOException;
+import java.util.Collection;
import java.util.Map;
public interface WorkerResourceManager {
@@ -62,4 +63,6 @@ public interface WorkerResourceManager {
public void stop();
public int getNumClusterSlots();
+
+ /**
+ * Returns the ids of workers currently usable as QueryMasters; an implementation may return an
+ * empty collection if it does not track them.
+ */
+ Collection<String> getQueryMasters();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
index 0baa55c..802e5ed 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
@@ -79,6 +79,10 @@ public class YarnTajoResourceManager implements WorkerResourceManager {
return new HashMap<String, WorkerResource>();
}
+ //This resource manager reports no QueryMaster workers; returns a new empty list on every call.
+ public Collection<String> getQueryMasters() {
+ return new ArrayList<String>();
+ }
+
public int getNumClusterSlots() {
return 0;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index e42de27..46a5807 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -261,10 +261,9 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
for(TajoMasterProtocol.WorkerAllocatedResource eachWorker: workerHosts) {
TajoWorkerContainer container = new TajoWorkerContainer();
NodeIdPBImpl nodeId = new NodeIdPBImpl();
- String[] tokens = eachWorker.getWorkerHostAndPort().split(":");
- nodeId.setHost(tokens[0]);
- nodeId.setPort(Integer.parseInt(tokens[1]));
+ nodeId.setHost(eachWorker.getWorkerHost());
+ nodeId.setPort(eachWorker.getPeerRpcPort());
TajoWorkerContainerId containerId = new TajoWorkerContainerId();
@@ -278,6 +277,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
WorkerResource workerResource = new WorkerResource();
workerResource.setAllocatedHost(nodeId.getHost());
workerResource.setPeerRpcPort(nodeId.getPort());
+ workerResource.setQueryMasterPort(eachWorker.getQueryMasterPort());
workerResource.setPullServerPort(eachWorker.getWorkerPullServerPort());
workerResource.setMemoryMBSlots(requiredMemoryMBSlot);
workerResource.setDiskSlots(requiredDiskSlots);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 6152da7..92e30bf 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -33,6 +33,7 @@ import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.querymaster.QueryMaster;
+import org.apache.tajo.master.querymaster.QueryMasterManagerService;
import org.apache.tajo.master.rm.TajoWorkerResourceManager;
import org.apache.tajo.pullserver.TajoPullServerService;
import org.apache.tajo.rpc.AsyncRpcClient;
@@ -61,6 +62,12 @@ public class TajoWorker extends CompositeService {
public static PrimitiveProtos.BoolProto TRUE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(true).build();
public static PrimitiveProtos.BoolProto FALSE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(false).build();
+ public static final String WORKER_MODE_YARN_TASKRUNNER = "tr";
+ public static final String WORKER_MODE_YARN_QUERYMASTER = "qm";
+ public static final String WORKER_MODE_STANDBY = "standby";
+ public static final String WORKER_MODE_QUERY_MASTER = "standby-qm";
+ public static final String WORKER_MODE_TASKRUNNER = "standby-tr";
+
private static final Log LOG = LogFactory.getLog(TajoWorker.class);
private TajoConf systemConf;
@@ -69,6 +76,8 @@ public class TajoWorker extends CompositeService {
private TajoWorkerClientService tajoWorkerClientService;
+ private QueryMasterManagerService queryMasterManagerService;
+
private TajoWorkerManagerService tajoWorkerManagerService;
private InetSocketAddress tajoMasterAddress;
@@ -86,7 +95,11 @@ public class TajoWorker extends CompositeService {
private TajoPullServerService pullService;
- private String daemonMode;
+ private boolean yarnContainerMode;
+
+ private boolean queryMasterMode;
+
+ private boolean taskRunnerMode;
private WorkerHeartbeatThread workerHeartbeatThread;
@@ -100,11 +113,48 @@ public class TajoWorker extends CompositeService {
private ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
- public TajoWorker(String daemonMode) throws Exception {
+ private String[] cmdArgs;
+
+ //Roles are no longer passed to the constructor; startWorker() derives them from its args.
+ public TajoWorker() throws Exception {
super(TajoWorker.class.getName());
- this.daemonMode = daemonMode;
}
+ //Order matters: setWorkerMode() reads systemConf and must run before init(), which creates
+ //services from the resulting mode flags. cmdArgs is kept because init() reads the query id and
+ //TajoMaster address from it in YARN QueryMaster mode.
+ public void startWorker(TajoConf systemConf, String[] args) {
+ this.systemConf = systemConf;
+ this.cmdArgs = args;
+ setWorkerMode(args);
+ init(systemConf);
+ start();
+ }
+
+ /**
+ * Derives the worker roles from the command line or, when no arguments are given, from the
+ * tajo.worker.mode.querymaster / tajo.worker.mode.taskrunner properties (both default to true).
+ * "qm" and "tr" mark a worker launched inside a YARN container as a QueryMaster or a TaskRunner.
+ * init() relies on this pairing: a YARN QueryMaster reads the query id and TajoMaster address
+ * from args[1]/args[2], while a YARN TaskRunner hands args to TaskRunnerManager.startTask().
+ * Exits the JVM with a non-zero status if neither role ends up enabled.
+ */
+ private void setWorkerMode(String[] args) {
+ if(args.length < 1) {
+ queryMasterMode = systemConf.getBoolean("tajo.worker.mode.querymaster", true);
+ taskRunnerMode = systemConf.getBoolean("tajo.worker.mode.taskrunner", true);
+ } else {
+ String mode = args[0];
+ if(WORKER_MODE_STANDBY.equals(mode)) {
+ queryMasterMode = true;
+ taskRunnerMode = true;
+ } else if(WORKER_MODE_YARN_TASKRUNNER.equals(mode)) {
+ //YARN TaskRunner container: init() passes cmdArgs to taskRunnerManager.startTask()
+ yarnContainerMode = true;
+ taskRunnerMode = true;
+ } else if(WORKER_MODE_YARN_QUERYMASTER.equals(mode)) {
+ //YARN QueryMaster container: init() connects to the TajoMaster and reports QUERY_MASTER_LAUNCHED
+ yarnContainerMode = true;
+ queryMasterMode = true;
+ } else if(WORKER_MODE_QUERY_MASTER.equals(mode)) {
+ yarnContainerMode = false;
+ queryMasterMode = true;
+ } else {
+ if(!WORKER_MODE_TASKRUNNER.equals(mode)) {
+ LOG.warn("Unknown worker mode '" + mode + "', running as " + WORKER_MODE_TASKRUNNER);
+ }
+ yarnContainerMode = false;
+ taskRunnerMode = true;
+ }
+ }
+ if(!queryMasterMode && !taskRunnerMode) {
+ LOG.fatal("Worker daemon exit cause no worker mode(querymaster/taskrunner) property");
+ //non-zero so whoever launched the daemon can tell that startup failed
+ System.exit(-1);
+ }
+ }
+
@Override
public void init(Configuration conf) {
Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook()));
@@ -122,35 +172,52 @@ public class TajoWorker extends CompositeService {
}
int clientPort = systemConf.getSocketAddrVar(ConfVars.WORKER_CLIENT_RPC_ADDRESS).getPort();
int peerRpcPort = systemConf.getSocketAddrVar(ConfVars.WORKER_PEER_RPC_ADDRESS).getPort();
+ int qmManagerPort = systemConf.getSocketAddrVar(ConfVars.WORKER_QM_RPC_ADDRESS).getPort();
if(randomPort) {
clientPort = 0;
peerRpcPort = 0;
+ qmManagerPort = 0;
systemConf.setIntVar(ConfVars.PULLSERVER_PORT, 0);
}
- if(!"qm".equals(daemonMode)) {
+ if(queryMasterMode) {
+ //querymaster worker
+ tajoWorkerClientService = new TajoWorkerClientService(workerContext, clientPort);
+ addService(tajoWorkerClientService);
+
+ queryMasterManagerService = new QueryMasterManagerService(workerContext, qmManagerPort);
+ addService(queryMasterManagerService);
+ }
+
+ if(taskRunnerMode) {
+ //taskrunner worker
taskRunnerManager = new TaskRunnerManager(workerContext);
addService(taskRunnerManager);
- }
- if(workerContext.isStandbyMode()) {
- pullService = new TajoPullServerService();
- addService(pullService);
+ if(!yarnContainerMode) {
+ tajoWorkerManagerService = new TajoWorkerManagerService(workerContext, peerRpcPort);
+ addService(tajoWorkerManagerService);
+ }
}
- if(!"tr".equals(daemonMode)) {
- tajoWorkerClientService = new TajoWorkerClientService(workerContext, clientPort);
- addService(tajoWorkerClientService);
+ LOG.info("Tajo Worker started: queryMaster=" + queryMasterMode + " taskRunner=" + taskRunnerMode +
+ ",yarnContainer=" + yarnContainerMode + ", clientPort=" + clientPort +
+ ", peerRpcPort=" + peerRpcPort + ":" + qmManagerPort);
- tajoWorkerManagerService = new TajoWorkerManagerService(workerContext, peerRpcPort);
- addService(tajoWorkerManagerService);
- LOG.info("Tajo worker started: mode=" + daemonMode + ", clientPort=" + clientPort + ", peerRpcPort="
- + peerRpcPort);
+ if(!yarnContainerMode) {
+ if(taskRunnerMode) {
+ pullService = new TajoPullServerService();
+ addService(pullService);
+ }
if (!systemConf.get(CommonTestingUtil.TAJO_TEST, "FALSE").equalsIgnoreCase("TRUE")) {
try {
httpPort = systemConf.getSocketAddrVar(ConfVars.WORKER_INFO_ADDRESS).getPort();
+ if(queryMasterMode && !taskRunnerMode) {
+ //If QueryMaster and TaskRunner run on single host, http port conflicts
+ httpPort = systemConf.getSocketAddrVar(ConfVars.WORKER_QM_INFO_ADDRESS).getPort();
+ }
webServer = StaticHttpServer.getInstance(this ,"worker", null, httpPort ,
true, null, systemConf, null);
webServer.start();
@@ -160,14 +227,27 @@ public class TajoWorker extends CompositeService {
LOG.error(e.getMessage(), e);
}
}
- LOG.info("Tajo worker started: mode=" + daemonMode + ", clientPort=" + clientPort + ", peerRpcPort="
- + peerRpcPort);
+ }
+
+ super.init(conf);
+
+ if(yarnContainerMode && queryMasterMode) {
+ String tajoMasterAddress = cmdArgs[2];
+ connectToTajoMaster(tajoMasterAddress);
+ connectToCatalog();
+ QueryId queryId = TajoIdUtils.parseQueryId(cmdArgs[1]);
+ queryMasterManagerService.getQueryMaster().reportQueryStatusToQueryMaster(
+ queryId, TajoProtos.QueryState.QUERY_MASTER_LAUNCHED);
+ } else if(yarnContainerMode && taskRunnerMode) { //TaskRunner mode
+ taskRunnerManager.startTask(cmdArgs);
} else {
- LOG.info("Tajo worker started: mode=" + daemonMode);
+ connectToTajoMaster(systemConf.getVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS));
+ connectToCatalog();
+ workerHeartbeatThread = new WorkerHeartbeatThread();
+ workerHeartbeatThread.start();
}
- super.init(conf);
}
public WorkerContext getWorkerContext() {
@@ -216,13 +296,20 @@ public class TajoWorker extends CompositeService {
public class WorkerContext {
+ //null unless this worker runs in QueryMaster mode (only then is a QueryMasterManagerService created)
public QueryMaster getQueryMaster() {
- return tajoWorkerManagerService.getQueryMaster();
+ if(queryMasterManagerService == null) {
+ return null;
+ }
+ return queryMasterManagerService.getQueryMaster();
}
+ //null unless this worker is a task runner started outside a YARN container
public TajoWorkerManagerService getTajoWorkerManagerService() {
return tajoWorkerManagerService;
}
+ //null unless this worker runs in QueryMaster mode
+ public QueryMasterManagerService getQueryMasterManagerService() {
+ return queryMasterManagerService;
+ }
+
public TajoWorkerClientService getTajoWorkerClientService() {
return tajoWorkerClientService;
}
@@ -244,7 +331,11 @@ public class TajoWorker extends CompositeService {
}
public String getWorkerName() {
- return getTajoWorkerManagerService().getHostAndPort();
+ //prefers the QueryMaster endpoint when both roles run in this process
+ //NOTE(review): in a YARN task-runner container neither service exists (TajoWorkerManagerService is
+ //only created outside YARN), so the else branch would NPE there — confirm it is never called in that mode
+ if(queryMasterMode) {
+ return getQueryMasterManagerService().getHostAndPort();
+ } else {
+ return getTajoWorkerManagerService().getHostAndPort();
+ }
}
public void stopWorker(boolean force) {
stop();
@@ -253,8 +344,8 @@ public class TajoWorker extends CompositeService {
}
}
- public boolean isStandbyMode() {
- return !"qm".equals(daemonMode) && !"tr".equals(daemonMode);
+ //true when started with a YARN container mode argument; such workers skip the pull server,
+ //web UI and heartbeat thread set up in init()
+ public boolean isYarnContainerMode() {
+ return yarnContainerMode;
}
public void setNumClusterNodes(int numClusterNodes) {
@@ -272,32 +363,24 @@ public class TajoWorker extends CompositeService {
public int getNumClusterSlots() {
return TajoWorker.this.numClusterSlots.get();
}
- }
-
- public void stopWorkerForce() {
- stop();
- }
- private void setWorkerMode(String[] params) {
- if("qm".equals(daemonMode)) { //QueryMaster mode
+ //0 when no TajoWorkerManagerService exists, i.e. QueryMaster-only workers and YARN containers
+ public int getPeerRpcPort() {
+ return getTajoWorkerManagerService() == null ? 0 : getTajoWorkerManagerService().getBindAddr().getPort();
+ }
- String tajoMasterAddress = params[2];
- connectToTajoMaster(tajoMasterAddress);
- connectToCatalog();
+ //whether this worker hosts QueryMasters (assigned in setWorkerMode)
+ public boolean isQueryMasterMode() {
+ return queryMasterMode;
+ }
- QueryId queryId = TajoIdUtils.parseQueryId(params[1]);
- tajoWorkerManagerService.getQueryMaster().reportQueryStatusToQueryMaster(
- queryId, TajoProtos.QueryState.QUERY_MASTER_LAUNCHED);
- } else if("tr".equals(daemonMode)) { //TaskRunner mode
- taskRunnerManager.startTask(params);
- } else { //Standby mode
- connectToTajoMaster(systemConf.getVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS));
- connectToCatalog();
- workerHeartbeatThread = new WorkerHeartbeatThread();
- workerHeartbeatThread.start();
+ //whether this worker runs tasks (assigned in setWorkerMode)
+ public boolean isTaskRunnerMode() {
+ return taskRunnerMode;
}
}
+ //Delegates to CompositeService.stop() for this worker.
+ public void stopWorkerForce() {
+ stop();
+ }
+
private void connectToTajoMaster(String tajoMasterAddrString) {
LOG.info("Connecting to TajoMaster (" + tajoMasterAddrString +")");
this.tajoMasterAddress = NetUtils.createSocketAddr(tajoMasterAddrString);
@@ -323,13 +406,14 @@ public class TajoWorker extends CompositeService {
try {
catalogClient = new CatalogClient(systemConf);
} catch (IOException e) {
- e.printStackTrace();
+ LOG.error(e.getMessage(), e);
}
}
class WorkerHeartbeatThread extends Thread {
TajoMasterProtocol.ServerStatusProto.System systemInfo;
- List<TajoMasterProtocol.ServerStatusProto.Disk> diskInfos = new ArrayList<TajoMasterProtocol.ServerStatusProto.Disk>();
+ List<TajoMasterProtocol.ServerStatusProto.Disk> diskInfos =
+ new ArrayList<TajoMasterProtocol.ServerStatusProto.Disk>();
int workerDisksNum;
List<File> mountPaths;
@@ -338,7 +422,7 @@ public class TajoWorker extends CompositeService {
int workerCpuCoreNum;
boolean dedicatedResource = systemConf.getBoolVar(ConfVars.WORKER_RESOURCE_DEDICATED);
-
+
try {
mountPaths = getMountPath();
} catch (Exception e) {
@@ -378,7 +462,38 @@ public class TajoWorker extends CompositeService {
int sendDiskInfoCount = 0;
int pullServerPort = 0;
if(pullService != null) {
- pullServerPort = pullService.getPort();
+ long startTime = System.currentTimeMillis();
+ while(true) {
+ pullServerPort = pullService.getPort();
+ if(pullServerPort > 0) {
+ break;
+ }
+ //waiting while pull server init
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ }
+ if(System.currentTimeMillis() - startTime > 30 * 1000) {
+ LOG.fatal("Too long push server init.");
+ System.exit(0);
+ }
+ }
+ }
+
+ String hostName = null;
+ int peerRpcPort = 0;
+ int queryMasterPort = 0;
+ int clientPort = 0;
+ if(workerContext.getTajoWorkerManagerService() != null) {
+ hostName = workerContext.getTajoWorkerManagerService().getBindAddr().getHostName();
+ peerRpcPort = workerContext.getTajoWorkerManagerService().getBindAddr().getPort();
+ }
+ if(workerContext.getQueryMasterManagerService() != null) {
+ hostName = workerContext.getQueryMasterManagerService().getBindAddr().getHostName();
+ queryMasterPort = workerContext.getQueryMasterManagerService().getBindAddr().getPort();
+ }
+ if(workerContext.getTajoWorkerClientService() != null) {
+ clientPort = workerContext.getTajoWorkerClientService().getBindAddr().getPort();
}
while(true) {
@@ -406,12 +521,15 @@ public class TajoWorker extends CompositeService {
.setSystem(systemInfo)
.setDiskSlots(workerDisksNum)
.setJvmHeap(jvmHeap)
+ .setQueryMasterMode(PrimitiveProtos.BoolProto.newBuilder().setValue(queryMasterMode))
+ .setTaskRunnerMode(PrimitiveProtos.BoolProto.newBuilder().setValue(taskRunnerMode))
.build();
TajoMasterProtocol.TajoHeartbeat heartbeatProto = TajoMasterProtocol.TajoHeartbeat.newBuilder()
- .setTajoWorkerHost(workerContext.getTajoWorkerManagerService().getBindAddr().getHostName())
- .setTajoWorkerPort(workerContext.getTajoWorkerManagerService().getBindAddr().getPort())
- .setTajoWorkerClientPort(workerContext.getTajoWorkerClientService().getBindAddr().getPort())
+ .setTajoWorkerHost(hostName)
+ .setTajoQueryMasterPort(queryMasterPort)
+ .setPeerRpcPort(peerRpcPort)
+ .setTajoWorkerClientPort(clientPort)
.setTajoWorkerHttpPort(httpPort)
.setTajoWorkerPullServerPort(pullServerPort)
.setServerStatus(serverStatus)
@@ -463,12 +581,6 @@ public class TajoWorker extends CompositeService {
}
}
- public void startWorker(TajoConf tajoConf, String[] args) {
- init(tajoConf);
- start();
- setWorkerMode(args);
- }
-
String getThreadTaskName(long id, String name) {
if (name == null) {
return Long.toString(id);
@@ -549,18 +661,11 @@ public class TajoWorker extends CompositeService {
public static void main(String[] args) throws Exception {
StringUtils.startupShutdownMessage(TajoWorker.class, args, LOG);
- if(args.length < 1) {
- LOG.error("Wrong startup params");
- System.exit(-1);
- }
-
- String workerMode = args[0];
-
TajoConf tajoConf = new TajoConf();
tajoConf.addResource(new Path(TajoConstants.SYSTEM_CONF_FILENAME));
try {
- TajoWorker tajoWorker = new TajoWorker(workerMode);
+ TajoWorker tajoWorker = new TajoWorker();
tajoWorker.startWorker(tajoConf, args);
} catch (Throwable t) {
LOG.fatal("Error starting TajoWorker", t);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index c3e7130..4190dab 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -51,8 +51,6 @@ public class TajoWorkerManagerService extends CompositeService
private String addr;
private int port;
- private QueryMaster queryMaster;
-
private TajoWorker.WorkerContext workerContext;
public TajoWorkerManagerService(TajoWorker.WorkerContext workerContext, int port) {
@@ -61,10 +59,6 @@ public class TajoWorkerManagerService extends CompositeService
this.port = port;
}
- public QueryMaster getQueryMaster() {
- return queryMaster;
- }
-
@Override
public void init(Configuration conf) {
TajoConf tajoConf = (TajoConf) conf;
@@ -83,10 +77,6 @@ public class TajoWorkerManagerService extends CompositeService
this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
this.port = bindAddr.getPort();
-
- queryMaster = new QueryMaster(workerContext);
- addService(queryMaster);
-
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
@@ -119,99 +109,13 @@ public class TajoWorkerManagerService extends CompositeService
}
@Override
- public void getTask(RpcController controller, TajoWorkerProtocol.GetTaskRequestProto request,
- RpcCallback<TajoWorkerProtocol.QueryUnitRequestProto> done) {
- try {
- ExecutionBlockId ebId = new ExecutionBlockId(request.getExecutionBlockId());
- QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(ebId.getQueryId());
- ContainerId cid =
- queryMasterTask.getQueryTaskContext().getResourceAllocator().makeContainerId(request.getContainerId());
-
- if(queryMasterTask == null || queryMasterTask.isStopped()) {
- LOG.debug("getTask:" + cid + ", ebId:" + ebId + ", but query is finished.");
- done.run(TaskSchedulerImpl.stopTaskRunnerReq);
- } else {
- LOG.debug("getTask:" + cid + ", ebId:" + ebId);
- queryMasterTask.handleTaskRequestEvent(new TaskRequestEvent(cid, ebId, done));
- }
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
- }
-
- @Override
- public void statusUpdate(RpcController controller, TajoWorkerProtocol.TaskStatusProto request,
- RpcCallback<PrimitiveProtos.BoolProto> done) {
- try {
- QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
- new QueryId(request.getId().getQueryUnitId().getExecutionBlockId().getQueryId()));
- queryMasterTask.getEventHandler().handle(
- new TaskAttemptStatusUpdateEvent(new QueryUnitAttemptId(request.getId()), request));
- done.run(TajoWorker.TRUE_PROTO);
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- done.run(TajoWorker.FALSE_PROTO);
- }
- }
-
- @Override
public void ping(RpcController controller,
TajoIdProtos.QueryUnitAttemptIdProto attemptId,
RpcCallback<PrimitiveProtos.BoolProto> done) {
- // TODO - to be completed
-// QueryUnitAttemptId attemptId = new QueryUnitAttemptId(attemptIdProto);
-// context.getQuery(attemptId.getQueryId()).getSubQuery(attemptId.getExecutionBlockId()).
-// getQueryUnit(attemptId.getQueryUnitId()).getAttempt(attemptId).
-// resetExpireTime();
done.run(TajoWorker.TRUE_PROTO);
}
@Override
- public void fatalError(RpcController controller, TajoWorkerProtocol.TaskFatalErrorReport report,
- RpcCallback<PrimitiveProtos.BoolProto> done) {
- try {
- QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
- new QueryId(report.getId().getQueryUnitId().getExecutionBlockId().getQueryId()));
- queryMasterTask.getEventHandler().handle(new TaskFatalErrorEvent(report));
- done.run(TajoWorker.TRUE_PROTO);
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- done.run(TajoWorker.FALSE_PROTO);
- }
- }
-
- @Override
- public void done(RpcController controller, TajoWorkerProtocol.TaskCompletionReport report,
- RpcCallback<PrimitiveProtos.BoolProto> done) {
- try {
- QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
- new QueryId(report.getId().getQueryUnitId().getExecutionBlockId().getQueryId()));
- queryMasterTask.getEventHandler().handle(new TaskCompletionEvent(report));
- done.run(TajoWorker.TRUE_PROTO);
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- done.run(TajoWorker.FALSE_PROTO);
- }
- }
-
- @Override
- public void executeQuery(RpcController controller,
- TajoWorkerProtocol.QueryExecutionRequestProto request,
- RpcCallback<PrimitiveProtos.BoolProto> done) {
- try {
- QueryId queryId = new QueryId(request.getQueryId());
- LOG.info("Receive executeQuery request:" + queryId);
- queryMaster.handle(new QueryStartEvent(queryId,
- new QueryContext(request.getQueryContext()), request.getSql().getValue(),
- request.getLogicalPlanJson().getValue()));
- done.run(TajoWorker.TRUE_PROTO);
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- done.run(TajoWorker.FALSE_PROTO);
- }
- }
-
- @Override
public void executeExecutionBlock(RpcController controller,
TajoWorkerProtocol.RunExecutionBlockRequestProto request,
RpcCallback<PrimitiveProtos.BoolProto> done) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index e30f8c4..e77086d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -40,10 +40,11 @@ import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.engine.planner.logical.SortNode;
import org.apache.tajo.engine.planner.logical.StoreTableNode;
import org.apache.tajo.engine.planner.physical.PhysicalExec;
-import org.apache.tajo.ipc.TajoWorkerProtocol.*;
-import org.apache.tajo.ipc.TajoWorkerProtocol.TajoWorkerProtocolService.Interface;
+import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.ipc.QueryMasterProtocol.*;
import org.apache.tajo.engine.query.QueryUnitRequest;
import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.TajoWorkerProtocol.*;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.storage.Fragment;
import org.apache.tajo.storage.StorageUtil;
@@ -68,7 +69,7 @@ public class Task {
private final QueryContext queryContext;
private final FileSystem localFS;
private final TaskRunner.TaskRunnerContext taskRunnerContext;
- private final Interface masterProxy;
+ private final QueryMasterProtocol.QueryMasterProtocolService.Interface masterProxy;
private final LocalDirAllocator lDirAllocator;
private final QueryUnitAttemptId taskId;
@@ -125,7 +126,8 @@ public class Task {
};
public Task(QueryUnitAttemptId taskId,
- final TaskRunner.TaskRunnerContext worker, final Interface masterProxy,
+ final TaskRunner.TaskRunnerContext worker,
+ final QueryMasterProtocolService.Interface masterProxy,
final QueryUnitRequest request) throws IOException {
this.request = request;
this.reporter = new Reporter(masterProxy);
@@ -544,12 +546,12 @@ public class Task {
}
protected class Reporter implements Runnable {
- private Interface masterStub;
+ private QueryMasterProtocolService.Interface masterStub;
private Thread pingThread;
private Object lock = new Object();
private static final int PROGRESS_INTERVAL = 3000;
- public Reporter(Interface masterStub) {
+ public Reporter(QueryMasterProtocolService.Interface masterStub) {
this.masterStub = masterStub;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
index a5bb55c..f0c9033 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -38,6 +38,8 @@ import org.apache.tajo.TajoProtos.TaskAttemptState;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.engine.query.QueryUnitRequestImpl;
+import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.ipc.QueryMasterProtocol.*;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.rpc.AsyncRpcClient;
import org.apache.tajo.rpc.CallFuture;
@@ -68,7 +70,7 @@ public class TaskRunner extends AbstractService {
private ContainerId containerId;
// Cluster Management
- private TajoWorkerProtocol.TajoWorkerProtocolService.Interface master;
+ private QueryMasterProtocolService.Interface master;
// for temporal or intermediate files
private FileSystem localFS;
@@ -134,12 +136,12 @@ public class TaskRunner extends AbstractService {
UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(conf.getVar(ConfVars.USERNAME));
//taskOwner.addToken(token);
- // initialize MasterWorkerProtocol as an actual task owner.
+ // initialize QueryMasterProtocol as an actual task owner.
this.client =
taskOwner.doAs(new PrivilegedExceptionAction<AsyncRpcClient>() {
@Override
public AsyncRpcClient run() throws Exception {
- return new AsyncRpcClient(TajoWorkerProtocol.class, masterAddr);
+ return new AsyncRpcClient(QueryMasterProtocol.class, masterAddr);
}
});
this.master = client.getStub();
@@ -235,7 +237,7 @@ public class TaskRunner extends AbstractService {
return nodeId.toString();
}
- public TajoWorkerProtocolService.Interface getMaster() {
+ public QueryMasterProtocolService.Interface getMaster() {
return master;
}
@@ -280,7 +282,7 @@ public class TaskRunner extends AbstractService {
return taskRunnerContext;
}
- static void fatalError(TajoWorkerProtocolService.Interface proxy,
+ static void fatalError(QueryMasterProtocolService.Interface proxy,
QueryUnitAttemptId taskAttemptId, String message) {
TaskFatalErrorReport.Builder builder = TaskFatalErrorReport.newBuilder()
.setId(taskAttemptId.getProto())
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
index 85b1e6b..1ea213d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -74,7 +74,7 @@ public class TaskRunnerManager extends CompositeService {
finishedTaskCleanThread.interrupted();
}
super.stop();
- if(!workerContext.isStandbyMode()) {
+ if(workerContext.isYarnContainerMode()) {
workerContext.stopWorker(true);
}
}
@@ -87,7 +87,7 @@ public class TaskRunnerManager extends CompositeService {
finishedTaskRunnerMap.put(id, taskRunner);
}
}
- if(!workerContext.isStandbyMode()) {
+ if(workerContext.isYarnContainerMode()) {
stop();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/proto/QueryMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/QueryMasterProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/QueryMasterProtocol.proto
new file mode 100644
index 0000000..fe5bf03
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/proto/QueryMasterProtocol.proto
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tajo.ipc";
+option java_outer_classname = "QueryMasterProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "yarn_protos.proto";
+import "tajo_protos.proto";
+import "TajoIdProtos.proto";
+import "CatalogProtos.proto";
+import "PrimitiveProtos.proto";
+import "TajoWorkerProtocol.proto";
+
+service QueryMasterProtocolService {
+ //from Worker
+ rpc getTask(GetTaskRequestProto) returns (QueryUnitRequestProto);
+ rpc statusUpdate (TaskStatusProto) returns (BoolProto);
+ rpc ping (QueryUnitAttemptIdProto) returns (BoolProto);
+ rpc fatalError(TaskFatalErrorReport) returns (BoolProto);
+ rpc done (TaskCompletionReport) returns (BoolProto);
+
+ //from TajoMaster's QueryJobManager
+ rpc executeQuery(QueryExecutionRequestProto) returns (BoolProto);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
index cdf78a6..f9b15a7 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
@@ -54,20 +54,23 @@ message ServerStatusProto {
repeated Disk disk = 3;
required int32 runningTaskNum = 4;
required JvmHeap jvmHeap = 5;
+ required BoolProto queryMasterMode = 6;
+ required BoolProto taskRunnerMode = 7;
}
message TajoHeartbeat {
required string tajoWorkerHost = 1;
- required int32 tajoWorkerPort = 2;
- optional ServerStatusProto serverStatus = 3;
- optional int32 tajoWorkerClientPort = 4;
- optional QueryIdProto queryId = 5;
- optional QueryState state = 6;
- optional string statusMessage = 7;
- optional int32 tajoWorkerPullServerPort = 8;
- optional int32 tajoWorkerHttpPort = 9;
- optional float queryProgress = 10;
- optional int64 queryFinishTime = 11;
+ required int32 peerRpcPort = 2;
+ required int32 tajoQueryMasterPort = 3;
+ optional ServerStatusProto serverStatus = 4;
+ optional int32 tajoWorkerClientPort = 5;
+ optional QueryIdProto queryId = 6;
+ optional QueryState state = 7;
+ optional string statusMessage = 8;
+ optional int32 tajoWorkerPullServerPort = 9;
+ optional int32 tajoWorkerHttpPort = 10;
+ optional float queryProgress = 11;
+ optional int64 queryFinishTime = 12;
}
message TajoHeartbeatResponse {
@@ -89,10 +92,12 @@ message WorkerResourceAllocationRequest {
}
message WorkerResourceProto {
- required string workerHostAndPort = 1;
- required ExecutionBlockIdProto executionBlockId = 2;
- required int32 memoryMBSlots = 3 ;
- required int32 diskSlots = 4;
+ required string host = 1;
+ required int32 peerRpcPort = 2;
+ required int32 queryMasterPort = 3;
+ required ExecutionBlockIdProto executionBlockId = 4;
+ required int32 memoryMBSlots = 5 ;
+ required int32 diskSlots = 6;
}
message WorkerResourceReleaseRequest {
@@ -100,8 +105,10 @@ message WorkerResourceReleaseRequest {
}
message WorkerAllocatedResource {
- required string workerHostAndPort = 1;
- required int32 workerPullServerPort = 2;
+ required string workerHost = 1;
+ required int32 peerRpcPort = 2;
+ required int32 queryMasterPort = 3;
+ required int32 workerPullServerPort = 4;
}
message WorkerResourceAllocationResponse {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
index 4d75e46..109dfa9 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
@@ -155,15 +155,7 @@ message RunExecutionBlockRequestProto {
}
service TajoWorkerProtocolService {
- //from Worker
- rpc getTask(GetTaskRequestProto) returns (QueryUnitRequestProto);
- rpc statusUpdate (TaskStatusProto) returns (BoolProto);
rpc ping (QueryUnitAttemptIdProto) returns (BoolProto);
- rpc fatalError(TaskFatalErrorReport) returns (BoolProto);
- rpc done (TaskCompletionReport) returns (BoolProto);
-
- //from TajoMaster's QueryJobManager
- rpc executeQuery(QueryExecutionRequestProto) returns (BoolProto);
//from QueryMaster(Worker)
rpc executeExecutionBlock(RunExecutionBlockRequestProto) returns (BoolProto);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml b/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
index ab24740..0ca3f9c 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
+++ b/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
@@ -24,4 +24,17 @@
<name>tajo.worker.tmpdir.locations</name>
<value>/tmp/tajo-${user.name}/tmpdir</value>
</property>
+
+ <property>
+ <name>tajo.worker.mode.querymaster</name>
+ <value>true</value>
+ <description>If true, this worker takes the QueryMaster role (reported to the master as queryMasterMode).</description>
+ </property>
+
+ <property>
+ <name>tajo.worker.mode.taskrunner</name>
+ <value>true</value>
+ <description>If true, this worker takes the TaskRunner role (reported to the master as taskRunnerMode).</description>
+ </property>
+
</configuration>
\ No newline at end of file