You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/03/06 08:33:36 UTC

tajo git commit: TAJO-1135: Implement queryable virtual table for cluster information

Repository: tajo
Updated Branches:
  refs/heads/master 1617fa9b4 -> 6e519bcf3


TAJO-1135: Implement queryable virtual table for cluster information

Closes #366


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/6e519bcf
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/6e519bcf
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/6e519bcf

Branch: refs/heads/master
Commit: 6e519bcf39a526b64e62d9957404bfd8a3888486
Parents: 1617fa9
Author: Jihun Kang <ji...@apache.org>
Authored: Fri Mar 6 16:32:37 2015 +0900
Committer: Jihun Kang <ji...@apache.org>
Committed: Fri Mar 6 16:32:37 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../dictionary/ClusterTableDescriptor.java      |  56 ++++++++
 .../InfoSchemaMetadataDictionary.java           |   5 +-
 .../NonForwardQueryResultSystemScanner.java     | 142 +++++++++++++++++++
 .../TestNonForwardQueryResultSystemScanner.java |  18 +++
 5 files changed, 222 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/6e519bcf/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 1570e42..974e4d7 100644
--- a/CHANGES
+++ b/CHANGES
@@ -4,6 +4,8 @@ Release 0.11.0 - unreleased
 
   NEW FEATURES
 
