You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/03/06 08:33:36 UTC
tajo git commit: TAJO-1135: Implement queryable virtual table for
cluster information
Repository: tajo
Updated Branches:
refs/heads/master 1617fa9b4 -> 6e519bcf3
TAJO-1135: Implement queryable virtual table for cluster information
Closes #366
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/6e519bcf
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/6e519bcf
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/6e519bcf
Branch: refs/heads/master
Commit: 6e519bcf39a526b64e62d9957404bfd8a3888486
Parents: 1617fa9
Author: Jihun Kang <ji...@apache.org>
Authored: Fri Mar 6 16:32:37 2015 +0900
Committer: Jihun Kang <ji...@apache.org>
Committed: Fri Mar 6 16:32:37 2015 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../dictionary/ClusterTableDescriptor.java | 56 ++++++++
.../InfoSchemaMetadataDictionary.java | 5 +-
.../NonForwardQueryResultSystemScanner.java | 142 +++++++++++++++++++
.../TestNonForwardQueryResultSystemScanner.java | 18 +++
5 files changed, 222 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/6e519bcf/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 1570e42..974e4d7 100644
--- a/CHANGES
+++ b/CHANGES
@@ -4,6 +4,8 @@ Release 0.11.0 - unreleased
NEW FEATURES
+ TAJO-1135: Implement queryable virtual table for cluster information.
+ (jihun)
IMPROVEMENT
http://git-wip-us.apache.org/repos/asf/tajo/blob/6e519bcf/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/ClusterTableDescriptor.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/ClusterTableDescriptor.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/ClusterTableDescriptor.java
new file mode 100644
index 0000000..e3c830f
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/ClusterTableDescriptor.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.catalog.dictionary;
+
+import org.apache.tajo.common.TajoDataTypes.Type;
+
+/**
+ * Table descriptor for the virtual {@code cluster} table.
+ *
+ * <p>Supplies the table name and the column layout (host, port, type, status, CPU, memory,
+ * heap, disk-slot, running-task and last-heartbeat columns) to
+ * {@link AbstractTableDescriptor}.
+ */
+class ClusterTableDescriptor extends AbstractTableDescriptor {
+
+  /** Simple name under which this table is exposed. */
+  private static final String CLUSTER_TABLE_NAME = "cluster";
+
+  // Column name and data type of each field, in declaration order. The third constructor
+  // argument is 0 for every column; NOTE(review): its meaning is defined by ColumnDescriptor,
+  // which is not visible here - confirm before changing it.
+  private final ColumnDescriptor[] clusterColumns = new ColumnDescriptor[] {
+      new ColumnDescriptor("host", Type.TEXT, 0),
+      new ColumnDescriptor("port", Type.INT4, 0),
+      new ColumnDescriptor("type", Type.TEXT, 0),
+      new ColumnDescriptor("status", Type.TEXT, 0),
+      new ColumnDescriptor("total_cpu", Type.INT4, 0),
+      new ColumnDescriptor("used_mem", Type.INT8, 0),
+      new ColumnDescriptor("total_mem", Type.INT8, 0),
+      new ColumnDescriptor("free_heap", Type.INT8, 0),
+      new ColumnDescriptor("max_heap", Type.INT8, 0),
+      new ColumnDescriptor("used_diskslots", Type.FLOAT4, 0),
+      new ColumnDescriptor("total_diskslots", Type.FLOAT4, 0),
+      new ColumnDescriptor("running_tasks", Type.INT4, 0),
+      new ColumnDescriptor("last_heartbeat_ts", Type.TIMESTAMP, 0)
+  };
+
+  /**
+   * @param metadataDictionary dictionary handed on to the superclass constructor
+   */
+  public ClusterTableDescriptor(InfoSchemaMetadataDictionary metadataDictionary) {
+    super(metadataDictionary);
+  }
+
+  /** @return the table name, {@code "cluster"} */
+  @Override
+  public String getTableNameString() {
+    return CLUSTER_TABLE_NAME;
+  }
+
+  /** @return the column descriptors of the cluster table, in declaration order */
+  @Override
+  protected ColumnDescriptor[] getColumnDescriptors() {
+    return clusterColumns;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6e519bcf/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java
index de79caa..0ac0a54 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.tajo.catalog.exception.NoSuchTableException;
import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.util.TUtil;
public class InfoSchemaMetadataDictionary {
@@ -39,6 +40,7 @@ public class InfoSchemaMetadataDictionary {
TABLEOPTIONS,
TABLESTATS,
PARTITIONS,
+ CLUSTER,
MAX_TABLE;
}
@@ -58,6 +60,7 @@ public class InfoSchemaMetadataDictionary {
schemaInfoTableDescriptors.set(DEFINED_TABLES.TABLEOPTIONS.ordinal(), new TableOptionsTableDescriptor(this));
schemaInfoTableDescriptors.set(DEFINED_TABLES.TABLESTATS.ordinal(), new TableStatsTableDescriptor(this));
schemaInfoTableDescriptors.set(DEFINED_TABLES.PARTITIONS.ordinal(), new PartitionsTableDescriptor(this));
+ schemaInfoTableDescriptors.set(DEFINED_TABLES.CLUSTER.ordinal(), new ClusterTableDescriptor(this));
}
public boolean isSystemDatabase(String databaseName) {
@@ -119,6 +122,6 @@ public class InfoSchemaMetadataDictionary {
}
protected String getTablePath() {
- return "SYSTEM";
+ return StoreType.SYSTEM.name().toUpperCase();
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6e519bcf/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
index 93909d1..e44d8be 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
@@ -20,7 +20,10 @@ package org.apache.tajo.master.exec;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.Stack;
import org.apache.commons.logging.Log;
@@ -56,6 +59,8 @@ import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.planner.physical.PhysicalExec;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.master.TajoMaster.MasterContext;
+import org.apache.tajo.master.rm.Worker;
+import org.apache.tajo.master.rm.WorkerResource;
import org.apache.tajo.plan.InvalidQueryException;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.PlanningException;
@@ -431,6 +436,141 @@ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResult
return tuples;
}
+  /**
+   * Builds one cluster-table row that describes {@code aWorker} in its QueryMaster role.
+   *
+   * <p>The host, port (the QueryMaster port), type and status columns are filled in whenever
+   * the data is available. The running_tasks, free_heap, max_heap and last_heartbeat_ts
+   * columns are filled in only while the worker state reads "RUNNING". Every other requested
+   * column receives a null datum, so no field of the returned tuple is left unassigned.
+   *
+   * @param outSchema schema whose columns decide which fields are produced and in what order
+   * @param aWorker worker to describe
+   * @return a tuple holding one datum per column of {@code outSchema}
+   */
+  private Tuple getQueryMasterTuple(Schema outSchema, Worker aWorker) {
+    List<Column> columns = outSchema.getColumns();
+    Tuple aTuple = new VTuple(outSchema.size());
+    WorkerResource aResource = aWorker.getResource();
+    // Read the state once so the status column and the RUNNING-only columns of a row agree.
+    String workerState = aWorker.getState().toString();
+    boolean running = "RUNNING".equalsIgnoreCase(workerState);
+
+    for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
+      String columnName = columns.get(fieldId).getSimpleName();
+
+      if ("host".equalsIgnoreCase(columnName)) {
+        if (aWorker.getConnectionInfo() != null && aWorker.getConnectionInfo().getHost() != null) {
+          aTuple.put(fieldId, DatumFactory.createText(aWorker.getConnectionInfo().getHost()));
+        } else {
+          aTuple.put(fieldId, DatumFactory.createNullDatum());
+        }
+      } else if ("port".equalsIgnoreCase(columnName)) {
+        if (aWorker.getConnectionInfo() != null) {
+          aTuple.put(fieldId, DatumFactory.createInt4(aWorker.getConnectionInfo().getQueryMasterPort()));
+        } else {
+          aTuple.put(fieldId, DatumFactory.createNullDatum());
+        }
+      } else if ("type".equalsIgnoreCase(columnName)) {
+        aTuple.put(fieldId, DatumFactory.createText("QueryMaster"));
+      } else if ("status".equalsIgnoreCase(columnName)) {
+        aTuple.put(fieldId, DatumFactory.createText(workerState));
+      } else if (running) {
+        if ("running_tasks".equalsIgnoreCase(columnName)) {
+          aTuple.put(fieldId, DatumFactory.createInt4(aResource.getNumQueryMasterTasks()));
+        } else if ("free_heap".equalsIgnoreCase(columnName)) {
+          aTuple.put(fieldId, DatumFactory.createInt8(aResource.getFreeHeap()));
+        } else if ("max_heap".equalsIgnoreCase(columnName)) {
+          aTuple.put(fieldId, DatumFactory.createInt8(aResource.getMaxHeap()));
+        } else if ("last_heartbeat_ts".equalsIgnoreCase(columnName)
+            && aWorker.getLastHeartbeatTime() > 0) {
+          aTuple.put(fieldId, DatumFactory.createTimestmpDatumWithJavaMillis(aWorker.getLastHeartbeatTime()));
+        } else {
+          // Columns that do not apply to a QueryMaster row (total_cpu, used_mem, total_mem,
+          // disk slots), and a heartbeat that was never recorded, must still get an explicit
+          // null datum; previously these fields were left unassigned for RUNNING workers.
+          aTuple.put(fieldId, DatumFactory.createNullDatum());
+        }
+      } else {
+        aTuple.put(fieldId, DatumFactory.createNullDatum());
+      }
+    }
+
+    return aTuple;
+  }
+
+ private Tuple getWorkerTuple(Schema outSchema, Worker aWorker) {
+ List<Column> columns = outSchema.getColumns();
+ Tuple aTuple = new VTuple(outSchema.size());
+ WorkerResource aResource = aWorker.getResource();
+
+ for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
+ Column column = columns.get(fieldId);
+
+ if ("host".equalsIgnoreCase(column.getSimpleName())) {
+ if (aWorker.getConnectionInfo() != null && aWorker.getConnectionInfo().getHost() != null) {
+ aTuple.put(fieldId, DatumFactory.createText(aWorker.getConnectionInfo().getHost()));
+ } else {
+ aTuple.put(fieldId, DatumFactory.createNullDatum());
+ }
+ } else if ("port".equalsIgnoreCase(column.getSimpleName())) {
+ if (aWorker.getConnectionInfo() != null) {
+ aTuple.put(fieldId, DatumFactory.createInt4(aWorker.getConnectionInfo().getPeerRpcPort()));
+ } else {
+ aTuple.put(fieldId, DatumFactory.createNullDatum());
+ }
+ } else if ("type".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createText("Worker"));
+ } else if ("status".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createText(aWorker.getState().toString()));
+ } else if ("RUNNING".equalsIgnoreCase(aWorker.getState().toString())) {
+ if ("total_cpu".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createInt4(aResource.getCpuCoreSlots()));
+ } else if ("used_mem".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createInt8(aResource.getUsedMemoryMB()*1048576l));
+ } else if ("total_mem".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createInt8(aResource.getMemoryMB()*1048576l));
+ } else if ("free_heap".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createInt8(aResource.getFreeHeap()));
+ } else if ("max_heap".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createInt8(aResource.getMaxHeap()));
+ } else if ("used_diskslots".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createFloat4(aResource.getUsedDiskSlots()));
+ } else if ("total_diskslots".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createFloat4(aResource.getDiskSlots()));
+ } else if ("running_tasks".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createInt4(aResource.getNumRunningTasks()));
+ } else if ("last_heartbeat_ts".equalsIgnoreCase(column.getSimpleName())) {
+ if (aWorker.getLastHeartbeatTime() > 0) {
+ aTuple.put(fieldId, DatumFactory.createTimestmpDatumWithJavaMillis(aWorker.getLastHeartbeatTime()));
+ } else {
+ aTuple.put(fieldId, DatumFactory.createNullDatum());
+ }
+ }
+ }
+ else {
+ aTuple.put(fieldId, DatumFactory.createNullDatum());
+ }
+ }
+
+ return aTuple;
+ }
+
+  /**
+   * Produces the rows of the cluster table from the workers known to the resource manager.
+   *
+   * <p>A worker whose resource reports query-master mode contributes a QueryMaster row, and
+   * one that reports task-runner mode contributes a Worker row; a worker reporting both
+   * modes contributes both rows. All QueryMaster rows come before all Worker rows.
+   *
+   * @param outSchema schema passed on to the row builders to select the produced columns
+   * @return the cluster rows; empty when no worker is registered
+   */
+  private List<Tuple> getClusterInfo(Schema outSchema) {
+    Map<Integer, Worker> workerMap = masterContext.getResourceManager().getWorkers();
+    List<Worker> queryMasterList = new ArrayList<Worker>();
+    List<Worker> workerList = new ArrayList<Worker>();
+
+    // Iterate the values directly: this avoids a second lookup per key, and a null worker
+    // if an entry is removed between reading the key set and calling get().
+    for (Worker aWorker : workerMap.values()) {
+      WorkerResource aResource = aWorker.getResource();
+
+      if (aResource.isQueryMasterMode()) {
+        queryMasterList.add(aWorker);
+      }
+
+      if (aResource.isTaskRunnerMode()) {
+        workerList.add(aWorker);
+      }
+    }
+
+    List<Tuple> tuples = new ArrayList<Tuple>(queryMasterList.size() + workerList.size());
+    for (Worker queryMaster : queryMasterList) {
+      tuples.add(getQueryMasterTuple(outSchema, queryMaster));
+    }
+
+    for (Worker worker : workerList) {
+      tuples.add(getWorkerTuple(outSchema, worker));
+    }
+
+    return tuples;
+  }
+
private List<Tuple> fetchSystemTable(TableDesc tableDesc, Schema inSchema) {
List<Tuple> tuples = null;
String tableName = CatalogUtil.extractSimpleName(tableDesc.getName());
@@ -451,6 +591,8 @@ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResult
tuples = getAllTableStats(inSchema);
} else if ("partitions".equalsIgnoreCase(tableName)) {
tuples = getAllPartitions(inSchema);
+ } else if ("cluster".equalsIgnoreCase(tableName)) {
+ tuples = getClusterInfo(inSchema);
}
return tuples;
http://git-wip-us.apache.org/repos/asf/tajo/blob/6e519bcf/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java
index fa7fdf0..01d4ec4 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java
@@ -295,4 +295,22 @@ public class TestNonForwardQueryResultSystemScanner {
assertThat(tuples.size(), is(9));
assertThat(tuples, hasItem(getTupleMatcher(0, is("lineitem"))));
}
+
+  /**
+   * Selects the TYPE column of INFORMATION_SCHEMA.CLUSTER and checks that two rows come back,
+   * at least one of which is a "QueryMaster" row.
+   */
+  @Test
+  public void testGetClusterDetails() throws Exception {
+    NonForwardQueryResultScanner scanner = getScanner("SELECT TYPE FROM INFORMATION_SCHEMA.CLUSTER");
+    scanner.init();
+
+    // NOTE(review): two rows presumably means the test cluster runs a single worker that
+    // serves as both QueryMaster and task runner - confirm against the test cluster setup.
+    List<ByteString> encodedRows = scanner.getNextRows(100);
+    assertThat(encodedRows.size(), is(2));
+
+    // Decode the rows with the scanner's own logical schema before inspecting the values.
+    RowStoreDecoder rowDecoder = RowStoreUtil.createDecoder(scanner.getLogicalSchema());
+    List<Tuple> decodedTuples = getTupleList(rowDecoder, encodedRows);
+
+    assertThat(decodedTuples.size(), is(2));
+    assertThat(decodedTuples, hasItem(getTupleMatcher(0, is("QueryMaster"))));
+  }
}