You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/01/08 10:34:56 UTC
git commit: TAJO-474: Add query admin utility. (DaeMyung Kang via
hyunsik)
Updated Branches:
refs/heads/master ed8b8ec3c -> 285c4a485
TAJO-474: Add query admin utility. (DaeMyung Kang via hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/285c4a48
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/285c4a48
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/285c4a48
Branch: refs/heads/master
Commit: 285c4a485c68e8d0e57145971b13676446010196
Parents: ed8b8ec
Author: Hyunsik Choi <hy...@apache.org>
Authored: Wed Jan 8 18:33:27 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Wed Jan 8 18:33:27 2014 +0900
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../java/org/apache/tajo/client/TajoAdmin.java | 382 +++++++++++++++++++
.../java/org/apache/tajo/client/TajoClient.java | 30 +-
tajo-client/src/main/proto/ClientProtos.proto | 37 +-
.../tajo/master/TajoMasterClientService.java | 73 +++-
tajo-dist/src/main/bin/tajo | 4 +
6 files changed, 519 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/285c4a48/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 69e36b5..5016ef1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,8 @@ Release 0.8.0 - unreleased
NEW FEATURES
+ TAJO-474: Add query admin utility. (DaeMyung Kang via hyunsik)
+
TAJO-460: CTAS statement should support partitioned table.
(Min Zhou via hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/285c4a48/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
new file mode 100644
index 0000000..e1ac249
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
@@ -0,0 +1,382 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client;
+
+import com.google.protobuf.ServiceException;
+import org.apache.commons.cli.*;
+import org.apache.commons.lang.StringUtils;
+import org.apache.tajo.TajoProtos.QueryState;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.ClientProtos.BriefQueryInfo;
+import org.apache.tajo.ipc.ClientProtos.WorkerResourceInfo;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.Writer;
+import java.sql.SQLException;
+import java.text.DecimalFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TajoAdmin {
+ private static final org.apache.commons.cli.Options options;
+ private static DecimalFormat decimalF = new DecimalFormat("###.0");
+ private enum WorkerStatus {
+ LIVE,
+ DEAD,
+ DECOMMISSION
+ }
+
+ final static String line5 = "-----";
+ final static String line7 = "-------";
+ final static String line10 = "----------";
+ final static String line12 = "------------";
+ final static String line15 = "---------------";
+ final static String line20 = "--------------------";
+ final static String line25 = "-------------------------";
+ final static String line30 = "------------------------------";
+ final static SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+ static {
+ options = new Options();
+ options.addOption("h", "host", true, "Tajo server host");
+ options.addOption("p", "port", true, "Tajo server port");
+ options.addOption("list", "list", false, "Show Tajo query list");
+ options.addOption("cluster", "cluster", false, "Show Cluster Info");
+ options.addOption("desc", "desc", false, "Show Query Description");
+ }
+
+ private static void printUsage() {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp( "admin [options]", options );
+ }
+
+ private static String getQueryState(QueryState state) {
+ String stateStr = "FAILED";
+
+ if (TajoClient.isQueryRunnning(state)) {
+ stateStr = "RUNNING";
+ } else if (state == QueryState.QUERY_SUCCEEDED) {
+ stateStr = "SUCCEED";
+ }
+
+ return stateStr;
+ }
+
+ public static void main(String [] args) throws ParseException, IOException, ServiceException, SQLException {
+ TajoConf conf = new TajoConf();
+
+ CommandLineParser parser = new PosixParser();
+ CommandLine cmd = parser.parse(options, args);
+
+ String param = "";
+ int cmdType = 0;
+
+ String hostName = null;
+ Integer port = null;
+ if (cmd.hasOption("h")) {
+ hostName = cmd.getOptionValue("h");
+ }
+ if (cmd.hasOption("p")) {
+ port = Integer.parseInt(cmd.getOptionValue("p"));
+ }
+
+ if (cmd.hasOption("list")) {
+ cmdType = 1;
+ } else if (cmd.hasOption("desc")) {
+ cmdType = 2;
+ } else if (cmd.hasOption("cluster")) {
+ cmdType = 3;
+ }
+
+ // if there is no "-h" option,
+ if(hostName == null) {
+ if (conf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS) != null) {
+ // it checks if the client service address is given in configuration and distributed mode.
+ // if so, it sets entryAddr.
+ hostName = conf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS).split(":")[0];
+ }
+ }
+ if (port == null) {
+ if (conf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS) != null) {
+ // it checks if the client service address is given in configuration and distributed mode.
+ // if so, it sets entryAddr.
+ port = Integer.parseInt(conf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS).split(":")[1]);
+ }
+ }
+
+ if (cmdType == 0) {
+ printUsage();
+ System.exit(0);
+ }
+
+ TajoClient client = null;
+ if ((hostName == null) ^ (port == null)) {
+ System.err.println("ERROR: cannot find valid Tajo server address");
+ System.exit(-1);
+ } else if (hostName != null && port != null) {
+ conf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, hostName+":"+port);
+ client = new TajoClient(conf);
+ } else if (hostName == null && port == null) {
+ client = new TajoClient(conf);
+ }
+
+ Writer writer = new PrintWriter(System.out);
+ switch(cmdType) {
+ case 1:
+ processList(writer, client);
+ break;
+ case 2:
+ processDesc(writer, client);
+ break;
+ case 3:
+ processCluster(writer, client);
+ break;
+ }
+
+ writer.flush();
+ writer.close();
+
+ System.exit(0);
+ }
+
+ public static void processDesc(Writer writer, TajoClient client) throws ParseException, IOException, ServiceException, SQLException {
+ List<BriefQueryInfo> queryList = client.getQueryList();
+
+ int id = 1;
+ for (BriefQueryInfo queryInfo : queryList) {
+ String queryId = String.format("q_%s_%04d",
+ queryInfo.getQueryId().getId(),
+ queryInfo.getQueryId().getSeq());
+
+ writer.write("Id: " + id);
+ writer.write("\n");
+ id++;
+ writer.write("Query Id: " + queryId);
+ writer.write("\n");
+ writer.write("Started Time: " + df.format(queryInfo.getStartTime()));
+ writer.write("\n");
+ String state = getQueryState(queryInfo.getState());
+ writer.write("Query State: " + state);
+ writer.write("\n");
+ long end = queryInfo.getFinishTime();
+ long start = queryInfo.getStartTime();
+ String executionTime = decimalF.format((end-start) / 1000) + " sec";
+ if (state.equals("RUNNING") == false) {
+ writer.write("Finished Time: " + df.format(queryInfo.getFinishTime()));
+ writer.write("\n");
+ }
+ writer.write("Execution Time: " + executionTime);
+ writer.write("\n");
+ writer.write("Query Progress: " + queryInfo.getProgress());
+ writer.write("\n");
+ writer.write("Query Statement:");
+ writer.write("\n");
+ writer.write(queryInfo.getQuery());
+ writer.write("\n");
+ writer.write("\n");
+ }
+ }
+
+ public static void processCluster(Writer writer, TajoClient client) throws ParseException, IOException, ServiceException, SQLException {
+ List<WorkerResourceInfo> workerList = client.getClusterInfo();
+
+ int runningQueryMasterTasks = 0;
+
+ List<WorkerResourceInfo> liveWorkers = new ArrayList<WorkerResourceInfo>();
+ List<WorkerResourceInfo> deadWorkers = new ArrayList<WorkerResourceInfo>();
+ List<WorkerResourceInfo> decommissionWorkers = new ArrayList<WorkerResourceInfo>();
+
+ List<WorkerResourceInfo> liveQueryMasters = new ArrayList<WorkerResourceInfo>();
+ List<WorkerResourceInfo> deadQueryMasters = new ArrayList<WorkerResourceInfo>();
+
+ for (WorkerResourceInfo eachWorker : workerList) {
+ if(eachWorker.getQueryMasterMode() == true) {
+ if(eachWorker.getWorkerStatus().equals(WorkerStatus.LIVE.toString())) {
+ liveQueryMasters.add(eachWorker);
+ runningQueryMasterTasks += eachWorker.getNumQueryMasterTasks();
+ }
+ if(eachWorker.getWorkerStatus().equals(WorkerStatus.DEAD.toString())) {
+ deadQueryMasters.add(eachWorker);
+ }
+ }
+
+ if(eachWorker.getTaskRunnerMode() == true) {
+ if(eachWorker.getWorkerStatus().equals(WorkerStatus.LIVE.toString())) {
+ liveWorkers.add(eachWorker);
+ } else if(eachWorker.getWorkerStatus().equals(WorkerStatus.DEAD.toString())) {
+ deadWorkers.add(eachWorker);
+ } else if(eachWorker.getWorkerStatus().equals(WorkerStatus.DECOMMISSION.toString())) {
+ decommissionWorkers.add(eachWorker);
+ }
+ }
+ }
+
+ String fmtInfo = "%1$-5s %2$-5s %3$-5s%n";
+ String infoLine = String.format(fmtInfo, "Live", "Dead", "Tasks");
+
+ writer.write("Query Master\n");
+ writer.write("============\n\n");
+ writer.write(infoLine);
+ String line = String.format(fmtInfo, line5, line5, line5);
+ writer.write(line);
+
+ line = String.format(fmtInfo, liveQueryMasters.size(),
+ deadQueryMasters.size(), runningQueryMasterTasks);
+ writer.write(line);
+ writer.write("\n");
+
+ writer.write("Live QueryMasters\n");
+ writer.write("=================\n\n");
+
+ if (liveQueryMasters.isEmpty()) {
+ writer.write("No Live QueryMasters\n");
+ } else {
+ String fmtQueryMasterLine = "%1$-25s %2$-5s %3$-5s %4$-10s %5$-10s%n";
+ line = String.format(fmtQueryMasterLine, "QueryMaster", "Port", "Query",
+ "Heap", "Status");
+ writer.write(line);
+ line = String.format(fmtQueryMasterLine, line25, line5,
+ line5, line10, line10);
+ writer.write(line);
+ for (WorkerResourceInfo queryMaster : liveQueryMasters) {
+ String queryMasterHost = String.format("%s:%d",
+ queryMaster.getAllocatedHost(),
+ queryMaster.getQueryMasterPort());
+ String heap = String.format("%d MB", queryMaster.getMaxHeap()/1024/1024);
+ line = String.format(fmtQueryMasterLine, queryMasterHost,
+ queryMaster.getClientPort(),
+ queryMaster.getNumQueryMasterTasks(),
+ heap, queryMaster.getWorkerStatus());
+ writer.write(line);
+ }
+
+ writer.write("\n\n");
+ }
+
+ if (!deadQueryMasters.isEmpty()) {
+ writer.write("Dead QueryMasters\n");
+ writer.write("=================\n\n");
+
+ String fmtQueryMasterLine = "%1$-25s %2$-5s %3$-10s%n";
+ line = String.format(fmtQueryMasterLine, "QueryMaster", "Port", "Status");
+ writer.write(line);
+ line = String.format(fmtQueryMasterLine, line25, line5, line10);
+ writer.write(line);
+
+ for (WorkerResourceInfo queryMaster : deadQueryMasters) {
+ String queryMasterHost = String.format("%s:%d",
+ queryMaster.getAllocatedHost(),
+ queryMaster.getQueryMasterPort());
+ line = String.format(fmtQueryMasterLine, queryMasterHost,
+ queryMaster.getClientPort(),
+ queryMaster.getWorkerStatus());
+ writer.write(line);
+ }
+
+ writer.write("\n\n");
+ }
+
+ writer.write("Worker\n");
+ writer.write("======\n\n");
+
+ String fmtWorkerInfo = "%1$-5s %2$-5s%n";
+ String workerInfoLine = String.format(fmtWorkerInfo, "Live", "Dead");
+ writer.write(workerInfoLine);
+ line = String.format(fmtWorkerInfo, line5, line5);
+ writer.write(line);
+
+ line = String.format(fmtWorkerInfo, liveWorkers.size(), deadWorkers.size());
+ writer.write(line);
+ writer.write("\n");
+
+ writer.write("Live Workers\n");
+ writer.write("============\n\n");
+ if(liveWorkers.isEmpty()) {
+ writer.write("No Live Workers\n\n");
+ } else {
+ writeWorkerInfo(writer, liveWorkers);
+ }
+
+ writer.write("Dead Workers\n");
+ writer.write("============\n\n");
+ if(deadWorkers.isEmpty()) {
+ writer.write("No Dead Workers\n\n");
+ } else {
+ writeWorkerInfo(writer, deadWorkers);
+ }
+ }
+
+ private static void writeWorkerInfo(Writer writer, List<WorkerResourceInfo> workers) throws ParseException, IOException, ServiceException, SQLException {
+ String fmtWorkerLine = "%1$-25s %2$-5s %3$-5s %4$-10s %5$-10s %6$-12s %7$-10s%n";
+ String line = String.format(fmtWorkerLine,
+ "Worker", "Port", "Tasks",
+ "Mem", "Disk",
+ "Heap", "Status");
+ writer.write(line);
+ line = String.format(fmtWorkerLine,
+ line25, line5, line5,
+ line10, line10,
+ line12, line10);
+ writer.write(line);
+
+ for (WorkerResourceInfo worker : workers) {
+ String workerHost = String.format("%s:%d",
+ worker.getAllocatedHost(),
+ worker.getPeerRpcPort());
+ String mem = String.format("%d/%d", worker.getUsedMemoryMB(),
+ worker.getMemoryMB());
+ String disk = String.format("%.2f/%.2f", worker.getUsedDiskSlots(),
+ worker.getDiskSlots());
+ String heap = String.format("%d/%d MB", worker.getFreeHeap()/1024/1024,
+ worker.getMaxHeap()/1024/1024);
+
+ line = String.format(fmtWorkerLine, workerHost,
+ worker.getPullServerPort(),
+ worker.getNumRunningTasks(),
+ mem, disk, heap, worker.getWorkerStatus());
+ writer.write(line);
+ }
+ writer.write("\n\n");
+ }
+
+ public static void processList(Writer writer, TajoClient client) throws ParseException, IOException, ServiceException, SQLException {
+ List<BriefQueryInfo> queryList = client.getQueryList();
+
+ String fmt = "%1$-20s %2$-7s %3$-20s %4$-30s%n";
+ String line = String.format(fmt, "QueryId", "State",
+ "StartTime", "Query");
+ writer.write(line);
+ line = String.format(fmt, line20, line7, line20, line30);
+ writer.write(line);
+
+ for (BriefQueryInfo queryInfo : queryList) {
+ String queryId = String.format("q-%s-%04d",
+ queryInfo.getQueryId().getId(),
+ queryInfo.getQueryId().getSeq());
+ String state = getQueryState(queryInfo.getState());
+ String startTime = df.format(queryInfo.getStartTime());
+
+ String sql = StringUtils.abbreviate(queryInfo.getQuery(), 30);
+ line = String.format(fmt, queryId, state, startTime, sql);
+ writer.write(line);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/285c4a48/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
index 05a5eff..1e7d169 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
@@ -215,7 +215,7 @@ public class TajoClient {
return new QueryStatus(res);
}
- private static boolean isQueryRunnning(QueryState state) {
+ public static boolean isQueryRunnning(QueryState state) {
return state == QueryState.QUERY_NEW ||
state == QueryState.QUERY_RUNNING ||
state == QueryState.QUERY_MASTER_LAUNCHED ||
@@ -380,6 +380,32 @@ public class TajoClient {
}
+ public List<BriefQueryInfo> getQueryList() throws ServiceException {
+ return new ServerCallable<List<BriefQueryInfo>>(conf, tajoMasterAddr,
+ TajoMasterClientProtocol.class, false, true) {
+ public List<BriefQueryInfo> call(NettyClientBase client) throws ServiceException {
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+
+ GetQueryListRequest.Builder builder = GetQueryListRequest.newBuilder();
+ GetQueryListResponse res = tajoMasterService.getQueryList(null, builder.build());
+ return res.getQueryListList();
+ }
+ }.withRetries();
+ }
+
+ public List<WorkerResourceInfo> getClusterInfo() throws ServiceException {
+ return new ServerCallable<List<WorkerResourceInfo>>(conf, tajoMasterAddr,
+ TajoMasterClientProtocol.class, false, true) {
+ public List<WorkerResourceInfo> call(NettyClientBase client) throws ServiceException {
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+
+ GetClusterInfoRequest.Builder builder = GetClusterInfoRequest.newBuilder();
+ GetClusterInfoResponse res = tajoMasterService.getClusterInfo(null, builder.build());
+ return res.getWorkerListList();
+ }
+ }.withRetries();
+ }
+
/**
* Get a list of table names. All table and column names are
* represented as lower-case letters.
@@ -459,4 +485,4 @@ public class TajoClient {
client.wait();
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/285c4a48/tajo-client/src/main/proto/ClientProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/proto/ClientProtos.proto b/tajo-client/src/main/proto/ClientProtos.proto
index 8c4b880..59ef85b 100644
--- a/tajo-client/src/main/proto/ClientProtos.proto
+++ b/tajo-client/src/main/proto/ClientProtos.proto
@@ -69,14 +69,19 @@ message GetQueryListRequest {
optional SessionIdProto sessionId = 1;
}
-message BriefQueryStatus {
+message BriefQueryInfo {
required QueryIdProto queryId = 1;
required QueryState state = 2;
- required int32 executionTime = 3;
+ required int64 startTime = 3;
+ required int64 finishTime = 4;
+ required string query = 5;
+ required string queryMasterHost = 6;
+ required int32 queryMasterPort = 7;
+ required float progress = 8;
}
message GetQueryListResponse {
- repeated BriefQueryStatus queryList = 1;
+ repeated BriefQueryInfo queryList = 1;
}
message GetQueryStatusRequest {
@@ -101,8 +106,32 @@ message GetClusterInfoRequest {
optional SessionIdProto sessionId = 1;
}
+message WorkerResourceInfo {
+ required string allocatedHost = 1;
+ required int32 peerRpcPort = 2;
+ required int32 queryMasterPort = 3;
+ required int32 clientPort = 4;
+ required int32 pullServerPort = 5;
+ required int32 httpPort = 6;
+ required float diskSlots = 7;
+ required int32 cpuCoreSlots = 8;
+ required int32 memoryMB = 9;
+ required float usedDiskSlots = 10;
+ required int32 usedMemoryMB = 11;
+ required int32 usedCpuCoreSlots = 12;
+ required int64 maxHeap = 13;
+ required int64 freeHeap = 14;
+ required int64 totalHeap = 15;
+ required int32 numRunningTasks = 16;
+ required string workerStatus = 17;
+ required int64 lastHeartbeat = 18;
+ required bool queryMasterMode = 19;
+ required bool taskRunnerMode = 20;
+ required int32 numQueryMasterTasks = 21;
+}
+
message GetClusterInfoResponse {
- repeated string serverName = 1;
+ repeated WorkerResourceInfo workerList = 1;
}
message GetTableListRequest {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/285c4a48/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 3e9720f..e4aae9d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -46,6 +46,7 @@ import org.apache.tajo.master.TajoMaster.MasterContext;
import org.apache.tajo.master.querymaster.QueryInProgress;
import org.apache.tajo.master.querymaster.QueryInfo;
import org.apache.tajo.master.querymaster.QueryJobManager;
+import org.apache.tajo.master.rm.WorkerResource;
import org.apache.tajo.rpc.BlockingRpcServer;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
@@ -53,7 +54,7 @@ import org.apache.tajo.util.NetUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.Collection;
+import java.util.*;
public class TajoMasterClientService extends AbstractService {
private final static Log LOG = LogFactory.getLog(TajoMasterClientService.class);
@@ -199,7 +200,33 @@ public class TajoMasterClientService extends AbstractService {
public GetQueryListResponse getQueryList(RpcController controller,
GetQueryListRequest request)
throws ServiceException {
- return null;
+ GetQueryListResponse.Builder builder
+ = GetQueryListResponse.newBuilder();
+
+ Collection<QueryInProgress> queries
+ = context.getQueryJobManager().getRunningQueries();
+
+ BriefQueryInfo.Builder infoBuilder = BriefQueryInfo.newBuilder();
+
+ for (QueryInProgress queryInProgress : queries) {
+ QueryInfo queryInfo = queryInProgress.getQueryInfo();
+
+ infoBuilder.setQueryId(queryInfo.getQueryId().getProto());
+ infoBuilder.setState(queryInfo.getQueryState());
+ infoBuilder.setQuery(queryInfo.getSql());
+ infoBuilder.setStartTime(queryInfo.getStartTime());
+ long endTime = (queryInfo.getFinishTime() == 0) ?
+ System.currentTimeMillis() : queryInfo.getFinishTime();
+ infoBuilder.setFinishTime(endTime);
+ infoBuilder.setProgress(queryInfo.getProgress());
+ infoBuilder.setQueryMasterPort(queryInfo.getQueryMasterPort());
+ infoBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost());
+
+ builder.addQueryList(infoBuilder.build());
+ }
+
+ GetQueryListResponse result = builder.build();
+ return result;
}
@Override
@@ -259,7 +286,47 @@ public class TajoMasterClientService extends AbstractService {
public GetClusterInfoResponse getClusterInfo(RpcController controller,
GetClusterInfoRequest request)
throws ServiceException {
- return null;
+ GetClusterInfoResponse.Builder builder
+ = GetClusterInfoResponse.newBuilder();
+
+ Map<String, WorkerResource> workers
+ = context.getResourceManager().getWorkers();
+
+ List<String> wokerKeys = new ArrayList<String>(workers.keySet());
+ Collections.sort(wokerKeys);
+
+ int runningQueryMasterTasks = 0;
+
+ WorkerResourceInfo.Builder workerBuilder
+ = WorkerResourceInfo.newBuilder();
+
+ for(WorkerResource eachWorker: workers.values()) {
+ workerBuilder.setAllocatedHost(eachWorker.getAllocatedHost());
+ workerBuilder.setDiskSlots(eachWorker.getDiskSlots());
+ workerBuilder.setCpuCoreSlots(eachWorker.getCpuCoreSlots());
+ workerBuilder.setMemoryMB(eachWorker.getMemoryMB());
+ workerBuilder.setLastHeartbeat(eachWorker.getLastHeartbeat());
+ workerBuilder.setUsedMemoryMB(eachWorker.getUsedMemoryMB());
+ workerBuilder.setUsedCpuCoreSlots(eachWorker.getUsedCpuCoreSlots());
+ workerBuilder.setUsedDiskSlots(eachWorker.getUsedDiskSlots());
+ workerBuilder.setWorkerStatus(eachWorker.getWorkerStatus().toString());
+ workerBuilder.setQueryMasterMode(eachWorker.isQueryMasterMode());
+ workerBuilder.setTaskRunnerMode(eachWorker.isTaskRunnerMode());
+ workerBuilder.setPeerRpcPort(eachWorker.getPeerRpcPort());
+ workerBuilder.setQueryMasterPort(eachWorker.getQueryMasterPort());
+ workerBuilder.setClientPort(eachWorker.getClientPort());
+ workerBuilder.setPullServerPort(eachWorker.getPullServerPort());
+ workerBuilder.setHttpPort(eachWorker.getHttpPort());
+ workerBuilder.setMaxHeap(eachWorker.getMaxHeap());
+ workerBuilder.setFreeHeap(eachWorker.getFreeHeap());
+ workerBuilder.setTotalHeap(eachWorker.getTotalHeap());
+ workerBuilder.setNumRunningTasks(eachWorker.getNumRunningTasks());
+ workerBuilder.setNumQueryMasterTasks(eachWorker.getNumQueryMasterTasks());
+
+ builder.addWorkerList(workerBuilder.build());
+ }
+
+ return builder.build();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/285c4a48/tajo-dist/src/main/bin/tajo
----------------------------------------------------------------------
diff --git a/tajo-dist/src/main/bin/tajo b/tajo-dist/src/main/bin/tajo
index 2d976de..9de6220 100755
--- a/tajo-dist/src/main/bin/tajo
+++ b/tajo-dist/src/main/bin/tajo
@@ -68,6 +68,7 @@ if [ $# = 0 ]; then
echo " catalog run the Catalog server"
echo " catutil catalog utility"
echo " cli run the tajo cli"
+ echo " admin run the tajo admin util"
echo " jar <jar> run a jar file"
echo " benchmark run the benchmark driver"
echo " or"
@@ -362,6 +363,9 @@ elif [ "$COMMAND" = "catalog" ] ; then
elif [ "$COMMAND" = "cli" ] ; then
CLASS='org.apache.tajo.cli.TajoCli'
TAJO_OPTS="$TAJO_OPTS $TAJO_CLI_OPTS"
+elif [ "$COMMAND" = "admin" ] ; then
+ CLASS='org.apache.tajo.client.TajoAdmin'
+ TAJO_OPTS="$TAJO_OPTS $TAJO_CLI_OPTS"
elif [ "$COMMAND" = "dump" ] ; then
CLASS='org.apache.tajo.client.TajoDump'
TAJO_OPTS="$TAJO_OPTS $TAJO_DUMP_OPTS"