+    TAJO-1135: Implement queryable virtual table for cluster information.
+    (jihun)
 
   IMPROVEMENT
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/6e519bcf/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/ClusterTableDescriptor.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/ClusterTableDescriptor.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/ClusterTableDescriptor.java
new file mode 100644
index 0000000..e3c830f
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/ClusterTableDescriptor.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.catalog.dictionary;
+
+import org.apache.tajo.common.TajoDataTypes.Type;
+
+/** Table descriptor for the virtual table named "cluster". */
+class ClusterTableDescriptor extends AbstractTableDescriptor {
+  private static final String CLUSTER_TABLE_NAME = "cluster";
+  // Column descriptors handed out by getColumnDescriptors().
+  private final ColumnDescriptor[] columnDescriptors = {
+      new ColumnDescriptor("host", Type.TEXT, 0),
+      new ColumnDescriptor("port", Type.INT4, 0),
+      new ColumnDescriptor("type", Type.TEXT, 0),
+      new ColumnDescriptor("status", Type.TEXT, 0),
+      new ColumnDescriptor("total_cpu", Type.INT4, 0),
+      new ColumnDescriptor("used_mem", Type.INT8, 0),
+      new ColumnDescriptor("total_mem", Type.INT8, 0),
+      new ColumnDescriptor("free_heap", Type.INT8, 0),
+      new ColumnDescriptor("max_heap", Type.INT8, 0),
+      new ColumnDescriptor("used_diskslots", Type.FLOAT4, 0),
+      new ColumnDescriptor("total_diskslots", Type.FLOAT4, 0),
+      new ColumnDescriptor("running_tasks", Type.INT4, 0),
+      new ColumnDescriptor("last_heartbeat_ts", Type.TIMESTAMP, 0)
+  };
+
+  public ClusterTableDescriptor(InfoSchemaMetadataDictionary metadataDictionary) {
+    super(metadataDictionary);
+  }
+
+  @Override
+  public String getTableNameString() {
+    return CLUSTER_TABLE_NAME;
+  }
+
+  @Override
+  protected ColumnDescriptor[] getColumnDescriptors() {
+    return columnDescriptors;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6e519bcf/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java
index de79caa..0ac0a54 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java
@@ -24,6 +24,7 @@ import java.util.List;
 
 import org.apache.tajo.catalog.exception.NoSuchTableException;
 import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.util.TUtil;
 
 public class InfoSchemaMetadataDictionary {
@@ -39,6 +40,7 @@ public class InfoSchemaMetadataDictionary {
     TABLEOPTIONS,
     TABLESTATS,
     PARTITIONS,
+    CLUSTER,
     MAX_TABLE;
   }
   
@@ -58,6 +60,7 @@ public class InfoSchemaMetadataDictionary {
     schemaInfoTableDescriptors.set(DEFINED_TABLES.TABLEOPTIONS.ordinal(), new TableOptionsTableDescriptor(this));
     schemaInfoTableDescriptors.set(DEFINED_TABLES.TABLESTATS.ordinal(), new TableStatsTableDescriptor(this));
     schemaInfoTableDescriptors.set(DEFINED_TABLES.PARTITIONS.ordinal(), new PartitionsTableDescriptor(this));
+    schemaInfoTableDescriptors.set(DEFINED_TABLES.CLUSTER.ordinal(), new ClusterTableDescriptor(this));
   }
 
   public boolean isSystemDatabase(String databaseName) {
@@ -119,6 +122,6 @@ public class InfoSchemaMetadataDictionary {
   }
   
   protected String getTablePath() {
-    return "SYSTEM";
+    return StoreType.SYSTEM.name().toUpperCase();
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/6e519bcf/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
index 93909d1..e44d8be 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
@@ -20,7 +20,10 @@ package org.apache.tajo.master.exec;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.Stack;
 
 import org.apache.commons.logging.Log;
@@ -56,6 +59,8 @@ import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.planner.physical.PhysicalExec;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.master.TajoMaster.MasterContext;
+import org.apache.tajo.master.rm.Worker;
+import org.apache.tajo.master.rm.WorkerResource;
 import org.apache.tajo.plan.InvalidQueryException;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.PlanningException;
@@ -431,6 +436,141 @@ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResult
     return tuples;
   }
   
+  /**
+   * Builds the "QueryMaster" row of the cluster table for one worker. Resource
+   * columns are filled only while the worker state is RUNNING; every column
+   * without a value gets a null datum, so no slot of the tuple is left unset.
+   */
+  private Tuple getQueryMasterTuple(Schema outSchema, Worker aWorker) {
+    List<Column> columns = outSchema.getColumns();
+    Tuple aTuple = new VTuple(outSchema.size());
+    WorkerResource aResource = aWorker.getResource();
+    boolean running = "RUNNING".equalsIgnoreCase(aWorker.getState().toString());
+
+    for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
+      String name = columns.get(fieldId).getSimpleName();
+      if ("host".equalsIgnoreCase(name)) {
+        if (aWorker.getConnectionInfo() != null && aWorker.getConnectionInfo().getHost() != null) {
+          aTuple.put(fieldId, DatumFactory.createText(aWorker.getConnectionInfo().getHost()));
+        } else {
+          aTuple.put(fieldId, DatumFactory.createNullDatum());
+        }
+      } else if ("port".equalsIgnoreCase(name)) {
+        if (aWorker.getConnectionInfo() != null) {
+          aTuple.put(fieldId, DatumFactory.createInt4(aWorker.getConnectionInfo().getQueryMasterPort()));
+        } else {
+          aTuple.put(fieldId, DatumFactory.createNullDatum());
+        }
+      } else if ("type".equalsIgnoreCase(name)) {
+        aTuple.put(fieldId, DatumFactory.createText("QueryMaster"));
+      } else if ("status".equalsIgnoreCase(name)) {
+        aTuple.put(fieldId, DatumFactory.createText(aWorker.getState().toString()));
+      } else if (running && "running_tasks".equalsIgnoreCase(name)) {
+        aTuple.put(fieldId, DatumFactory.createInt4(aResource.getNumQueryMasterTasks()));
+      } else if (running && "free_heap".equalsIgnoreCase(name)) {
+        aTuple.put(fieldId, DatumFactory.createInt8(aResource.getFreeHeap()));
+      } else if (running && "max_heap".equalsIgnoreCase(name)) {
+        aTuple.put(fieldId, DatumFactory.createInt8(aResource.getMaxHeap()));
+      } else if (running && "last_heartbeat_ts".equalsIgnoreCase(name) && aWorker.getLastHeartbeatTime() > 0) {
+        aTuple.put(fieldId, DatumFactory.createTimestmpDatumWithJavaMillis(aWorker.getLastHeartbeatTime()));
+      } else {
+        // Not RUNNING, non-positive heartbeat time, or a column this row type does not report.
+        aTuple.put(fieldId, DatumFactory.createNullDatum());
+      }
+    }
+
+    return aTuple;
+  }
+  
+  /**
+   * Builds the "Worker" row of the cluster table for one worker. Resource
+   * columns are filled only while the worker state is RUNNING; every column
+   * without a value gets a null datum, so no slot of the tuple is left unset.
+   */
+  private Tuple getWorkerTuple(Schema outSchema, Worker aWorker) {
+    List<Column> columns = outSchema.getColumns();
+    Tuple aTuple = new VTuple(outSchema.size());
+    WorkerResource aResource = aWorker.getResource();
+    boolean running = "RUNNING".equalsIgnoreCase(aWorker.getState().toString());
+    long bytesPerMb = 1048576L; // 2^20; scales the values of the *MB getters below
+
+    for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
+      String name = columns.get(fieldId).getSimpleName();
+      if ("host".equalsIgnoreCase(name)) {
+        if (aWorker.getConnectionInfo() != null && aWorker.getConnectionInfo().getHost() != null) {
+          aTuple.put(fieldId, DatumFactory.createText(aWorker.getConnectionInfo().getHost()));
+        } else {
+          aTuple.put(fieldId, DatumFactory.createNullDatum());
+        }
+      } else if ("port".equalsIgnoreCase(name)) {
+        if (aWorker.getConnectionInfo() != null) {
+          aTuple.put(fieldId, DatumFactory.createInt4(aWorker.getConnectionInfo().getPeerRpcPort()));
+        } else {
+          aTuple.put(fieldId, DatumFactory.createNullDatum());
+        }
+      } else if ("type".equalsIgnoreCase(name)) {
+        aTuple.put(fieldId, DatumFactory.createText("Worker"));
+      } else if ("status".equalsIgnoreCase(name)) {
+        aTuple.put(fieldId, DatumFactory.createText(aWorker.getState().toString()));
+      } else if (running && "total_cpu".equalsIgnoreCase(name)) {
+        aTuple.put(fieldId, DatumFactory.createInt4(aResource.getCpuCoreSlots()));
+      } else if (running && "used_mem".equalsIgnoreCase(name)) {
+        aTuple.put(fieldId, DatumFactory.createInt8(aResource.getUsedMemoryMB() * bytesPerMb));
+      } else if (running && "total_mem".equalsIgnoreCase(name)) {
+        aTuple.put(fieldId, DatumFactory.createInt8(aResource.getMemoryMB() * bytesPerMb));
+      } else if (running && "free_heap".equalsIgnoreCase(name)) {
+        aTuple.put(fieldId, DatumFactory.createInt8(aResource.getFreeHeap()));
+      } else if (running && "max_heap".equalsIgnoreCase(name)) {
+        aTuple.put(fieldId, DatumFactory.createInt8(aResource.getMaxHeap()));
+      } else if (running && "used_diskslots".equalsIgnoreCase(name)) {
+        aTuple.put(fieldId, DatumFactory.createFloat4(aResource.getUsedDiskSlots()));
+      } else if (running && "total_diskslots".equalsIgnoreCase(name)) {
+        aTuple.put(fieldId, DatumFactory.createFloat4(aResource.getDiskSlots()));
+      } else if (running && "running_tasks".equalsIgnoreCase(name)) {
+        aTuple.put(fieldId, DatumFactory.createInt4(aResource.getNumRunningTasks()));
+      } else if (running && "last_heartbeat_ts".equalsIgnoreCase(name) && aWorker.getLastHeartbeatTime() > 0) {
+        aTuple.put(fieldId, DatumFactory.createTimestmpDatumWithJavaMillis(aWorker.getLastHeartbeatTime()));
+      } else {
+        // Not RUNNING, non-positive heartbeat time, or a column not handled above.
+        aTuple.put(fieldId, DatumFactory.createNullDatum());
+      }
+    }
+
+    return aTuple;
+  }
+  
+  /**
+   * Collects the rows of the cluster table: one "QueryMaster" row for every
+   * worker in query-master mode, followed by one "Worker" row for every worker
+   * in task-runner mode. A worker that is in both modes yields two rows.
+   *
+   * @param outSchema schema whose columns determine the fields of each row
+   * @return the rows, query masters first; empty if the worker map is empty
+   */
+  private List<Tuple> getClusterInfo(Schema outSchema) {
+    List<Tuple> queryMasterTuples = new ArrayList<Tuple>();
+    List<Tuple> workerTuples = new ArrayList<Tuple>();
+
+    // Iterate the values directly: keySet() plus get() costs an extra lookup per
+    // worker and would yield null if an entry were removed in between.
+    for (Worker aWorker : masterContext.getResourceManager().getWorkers().values()) {
+      WorkerResource aResource = aWorker.getResource();
+
+      if (aResource.isQueryMasterMode()) {
+        queryMasterTuples.add(getQueryMasterTuple(outSchema, aWorker));
+      }
+      if (aResource.isTaskRunnerMode()) {
+        workerTuples.add(getWorkerTuple(outSchema, aWorker));
+      }
+    }
+
+    List<Tuple> tuples = new ArrayList<Tuple>(queryMasterTuples.size() + workerTuples.size());
+    tuples.addAll(queryMasterTuples);
+    tuples.addAll(workerTuples);
+    return tuples;
+  }
+  
+  
   private List<Tuple> fetchSystemTable(TableDesc tableDesc, Schema inSchema) {
     List<Tuple> tuples = null;
     String tableName = CatalogUtil.extractSimpleName(tableDesc.getName());
@@ -451,6 +591,8 @@ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResult
       tuples = getAllTableStats(inSchema);
     } else if ("partitions".equalsIgnoreCase(tableName)) {
       tuples = getAllPartitions(inSchema);
+    } else if ("cluster".equalsIgnoreCase(tableName)) {
+      tuples = getClusterInfo(inSchema);
     }
     
     return tuples;    

http://git-wip-us.apache.org/repos/asf/tajo/blob/6e519bcf/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java
index fa7fdf0..01d4ec4 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java
@@ -295,4 +295,22 @@ public class TestNonForwardQueryResultSystemScanner {
     assertThat(tuples.size(), is(9));
     assertThat(tuples, hasItem(getTupleMatcher(0, is("lineitem"))));
   }
+  
+  /** Expects two rows from INFORMATION_SCHEMA.CLUSTER, one of them typed "QueryMaster". */
+  @Test
+  public void testGetClusterDetails() throws Exception {
+    NonForwardQueryResultScanner scanner = getScanner("SELECT TYPE FROM INFORMATION_SCHEMA.CLUSTER");
+    scanner.init();
+
+    // Fetch the encoded rows; exactly two are expected.
+    List<ByteString> encodedRows = scanner.getNextRows(100);
+    assertThat(encodedRows.size(), is(2));
+
+    RowStoreDecoder rowDecoder = RowStoreUtil.createDecoder(scanner.getLogicalSchema());
+    List<Tuple> decodedRows = getTupleList(rowDecoder, encodedRows);
+
+    // Decoding keeps the row count; field 0 is the only selected column (TYPE).
+    assertThat(decodedRows.size(), is(2));
+    assertThat(decodedRows, hasItem(getTupleMatcher(0, is("QueryMaster"))));
+  }
 }