You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/12/31 11:51:26 UTC
[5/8] tajo git commit: TAJO-1176: Implements queryable virtual tables
for catalog information
http://git-wip-us.apache.org/repos/asf/tajo/blob/021a6f0b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
index 9575c13..51f65ee 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
@@ -22,6 +22,7 @@
package org.apache.tajo.catalog.store;
import com.google.common.collect.Maps;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.TajoConstants;
@@ -29,7 +30,17 @@ import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.FunctionDesc;
import org.apache.tajo.catalog.exception.*;
import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto;
import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.IndexProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.TableDescProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.TableDescriptorProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.TableOptionProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.TablePartitionProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueProto;
+import org.apache.tajo.util.TUtil;
import java.io.IOException;
import java.util.*;
@@ -84,6 +95,22 @@ public class MemStore implements CatalogStore {
public Collection<String> getAllTablespaceNames() throws CatalogException {
return tablespaces.keySet();
}
+
+ /**
+  * Lists every registered tablespace as a protobuf message.
+  * The id of each entry is a counter that starts at 0 and follows the iteration
+  * order of the tablespace map's key set.
+  *
+  * @return one {@code TablespaceProto} per registered tablespace
+  */
+ @Override
+ public List<TablespaceProto> getTablespaces() throws CatalogException {
+   List<TablespaceProto> result = TUtil.newList();
+   int nextId = 0;
+
+   for (String name : tablespaces.keySet()) {
+     result.add(TablespaceProto.newBuilder()
+         .setSpaceName(name)
+         .setUri(tablespaces.get(name))
+         .setId(nextId)
+         .build());
+     nextId++;
+   }
+
+   return result;
+ }
@Override
public TablespaceProto getTablespace(String spaceName) throws CatalogException {
@@ -139,6 +166,24 @@ public class MemStore implements CatalogStore {
public Collection<String> getAllDatabaseNames() throws CatalogException {
return databases.keySet();
}
+
+ /**
+  * Lists every database held by this store as a protobuf message.
+  * Database ids are a counter that starts at 0 and follows the iteration order of
+  * the database map's key set; the tablespace id is always reported as 0.
+  *
+  * @return one {@code DatabaseProto} per database
+  */
+ @Override
+ public List<DatabaseProto> getAllDatabases() throws CatalogException {
+   List<DatabaseProto> result = new ArrayList<DatabaseProto>();
+   int nextDbId = 0;
+
+   for (String name : databases.keySet()) {
+     result.add(DatabaseProto.newBuilder()
+         .setId(nextDbId)
+         .setName(name)
+         .setSpaceId(0)
+         .build());
+     nextDbId++;
+   }
+
+   return result;
+ }
/**
* Get a database namespace from a Map instance.
@@ -303,6 +348,118 @@ public class MemStore implements CatalogStore {
Map<String, CatalogProtos.TableDescProto> database = checkAndGetDatabaseNS(databases, databaseName);
return new ArrayList<String>(database.keySet());
}
+
+ /**
+  * Lists a descriptor for every table of every database.
+  * Databases are visited in the iteration order of the database map and numbered
+  * from 0; within a database the tables are visited in sorted name order. Table
+  * ids are a single counter that runs across all databases.
+  *
+  * @return one {@code TableDescriptorProto} per table
+  */
+ @Override
+ public List<TableDescriptorProto> getAllTables() throws CatalogException {
+   List<TableDescriptorProto> result = new ArrayList<TableDescriptorProto>();
+   int currentDbId = 0;
+   int nextTableId = 0;
+
+   for (String dbName : databases.keySet()) {
+     Map<String, TableDescProto> tablesInDb = databases.get(dbName);
+     List<String> sortedNames = TUtil.newList(tablesInDb.keySet());
+     Collections.sort(sortedNames);
+
+     for (String name : sortedNames) {
+       TableDescProto desc = tablesInDb.get(name);
+
+       String tableType;
+       if (desc.getIsExternal()) {
+         tableType = "EXTERNAL";
+       } else {
+         tableType = "BASE";
+       }
+
+       result.add(TableDescriptorProto.newBuilder()
+           .setDbId(currentDbId)
+           .setTid(nextTableId)
+           .setName(name)
+           .setPath(desc.getPath())
+           .setTableType(tableType)
+           .setStoreType(desc.getMeta().getStoreType().toString())
+           .build());
+       nextTableId++;
+     }
+     currentDbId++;
+   }
+
+   return result;
+ }
+
+ /**
+  * Lists every table option (key/value parameter of the table meta) of every table.
+  * Tables are visited in the same order as in {@code getAllTables()}: databases in
+  * map iteration order, tables in sorted name order.
+  *
+  * @return one {@code TableOptionProto} per option, tagged with the id of its table
+  */
+ @Override
+ public List<TableOptionProto> getAllTableOptions() throws CatalogException {
+   List<TableOptionProto> optionList = new ArrayList<TableOptionProto>();
+   int tid = 0;
+
+   for (String databaseName : databases.keySet()) {
+     Map<String, TableDescProto> tables = databases.get(databaseName);
+     List<String> tableNameList = TUtil.newList(tables.keySet());
+     Collections.sort(tableNameList);
+
+     for (String tableName : tableNameList) {
+       TableDescProto table = tables.get(tableName);
+       List<KeyValueProto> keyValueList = table.getMeta().getParams().getKeyvalList();
+
+       for (KeyValueProto keyValue : keyValueList) {
+         TableOptionProto.Builder builder = TableOptionProto.newBuilder();
+
+         builder.setTid(tid);
+         builder.setKeyval(keyValue);
+
+         optionList.add(builder.build());
+       }
+
+       // Advance once per table (not once per database) so the id matches the
+       // table id assigned by getAllTables().
+       tid++;
+     }
+   }
+
+   return optionList;
+ }
+
+ /**
+  * Lists the row and byte counts of every table.
+  * Tables are visited in the same order as in {@code getAllTables()}: databases in
+  * map iteration order, tables in sorted name order.
+  *
+  * @return one {@code TableStatsProto} per table, tagged with the id of its table
+  */
+ @Override
+ public List<TableStatsProto> getAllTableStats() throws CatalogException {
+   List<TableStatsProto> statList = new ArrayList<TableStatsProto>();
+   int tid = 0;
+
+   for (String databaseName : databases.keySet()) {
+     Map<String, TableDescProto> tables = databases.get(databaseName);
+     List<String> tableNameList = TUtil.newList(tables.keySet());
+     Collections.sort(tableNameList);
+
+     for (String tableName : tableNameList) {
+       TableDescProto table = tables.get(tableName);
+       TableStatsProto.Builder builder = TableStatsProto.newBuilder();
+
+       builder.setTid(tid);
+       builder.setNumRows(table.getStats().getNumRows());
+       builder.setNumBytes(table.getStats().getNumBytes());
+
+       statList.add(builder.build());
+
+       // Advance once per table (not once per database) so the id matches the
+       // table id assigned by getAllTables().
+       tid++;
+     }
+   }
+
+   return statList;
+ }
+
+ /**
+  * Lists every column of every table.
+  * Tables are visited in the same order as in {@code getAllTables()}: databases in
+  * map iteration order, tables in sorted name order; the columns of a table keep
+  * their schema order.
+  *
+  * @return one {@code ColumnProto} per column, carrying the table id, the column
+  *         name and the data type
+  */
+ @Override
+ public List<ColumnProto> getAllColumns() throws CatalogException {
+   List<ColumnProto> columnList = new ArrayList<ColumnProto>();
+   int tid = 0;
+
+   for (String databaseName : databases.keySet()) {
+     Map<String, TableDescProto> tables = databases.get(databaseName);
+     List<String> tableNameList = TUtil.newList(tables.keySet());
+     Collections.sort(tableNameList);
+
+     for (String tableName : tableNameList) {
+       TableDescProto tableDesc = tables.get(tableName);
+
+       for (ColumnProto column : tableDesc.getSchema().getFieldsList()) {
+         ColumnProto.Builder builder = ColumnProto.newBuilder();
+         builder.setTid(tid);
+         builder.setName(column.getName());
+         builder.setDataType(column.getDataType());
+         columnList.add(builder.build());
+       }
+
+       // Advance once per table (not once per database) so the id matches the
+       // table id assigned by getAllTables() and changes between tables.
+       tid++;
+     }
+   }
+
+   return columnList;
+ }
@Override
public void addPartitionMethod(CatalogProtos.PartitionMethodProto partitionMethodProto) throws CatalogException {
@@ -370,6 +527,11 @@ public class MemStore implements CatalogStore {
public void dropPartitions(String tableName) throws CatalogException {
throw new RuntimeException("not supported!");
}
+
+ @Override
+ public List<TablePartitionProto> getAllPartitions() throws CatalogException { // Listing partitions is not implemented by this store.
+ throw new UnsupportedOperationException(); // Always thrown; no partition list is ever returned.
+ }
/* (non-Javadoc)
* @see CatalogStore#createIndex(nta.catalog.proto.CatalogProtos.IndexDescProto)
@@ -455,6 +617,33 @@ public class MemStore implements CatalogStore {
return protos.toArray(new IndexDescProto[protos.size()]);
}
+
+ @Override
+ public List<IndexProto> getAllIndexes() throws CatalogException {
+ List<IndexProto> indexList = new ArrayList<CatalogProtos.IndexProto>();
+ Set<String> databases = indexes.keySet();
+
+ for (String databaseName: databases) {
+ Map<String, IndexDescProto> indexMap = indexes.get(databaseName);
+
+ for (String indexName: indexMap.keySet()) {
+ IndexDescProto indexDesc = indexMap.get(indexName);
+ IndexProto.Builder builder = IndexProto.newBuilder();
+
+ builder.setColumnName(indexDesc.getColumn().getName());
+ builder.setDataType(indexDesc.getColumn().getDataType().getType().toString());
+ builder.setIndexName(indexName);
+ builder.setIndexType(indexDesc.getIndexMethod().toString());
+ builder.setIsAscending(indexDesc.hasIsAscending() && indexDesc.getIsAscending());
+ builder.setIsClustered(indexDesc.hasIsClustered() && indexDesc.getIsClustered());
+ builder.setIsUnique(indexDesc.hasIsUnique() && indexDesc.getIsUnique());
+
+ indexList.add(builder.build());
+ }
+ }
+
+ return indexList;
+ }
@Override
public void addFunction(FunctionDesc func) throws CatalogException {
http://git-wip-us.apache.org/repos/asf/tajo/blob/021a6f0b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
index a044d64..43c6f7d 100644
--- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
+++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
@@ -19,8 +19,10 @@
package org.apache.tajo.catalog;
import com.google.common.collect.Sets;
+
import org.apache.hadoop.fs.Path;
import org.apache.tajo.TajoConstants;
+import org.apache.tajo.catalog.dictionary.InfoSchemaMetadataDictionary;
import org.apache.tajo.catalog.exception.CatalogException;
import org.apache.tajo.catalog.exception.NoSuchFunctionException;
import org.apache.tajo.catalog.store.PostgreSQLStore;
@@ -53,7 +55,6 @@ import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto;
import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto.AlterTablespaceType;
import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto.SetLocation;
import static org.junit.Assert.*;
-import static org.junit.Assert.assertEquals;
public class TestCatalog {
static final String FieldName1="f1";
@@ -211,6 +212,7 @@ public class TestCatalog {
@Test
public void testCreateAndDropManyDatabases() throws Exception {
List<String> createdDatabases = new ArrayList<String>();
+ InfoSchemaMetadataDictionary dictionary = new InfoSchemaMetadataDictionary();
String namePrefix = "database_";
final int NUM = 10;
for (int i = 0; i < NUM; i++) {
@@ -223,10 +225,11 @@ public class TestCatalog {
Collection<String> allDatabaseNames = catalog.getAllDatabaseNames();
for (String databaseName : allDatabaseNames) {
- assertTrue(databaseName.equals(DEFAULT_DATABASE_NAME) || createdDatabases.contains(databaseName));
+ assertTrue(databaseName.equals(DEFAULT_DATABASE_NAME) || createdDatabases.contains(databaseName) ||
+ dictionary.isSystemDatabase(databaseName));
}
- // additional one is 'default' database.
- assertEquals(NUM + 1, allDatabaseNames.size());
+ // additional ones are 'default' and 'system' databases.
+ assertEquals(NUM + 2, allDatabaseNames.size());
Collections.shuffle(createdDatabases);
for (String tobeDropped : createdDatabases) {
@@ -351,8 +354,8 @@ public class TestCatalog {
}
}
- // Finally, only default database will remain. So, its result is 1.
- assertEquals(1, catalog.getAllDatabaseNames().size());
+ // Finally, only the default and system databases will remain. So, its result is 2.
+ assertEquals(2, catalog.getAllDatabaseNames().size());
}
@Test
http://git-wip-us.apache.org/repos/asf/tajo/blob/021a6f0b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultFileScanner.java
new file mode 100644
index 0000000..d6ea459
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultFileScanner.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.protobuf.ByteString;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.TaskId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.engine.planner.physical.SeqScanExec;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
+import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A {@link NonForwardQueryResultScanner} that reads the rows of a table through a
+ * {@link SeqScanExec}, at most {@code MAX_FRAGMENT_NUM_PER_SCAN} fragments at a time,
+ * and hands them out as encoded byte strings.
+ */
+public class NonForwardQueryResultFileScanner implements NonForwardQueryResultScanner {
+  /** Upper bound on the number of fragments requested for one SeqScanExec. */
+  private static final int MAX_FRAGMENT_NUM_PER_SCAN = 100;
+
+  private QueryId queryId;
+  private String sessionId;
+  /** Executor over the current batch of fragments; null when nothing is left to read. */
+  private SeqScanExec scanExec;
+  private TableDesc tableDesc;
+  private RowStoreEncoder rowEncoder;
+  /** Total number of rows this scanner may return over all getNextRows calls. */
+  private int maxRow;
+  /** Number of rows returned so far. */
+  private int currentNumRows;
+  private TaskAttemptContext taskContext;
+  private TajoConf tajoConf;
+  private ScanNode scanNode;
+
+  /** Index of the first fragment that has not been handed to a SeqScanExec yet. */
+  private int currentFragmentIndex = 0;
+
+  /**
+   * Creates a scanner; {@link #init()} must be called to open the first batch of fragments.
+   *
+   * @param tajoConf  configuration used to look up the storage manager and build the query context
+   * @param sessionId id of the session that owns the result
+   * @param queryId   id of the query that owns the result
+   * @param scanNode  scan node that is cloned for every batch of fragments
+   * @param tableDesc descriptor of the table to read
+   * @param maxRow    total number of rows that may be returned
+   */
+  public NonForwardQueryResultFileScanner(TajoConf tajoConf, String sessionId, QueryId queryId, ScanNode scanNode,
+      TableDesc tableDesc, int maxRow) throws IOException {
+    this.tajoConf = tajoConf;
+    this.sessionId = sessionId;
+    this.queryId = queryId;
+    this.scanNode = scanNode;
+    this.tableDesc = tableDesc;
+    this.maxRow = maxRow;
+    this.rowEncoder = RowStoreUtil.createEncoder(tableDesc.getLogicalSchema());
+  }
+
+  /** Opens the executor for the first batch of fragments. */
+  @Override
+  public void init() throws IOException {
+    initSeqScanExec();
+  }
+
+  /**
+   * Requests the next batch of fragments and, when there is one, opens a new
+   * SeqScanExec over it. Leaves {@code scanExec} untouched when no fragment is left.
+   */
+  private void initSeqScanExec() throws IOException {
+    List<Fragment> fragments = StorageManager.getStorageManager(tajoConf, tableDesc.getMeta().getStoreType())
+        .getNonForwardSplit(tableDesc, currentFragmentIndex, MAX_FRAGMENT_NUM_PER_SCAN);
+
+    if (fragments != null && !fragments.isEmpty()) {
+      FragmentProto[] fragmentProtos =
+          FragmentConvertor.toFragmentProtoArray(fragments.toArray(new Fragment[fragments.size()]));
+      this.taskContext = new TaskAttemptContext(
+          new QueryContext(tajoConf), null,
+          new TaskAttemptId(new TaskId(new ExecutionBlockId(queryId, 1), 0), 0),
+          fragmentProtos, null);
+      try {
+        // scanNode must be cloned because SeqScanExec changes the target in the case of
+        // a partitioned table.
+        scanExec = new SeqScanExec(taskContext, (ScanNode) scanNode.clone(), fragmentProtos);
+      } catch (CloneNotSupportedException e) {
+        throw new IOException(e.getMessage(), e);
+      }
+      scanExec.init();
+      currentFragmentIndex += fragments.size();
+    }
+  }
+
+  @Override
+  public QueryId getQueryId() {
+    return queryId;
+  }
+
+  @Override
+  public String getSessionId() {
+    return sessionId;
+  }
+
+  /** Replaces the executor that rows are read from. */
+  public void setScanExec(SeqScanExec scanExec) {
+    this.scanExec = scanExec;
+  }
+
+  @Override
+  public TableDesc getTableDesc() {
+    return tableDesc;
+  }
+
+  /** Closes the current executor, if any; further getNextRows calls return no rows. */
+  @Override
+  public void close() throws Exception {
+    if (scanExec != null) {
+      scanExec.close();
+      scanExec = null;
+    }
+  }
+
+  /**
+   * Reads rows until {@code fetchRowNum} rows were collected, the total row limit
+   * was reached, or no fragment is left.
+   *
+   * @param fetchRowNum number of rows wanted by this call
+   * @return the encoded rows; empty when the scanner is closed or exhausted
+   */
+  @Override
+  public List<ByteString> getNextRows(int fetchRowNum) throws IOException {
+    List<ByteString> rows = new ArrayList<ByteString>();
+    if (scanExec == null) {
+      return rows;
+    }
+    int rowCount = 0;
+    while (true) {
+      Tuple tuple = scanExec.next();
+      if (tuple == null) {
+        // The current batch of fragments is exhausted: move on to the next batch.
+        scanExec.close();
+        scanExec = null;
+        initSeqScanExec();
+        if (scanExec != null) {
+          tuple = scanExec.next();
+        }
+        if (tuple == null) {
+          if (scanExec != null) {
+            scanExec.close();
+            scanExec = null;
+          }
+          break;
+        }
+      }
+      rows.add(ByteString.copyFrom((rowEncoder.toBytes(tuple))));
+      rowCount++;
+      currentNumRows++;
+      // The total limit is checked before the per-call limit: when both are reached on
+      // the same row the scanner must still be closed, otherwise the next call would
+      // return a row beyond maxRow.
+      if (currentNumRows >= maxRow) {
+        scanExec.close();
+        scanExec = null;
+        break;
+      }
+      if (rowCount >= fetchRowNum) {
+        break;
+      }
+    }
+    return rows;
+  }
+
+  @Override
+  public Schema getLogicalSchema() {
+    return tableDesc.getLogicalSchema();
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/021a6f0b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
index aced80c..7e7d705 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
@@ -18,149 +18,29 @@
package org.apache.tajo.master;
-import com.google.protobuf.ByteString;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.TaskId;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.engine.planner.physical.SeqScanExec;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.storage.RowStoreUtil;
-import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
-import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.storage.fragment.FragmentConvertor;
-import org.apache.tajo.worker.TaskAttemptContext;
-
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
-public class NonForwardQueryResultScanner {
- private static final int MAX_FRAGMENT_NUM_PER_SCAN = 100;
-
- private QueryId queryId;
- private String sessionId;
- private SeqScanExec scanExec;
- private TableDesc tableDesc;
- private RowStoreEncoder rowEncoder;
- private int maxRow;
- private int currentNumRows;
- private TaskAttemptContext taskContext;
- private TajoConf tajoConf;
- private ScanNode scanNode;
-
- private int currentFragmentIndex = 0;
-
- public NonForwardQueryResultScanner(TajoConf tajoConf, String sessionId,
- QueryId queryId,
- ScanNode scanNode,
- TableDesc tableDesc,
- int maxRow) throws IOException {
- this.tajoConf = tajoConf;
- this.sessionId = sessionId;
- this.queryId = queryId;
- this.scanNode = scanNode;
- this.tableDesc = tableDesc;
- this.maxRow = maxRow;
-
- this.rowEncoder = RowStoreUtil.createEncoder(tableDesc.getLogicalSchema());
- }
-
- public void init() throws IOException {
- initSeqScanExec();
- }
-
- private void initSeqScanExec() throws IOException {
- List<Fragment> fragments = StorageManager.getStorageManager(tajoConf, tableDesc.getMeta().getStoreType())
- .getNonForwardSplit(tableDesc, currentFragmentIndex, MAX_FRAGMENT_NUM_PER_SCAN);
-
- if (fragments != null && !fragments.isEmpty()) {
- FragmentProto[] fragmentProtos = FragmentConvertor.toFragmentProtoArray(fragments.toArray(new Fragment[]{}));
- this.taskContext = new TaskAttemptContext(
- new QueryContext(tajoConf), null,
- new TaskAttemptId(new TaskId(new ExecutionBlockId(queryId, 1), 0), 0),
- fragmentProtos, null);
-
- try {
- // scanNode must be clone cause SeqScanExec change target in the case of a partitioned table.
- scanExec = new SeqScanExec(taskContext, (ScanNode)scanNode.clone(), fragmentProtos);
- } catch (CloneNotSupportedException e) {
- throw new IOException(e.getMessage(), e);
- }
- scanExec.init();
- currentFragmentIndex += fragments.size();
- }
- }
-
- public QueryId getQueryId() {
- return queryId;
- }
-
- public String getSessionId() {
- return sessionId;
- }
-
- public void setScanExec(SeqScanExec scanExec) {
- this.scanExec = scanExec;
- }
+import org.apache.tajo.QueryId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
- public TableDesc getTableDesc() {
- return tableDesc;
- }
+import com.google.protobuf.ByteString;
- public void close() throws Exception {
- if (scanExec != null) {
- scanExec.close();
- scanExec = null;
- }
- }
+public interface NonForwardQueryResultScanner { // Contract shared by the file-based and system-table result scanners.
- public List<ByteString> getNextRows(int fetchRowNum) throws IOException {
- List<ByteString> rows = new ArrayList<ByteString>();
- if (scanExec == null) {
- return rows;
- }
- int rowCount = 0;
+ public void close() throws Exception; // Closes the underlying executor, if one is open.
- while (true) {
- Tuple tuple = scanExec.next();
- if (tuple == null) {
- scanExec.close();
- scanExec = null;
+ public Schema getLogicalSchema(); // Schema of the rows handed out by getNextRows.
- initSeqScanExec();
- if (scanExec != null) {
- tuple = scanExec.next();
- }
- if (tuple == null) {
- if (scanExec != null ) {
- scanExec.close();
- scanExec = null;
- }
+ public List<ByteString> getNextRows(int fetchRowNum) throws IOException; // Next batch of encoded rows; fetchRowNum is the number of rows wanted.
- break;
- }
- }
- rows.add(ByteString.copyFrom((rowEncoder.toBytes(tuple))));
- rowCount++;
- currentNumRows++;
- if (rowCount >= fetchRowNum) {
- break;
- }
+ public QueryId getQueryId(); // Id of the query this scanner was created with.
+
+ public String getSessionId(); // Id of the session this scanner was created with.
+
+ public TableDesc getTableDesc(); // Descriptor of the table being scanned.
- if (currentNumRows >= maxRow) {
- scanExec.close();
- scanExec = null;
- break;
- }
- }
+ public void init() throws IOException; // Prepares the scanner for reading.
- return rows;
- }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/021a6f0b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultSystemScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultSystemScanner.java
new file mode 100644
index 0000000..c6466f5
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultSystemScanner.java
@@ -0,0 +1,616 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.TaskId;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.IndexProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.proto.CatalogProtos.TableDescriptorProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.TableOptionProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.TablePartitionProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.TablespaceProto;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.codegen.CompilationError;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
+import org.apache.tajo.engine.planner.global.GlobalPlanner;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.planner.physical.PhysicalExec;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.master.TajoMaster.MasterContext;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.logical.IndexScanNode;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
+import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import com.google.protobuf.ByteString;
+
+public class NonForwardQueryResultSystemScanner implements NonForwardQueryResultScanner {
+
+ private final Log LOG = LogFactory.getLog(getClass());
+
+ private MasterContext masterContext;
+ private LogicalPlan logicalPlan;
+ private final QueryId queryId;
+ private final String sessionId;
+ private TaskAttemptContext taskContext;
+ private int currentRow;
+ private long maxRow;
+ private TableDesc tableDesc;
+ private Schema outSchema;
+ private RowStoreEncoder encoder;
+ private PhysicalExec physicalExec;
+
+ /**
+  * Creates a scanner for a query over system tables; nothing is planned or executed
+  * until {@code init()} is called.
+  *
+  * @param context   master context that supplies the configuration and the catalog
+  * @param plan      logical plan of the query
+  * @param queryId   id of the query
+  * @param sessionId id of the session that issued the query
+  * @param maxRow    row limit, stored as given
+  */
+ public NonForwardQueryResultSystemScanner(MasterContext context, LogicalPlan plan, QueryId queryId,
+     String sessionId, int maxRow) {
+   this.queryId = queryId;
+   this.sessionId = sessionId;
+   this.maxRow = maxRow;
+   this.masterContext = context;
+   this.logicalPlan = plan;
+ }
+
+ /**
+  * Builds the global plan of the query, picks the first leaf execution block the
+  * cursor yields, creates its physical executor and initializes it.
+  *
+  * @throws IOException      if the plan has no leaf execution block, or the executor fails
+  * @throws RuntimeException if global planning fails (wraps the PlanningException)
+  */
+ @Override
+ public void init() throws IOException {
+   QueryContext queryContext = new QueryContext(masterContext.getConf());
+   currentRow = 0;
+
+   MasterPlan masterPlan = new MasterPlan(queryId, queryContext, logicalPlan);
+   GlobalPlanner globalPlanner = new GlobalPlanner(masterContext.getConf(), masterContext.getCatalog());
+   try {
+     globalPlanner.build(masterPlan);
+   } catch (PlanningException e) {
+     throw new RuntimeException(e);
+   }
+
+   ExecutionBlockCursor cursor = new ExecutionBlockCursor(masterPlan);
+   ExecutionBlock leafBlock = null;
+   while (cursor.hasNext()) {
+     ExecutionBlock block = cursor.nextBlock();
+     if (masterPlan.isLeaf(block)) {
+       leafBlock = block;
+       break;
+     }
+   }
+
+   // Without this guard a plan with no leaf block would fail below with a bare
+   // NullPointerException.
+   if (leafBlock == null) {
+     throw new IOException("No leaf execution block found in the plan of query " + queryId);
+   }
+
+   taskContext = new TaskAttemptContext(queryContext, null,
+       new TaskAttemptId(new TaskId(leafBlock.getId(), 0), 0),
+       null, null);
+   physicalExec = new SimplePhysicalPlannerImpl(masterContext.getConf())
+       .createPlan(taskContext, leafBlock.getPlan());
+
+   tableDesc = new TableDesc("table_"+System.currentTimeMillis(), physicalExec.getSchema(),
+       new TableMeta(StoreType.SYSTEM, new KeyValueSet()), null);
+   outSchema = physicalExec.getSchema();
+   encoder = RowStoreUtil.createEncoder(getLogicalSchema());
+
+   physicalExec.init();
+ }
+
+ /**
+  * Releases the executor and clears the per-query state. Closing the executor is
+  * best-effort: a failure is logged and does not propagate to the caller.
+  */
+ @Override
+ public void close() throws Exception {
+   tableDesc = null;
+   outSchema = null;
+   encoder = null;
+   if (physicalExec != null) {
+     try {
+       physicalExec.close();
+     } catch (Exception e) {
+       // Still best-effort, but the failure is no longer invisible.
+       LOG.warn("Failed to close the physical executor of query " + queryId, e);
+     }
+   }
+   physicalExec = null;
+   currentRow = -1;
+ }
+
+ /**
+  * Builds one tuple per tablespace in the catalog. A field is filled only when its
+  * output column is named space_id, space_name, space_handler or space_uri
+  * (case-insensitive); space_id and space_handler become a null datum when the
+  * tablespace does not carry them.
+  *
+  * @param outSchema schema that determines the tuple width and the field positions
+  * @return the tuples, in the order the catalog returned the tablespaces
+  */
+ private List<Tuple> getTablespaces(Schema outSchema) {
+   List<TablespaceProto> spaces = masterContext.getCatalog().getAllTablespaces();
+   List<Tuple> rows = new ArrayList<Tuple>(spaces.size());
+   List<Column> outColumns = outSchema.getColumns();
+
+   for (TablespaceProto space : spaces) {
+     Tuple row = new VTuple(outSchema.size());
+
+     for (int fieldId = 0; fieldId < outColumns.size(); fieldId++) {
+       String name = outColumns.get(fieldId).getSimpleName();
+
+       if ("space_id".equalsIgnoreCase(name)) {
+         if (!space.hasId()) {
+           row.put(fieldId, DatumFactory.createNullDatum());
+         } else {
+           row.put(fieldId, DatumFactory.createInt4(space.getId()));
+         }
+       } else if ("space_name".equalsIgnoreCase(name)) {
+         row.put(fieldId, DatumFactory.createText(space.getSpaceName()));
+       } else if ("space_handler".equalsIgnoreCase(name)) {
+         if (!space.hasHandler()) {
+           row.put(fieldId, DatumFactory.createNullDatum());
+         } else {
+           row.put(fieldId, DatumFactory.createText(space.getHandler()));
+         }
+       } else if ("space_uri".equalsIgnoreCase(name)) {
+         row.put(fieldId, DatumFactory.createText(space.getUri()));
+       }
+     }
+     rows.add(row);
+   }
+
+   return rows;
+ }
+
+ /**
+  * Builds one tuple per database in the catalog. A field is filled only when its
+  * output column is named db_id, db_name or space_id (case-insensitive); space_id
+  * becomes a null datum when the database does not carry it.
+  *
+  * @param outSchema schema that determines the tuple width and the field positions
+  * @return the tuples, in the order the catalog returned the databases
+  */
+ private List<Tuple> getDatabases(Schema outSchema) {
+   List<DatabaseProto> dbs = masterContext.getCatalog().getAllDatabases();
+   List<Tuple> rows = new ArrayList<Tuple>(dbs.size());
+   List<Column> outColumns = outSchema.getColumns();
+
+   for (DatabaseProto db : dbs) {
+     Tuple row = new VTuple(outSchema.size());
+
+     for (int fieldId = 0; fieldId < outColumns.size(); fieldId++) {
+       String name = outColumns.get(fieldId).getSimpleName();
+
+       if ("db_id".equalsIgnoreCase(name)) {
+         row.put(fieldId, DatumFactory.createInt4(db.getId()));
+       } else if ("db_name".equalsIgnoreCase(name)) {
+         row.put(fieldId, DatumFactory.createText(db.getName()));
+       } else if ("space_id".equalsIgnoreCase(name)) {
+         if (!db.hasSpaceId()) {
+           row.put(fieldId, DatumFactory.createNullDatum());
+         } else {
+           row.put(fieldId, DatumFactory.createInt4(db.getSpaceId()));
+         }
+       }
+     }
+
+     rows.add(row);
+   }
+
+   return rows;
+ }
+
+ /**
+  * Builds one tuple per table in the catalog. A field is filled only when its
+  * output column is named tid, db_id, table_name, table_type, path or store_type
+  * (case-insensitive); table_type becomes a null datum when the descriptor does
+  * not carry it.
+  *
+  * @param outSchema schema that determines the tuple width and the field positions
+  * @return the tuples, in the order the catalog returned the tables
+  */
+ private List<Tuple> getTables(Schema outSchema) {
+   List<TableDescriptorProto> descriptors = masterContext.getCatalog().getAllTables();
+   List<Tuple> rows = new ArrayList<Tuple>(descriptors.size());
+   List<Column> outColumns = outSchema.getColumns();
+
+   for (TableDescriptorProto descriptor : descriptors) {
+     Tuple row = new VTuple(outSchema.size());
+
+     for (int fieldId = 0; fieldId < outColumns.size(); fieldId++) {
+       String name = outColumns.get(fieldId).getSimpleName();
+
+       if ("tid".equalsIgnoreCase(name)) {
+         row.put(fieldId, DatumFactory.createInt4(descriptor.getTid()));
+       } else if ("db_id".equalsIgnoreCase(name)) {
+         row.put(fieldId, DatumFactory.createInt4(descriptor.getDbId()));
+       } else if ("table_name".equalsIgnoreCase(name)) {
+         row.put(fieldId, DatumFactory.createText(descriptor.getName()));
+       } else if ("table_type".equalsIgnoreCase(name)) {
+         if (!descriptor.hasTableType()) {
+           row.put(fieldId, DatumFactory.createNullDatum());
+         } else {
+           row.put(fieldId, DatumFactory.createText(descriptor.getTableType()));
+         }
+       } else if ("path".equalsIgnoreCase(name)) {
+         row.put(fieldId, DatumFactory.createText(descriptor.getPath()));
+       } else if ("store_type".equalsIgnoreCase(name)) {
+         row.put(fieldId, DatumFactory.createText(descriptor.getStoreType()));
+       }
+     }
+
+     rows.add(row);
+   }
+
+   return rows;
+ }
+
+ /**
+  * Produces one tuple per column registered in the catalog, populating only the
+  * fields requested by {@code outSchema}.
+  *
+  * Recognized field names (case-insensitive) are tid, column_name, ordinal_position,
+  * data_type and type_length. The ordinal position is a running counter that restarts
+  * at 1 whenever the table id differs from that of the previous catalog entry, so it
+  * presumably relies on getAllColumns() returning each table's columns contiguously
+  * and in order -- TODO confirm against the catalog store.
+  *
+  * @param outSchema schema that defines the width and field order of each tuple
+  * @return one tuple for every entry returned by the catalog's getAllColumns()
+  */
+ private List<Tuple> getColumns(Schema outSchema) {
+   List<ColumnProto> columnProtos = masterContext.getCatalog().getAllColumns();
+   List<Tuple> rows = new ArrayList<Tuple>(columnProtos.size());
+   List<Column> schemaColumns = outSchema.getColumns();
+   int fieldCount = schemaColumns.size();
+   int ordinal = 0;
+   int lastTid = -1;
+
+   for (ColumnProto columnProto : columnProtos) {
+     Tuple row = new VTuple(outSchema.size());
+
+     int tid = columnProto.getTid();
+     if (tid != lastTid) {
+       // A new table starts here: restart the position counter.
+       ordinal = 0;
+       lastTid = tid;
+     }
+     ordinal++;
+
+     for (int idx = 0; idx < fieldCount; idx++) {
+       String fieldName = schemaColumns.get(idx).getSimpleName();
+
+       if ("tid".equalsIgnoreCase(fieldName)) {
+         if (!columnProto.hasTid()) {
+           row.put(idx, DatumFactory.createNullDatum());
+         } else {
+           row.put(idx, DatumFactory.createInt4(tid));
+         }
+       } else if ("column_name".equalsIgnoreCase(fieldName)) {
+         row.put(idx, DatumFactory.createText(columnProto.getName()));
+       } else if ("ordinal_position".equalsIgnoreCase(fieldName)) {
+         row.put(idx, DatumFactory.createInt4(ordinal));
+       } else if ("data_type".equalsIgnoreCase(fieldName)) {
+         row.put(idx, DatumFactory.createText(columnProto.getDataType().getType().toString()));
+       } else if ("type_length".equalsIgnoreCase(fieldName)) {
+         DataType columnType = columnProto.getDataType();
+         if (!columnType.hasLength()) {
+           row.put(idx, DatumFactory.createNullDatum());
+         } else {
+           row.put(idx, DatumFactory.createInt4(columnType.getLength()));
+         }
+       }
+     }
+
+     rows.add(row);
+   }
+
+   return rows;
+ }
+
+ /**
+  * Produces one tuple per index registered in the catalog, populating only the
+  * fields requested by {@code outSchema}.
+  *
+  * Recognized field names (case-insensitive) are db_id, tid, index_name, column_name,
+  * data_type, index_type, is_unique, is_clustered and is_ascending; a column with
+  * any other name is left unset.
+  *
+  * @param outSchema schema that defines the width and field order of each tuple
+  * @return one tuple for every entry returned by the catalog's getAllIndexes()
+  */
+ private List<Tuple> getIndexes(Schema outSchema) {
+   List<IndexProto> indexProtos = masterContext.getCatalog().getAllIndexes();
+   List<Tuple> rows = new ArrayList<Tuple>(indexProtos.size());
+   List<Column> schemaColumns = outSchema.getColumns();
+   int fieldCount = schemaColumns.size();
+
+   for (IndexProto indexProto : indexProtos) {
+     Tuple row = new VTuple(outSchema.size());
+
+     for (int idx = 0; idx < fieldCount; idx++) {
+       String fieldName = schemaColumns.get(idx).getSimpleName();
+
+       if ("db_id".equalsIgnoreCase(fieldName)) {
+         row.put(idx, DatumFactory.createInt4(indexProto.getDbId()));
+       } else if ("tid".equalsIgnoreCase(fieldName)) {
+         row.put(idx, DatumFactory.createInt4(indexProto.getTId()));
+       } else if ("index_name".equalsIgnoreCase(fieldName)) {
+         row.put(idx, DatumFactory.createText(indexProto.getIndexName()));
+       } else if ("column_name".equalsIgnoreCase(fieldName)) {
+         row.put(idx, DatumFactory.createText(indexProto.getColumnName()));
+       } else if ("data_type".equalsIgnoreCase(fieldName)) {
+         row.put(idx, DatumFactory.createText(indexProto.getDataType()));
+       } else if ("index_type".equalsIgnoreCase(fieldName)) {
+         row.put(idx, DatumFactory.createText(indexProto.getIndexType()));
+       } else if ("is_unique".equalsIgnoreCase(fieldName)) {
+         row.put(idx, DatumFactory.createBool(indexProto.getIsUnique()));
+       } else if ("is_clustered".equalsIgnoreCase(fieldName)) {
+         row.put(idx, DatumFactory.createBool(indexProto.getIsClustered()));
+       } else if ("is_ascending".equalsIgnoreCase(fieldName)) {
+         row.put(idx, DatumFactory.createBool(indexProto.getIsAscending()));
+       }
+     }
+
+     rows.add(row);
+   }
+
+   return rows;
+ }
+
+ /**
+  * Produces one tuple per table option registered in the catalog, populating only
+  * the fields requested by {@code outSchema}.
+  *
+  * Recognized field names (case-insensitive) are tid, key_ and value_; the latter two
+  * are read from the option's key/value pair. Any other column is left unset.
+  *
+  * @param outSchema schema that defines the width and field order of each tuple
+  * @return one tuple for every entry returned by the catalog's getAllTableOptions()
+  */
+ private List<Tuple> getAllTableOptions(Schema outSchema) {
+   List<TableOptionProto> optionProtos = masterContext.getCatalog().getAllTableOptions();
+   List<Tuple> rows = new ArrayList<Tuple>(optionProtos.size());
+   List<Column> schemaColumns = outSchema.getColumns();
+   int fieldCount = schemaColumns.size();
+
+   for (TableOptionProto optionProto : optionProtos) {
+     Tuple row = new VTuple(outSchema.size());
+
+     for (int idx = 0; idx < fieldCount; idx++) {
+       String fieldName = schemaColumns.get(idx).getSimpleName();
+
+       if ("tid".equalsIgnoreCase(fieldName)) {
+         row.put(idx, DatumFactory.createInt4(optionProto.getTid()));
+       } else if ("key_".equalsIgnoreCase(fieldName)) {
+         row.put(idx, DatumFactory.createText(optionProto.getKeyval().getKey()));
+       } else if ("value_".equalsIgnoreCase(fieldName)) {
+         row.put(idx, DatumFactory.createText(optionProto.getKeyval().getValue()));
+       }
+     }
+
+     rows.add(row);
+   }
+
+   return rows;
+ }
+
+ /**
+  * Produces one tuple per table-statistics record in the catalog, populating only
+  * the fields requested by {@code outSchema}.
+  *
+  * Recognized field names (case-insensitive) are tid (INT4), num_rows (INT8) and
+  * num_bytes (INT8); any other column is left unset.
+  *
+  * @param outSchema schema that defines the width and field order of each tuple
+  * @return one tuple for every entry returned by the catalog's getAllTableStats()
+  */
+ private List<Tuple> getAllTableStats(Schema outSchema) {
+   List<TableStatsProto> statProtos = masterContext.getCatalog().getAllTableStats();
+   List<Tuple> rows = new ArrayList<Tuple>(statProtos.size());
+   List<Column> schemaColumns = outSchema.getColumns();
+   int fieldCount = schemaColumns.size();
+
+   for (TableStatsProto statProto : statProtos) {
+     Tuple row = new VTuple(outSchema.size());
+
+     for (int idx = 0; idx < fieldCount; idx++) {
+       String fieldName = schemaColumns.get(idx).getSimpleName();
+
+       if ("tid".equalsIgnoreCase(fieldName)) {
+         row.put(idx, DatumFactory.createInt4(statProto.getTid()));
+       } else if ("num_rows".equalsIgnoreCase(fieldName)) {
+         row.put(idx, DatumFactory.createInt8(statProto.getNumRows()));
+       } else if ("num_bytes".equalsIgnoreCase(fieldName)) {
+         row.put(idx, DatumFactory.createInt8(statProto.getNumBytes()));
+       }
+     }
+
+     rows.add(row);
+   }
+
+   return rows;
+ }
+
+ /**
+  * Produces one tuple per table partition registered in the catalog, populating only
+  * the fields requested by {@code outSchema}.
+  *
+  * Recognized field names (case-insensitive) are pid, tid, partition_name,
+  * ordinal_position and path. A missing partition name is written as a null datum;
+  * any other column is left unset.
+  *
+  * @param outSchema schema that defines the width and field order of each tuple
+  * @return one tuple for every entry returned by the catalog's getAllPartitions()
+  */
+ private List<Tuple> getAllPartitions(Schema outSchema) {
+   List<TablePartitionProto> partitionProtos = masterContext.getCatalog().getAllPartitions();
+   List<Tuple> rows = new ArrayList<Tuple>(partitionProtos.size());
+   List<Column> schemaColumns = outSchema.getColumns();
+   int fieldCount = schemaColumns.size();
+
+   for (TablePartitionProto partitionProto : partitionProtos) {
+     Tuple row = new VTuple(outSchema.size());
+
+     for (int idx = 0; idx < fieldCount; idx++) {
+       String fieldName = schemaColumns.get(idx).getSimpleName();
+
+       if ("pid".equalsIgnoreCase(fieldName)) {
+         row.put(idx, DatumFactory.createInt4(partitionProto.getPid()));
+       } else if ("tid".equalsIgnoreCase(fieldName)) {
+         row.put(idx, DatumFactory.createInt4(partitionProto.getTid()));
+       } else if ("partition_name".equalsIgnoreCase(fieldName)) {
+         // The partition name is optional in the proto.
+         if (!partitionProto.hasPartitionName()) {
+           row.put(idx, DatumFactory.createNullDatum());
+         } else {
+           row.put(idx, DatumFactory.createText(partitionProto.getPartitionName()));
+         }
+       } else if ("ordinal_position".equalsIgnoreCase(fieldName)) {
+         row.put(idx, DatumFactory.createInt4(partitionProto.getOrdinalPosition()));
+       } else if ("path".equalsIgnoreCase(fieldName)) {
+         row.put(idx, DatumFactory.createText(partitionProto.getPath()));
+       }
+     }
+
+     rows.add(row);
+   }
+
+   return rows;
+ }
+
+ /**
+  * Materializes the rows of the system table named by {@code tableDesc}.
+  *
+  * The simple (unqualified) table name is compared case-insensitively with the
+  * supported system tables: tablespace, databases, tables, columns, indexes,
+  * table_options, table_stats and partitions.
+  *
+  * @param tableDesc description of the system table being scanned
+  * @param inSchema  schema that defines the layout of the produced tuples
+  * @return the table's rows; an empty list (never null) when the name matches no
+  *         supported system table, so callers can add the result to a collection
+  *         without a null check
+  */
+ private List<Tuple> fetchSystemTable(TableDesc tableDesc, Schema inSchema) {
+   String tableName = CatalogUtil.extractSimpleName(tableDesc.getName());
+
+   if ("tablespace".equalsIgnoreCase(tableName)) {
+     return getTablespaces(inSchema);
+   } else if ("databases".equalsIgnoreCase(tableName)) {
+     return getDatabases(inSchema);
+   } else if ("tables".equalsIgnoreCase(tableName)) {
+     return getTables(inSchema);
+   } else if ("columns".equalsIgnoreCase(tableName)) {
+     return getColumns(inSchema);
+   } else if ("indexes".equalsIgnoreCase(tableName)) {
+     return getIndexes(inSchema);
+   } else if ("table_options".equalsIgnoreCase(tableName)) {
+     return getAllTableOptions(inSchema);
+   } else if ("table_stats".equalsIgnoreCase(tableName)) {
+     return getAllTableStats(inSchema);
+   } else if ("partitions".equalsIgnoreCase(tableName)) {
+     return getAllPartitions(inSchema);
+   }
+
+   // Unknown system table: previously this returned null, which made the
+   // caller's addAll(...) fail with a NullPointerException.
+   return new ArrayList<Tuple>();
+ }
+
+ /**
+  * Fetches up to {@code fetchRowNum} further rows from the physical executor and
+  * returns them serialized by the row encoder.
+  *
+  * The executor is closed and dropped as soon as it is exhausted or the overall
+  * {@code maxRow} cap has been reached; once it has been dropped every later call
+  * returns an empty list.
+  *
+  * @param fetchRowNum maximum number of rows to return from this call
+  * @return the serialized rows, possibly empty, never null
+  * @throws IOException if the executor fails while producing a tuple or closing
+  */
+ @Override
+ public List<ByteString> getNextRows(int fetchRowNum) throws IOException {
+   List<ByteString> rows = new ArrayList<ByteString>();
+
+   if (physicalExec == null) {
+     return rows;
+   }
+
+   // Count the rows fetched by this call rather than computing
+   // currentRow + fetchRowNum, which overflows int for large fetch sizes.
+   // The cap is tested before pulling a tuple so that a cap of zero yields no rows.
+   int fetched = 0;
+   while (fetched < fetchRowNum && currentRow < maxRow) {
+     Tuple currentTuple = physicalExec.next();
+
+     if (currentTuple == null) {
+       physicalExec.close();
+       physicalExec = null;
+       break;
+     }
+
+     currentRow++;
+     fetched++;
+     rows.add(ByteString.copyFrom(encoder.toBytes(currentTuple)));
+   }
+
+   // Release the executor eagerly once the row cap has been reached.
+   if (physicalExec != null && currentRow >= maxRow) {
+     physicalExec.close();
+     physicalExec = null;
+   }
+
+   return rows;
+ }
+
+ /**
+  * @return the query id held by this scanner
+  */
+ @Override
+ public QueryId getQueryId() {
+   return this.queryId;
+ }
+
+ /**
+  * @return the session id held by this scanner
+  */
+ @Override
+ public String getSessionId() {
+   return this.sessionId;
+ }
+
+ /**
+  * @return the table description held by this scanner
+  */
+ @Override
+ public TableDesc getTableDesc() {
+   return this.tableDesc;
+ }
+
+ /**
+  * @return the output schema held by this scanner
+  */
+ @Override
+ public Schema getLogicalSchema() {
+   return this.outSchema;
+ }
+
+ /**
+  * Physical planner variant whose scan and index-scan plans are both served by a
+  * {@link SystemPhysicalExec} over the scanned node.
+  */
+ class SimplePhysicalPlannerImpl extends PhysicalPlannerImpl {
+
+   /**
+    * @param conf configuration handed to the parent planner
+    */
+   public SimplePhysicalPlannerImpl(TajoConf conf) {
+     super(conf);
+   }
+
+   /**
+    * Creates the executor for a scan node; the {@code node} stack is not consulted.
+    */
+   @Override
+   public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode, Stack<LogicalNode> node)
+       throws IOException {
+     PhysicalExec systemScan = new SystemPhysicalExec(ctx, scanNode);
+     return systemScan;
+   }
+
+   /**
+    * Creates the executor for an index scan node, which is handled exactly like a
+    * plain scan.
+    */
+   @Override
+   public PhysicalExec createIndexScanExec(TaskAttemptContext ctx, IndexScanNode annotation) throws IOException {
+     PhysicalExec systemScan = new SystemPhysicalExec(ctx, annotation);
+     return systemScan;
+   }
+ }
+
+ /**
+  * Physical executor that serves a scan over a system table from an in-memory
+  * snapshot of the table's rows.
+  *
+  * The snapshot is loaded lazily on the first {@link #next()} call, or eagerly by
+  * {@link #rescan()}, through the enclosing class's fetchSystemTable(). Each row is
+  * filtered by the scan node's qualifier, when one exists, and projected onto the
+  * output schema.
+  */
+ class SystemPhysicalExec extends PhysicalExec {
+
+   private ScanNode scanNode;
+   private EvalNode qual;
+   private Projector projector;
+   private TableStats tableStats;
+   private final List<Tuple> cachedData;
+   private int currentRow;
+   private boolean isClosed;
+   // Tracks whether the snapshot has been loaded. An empty snapshot is a valid
+   // loaded state, so the size of cachedData cannot be used for this.
+   private boolean isLoaded;
+
+   /**
+    * @param context  task attempt context passed to the parent executor and the projector
+    * @param scanNode scan node supplying the schemas, qualifier, targets and table
+    */
+   public SystemPhysicalExec(TaskAttemptContext context, ScanNode scanNode) {
+     super(context, scanNode.getInSchema(), scanNode.getOutSchema());
+     this.scanNode = scanNode;
+     this.qual = this.scanNode.getQual();
+     cachedData = TUtil.newList();
+     currentRow = 0;
+     isClosed = false;
+     isLoaded = false;
+
+     projector = new Projector(context, inSchema, outSchema, scanNode.getTargets());
+   }
+
+   /**
+    * Returns the next projected row that passes the qualifier.
+    *
+    * @return the next output tuple, or null when the rows are exhausted or the
+    *         executor has been closed
+    */
+   @Override
+   public Tuple next() throws IOException {
+     if (isClosed) {
+       return null;
+     }
+
+     if (!isLoaded) {
+       rescan();
+     }
+
+     Tuple aTuple = null;
+     Tuple outTuple = new VTuple(outColumnNum);
+
+     if (!scanNode.hasQual()) {
+       if (currentRow < cachedData.size()) {
+         aTuple = cachedData.get(currentRow++);
+         projector.eval(aTuple, outTuple);
+         outTuple.setOffset(aTuple.getOffset());
+         return outTuple;
+       }
+       return null;
+     } else {
+       // Skip rows until one satisfies the qualifier.
+       while (currentRow < cachedData.size()) {
+         aTuple = cachedData.get(currentRow++);
+         if (qual.eval(inSchema, aTuple).isTrue()) {
+           projector.eval(aTuple, outTuple);
+           return outTuple;
+         }
+       }
+       return null;
+     }
+   }
+
+   /**
+    * Reloads the snapshot from the catalog and restarts the scan from its first row.
+    */
+   @Override
+   public void rescan() throws IOException {
+     cachedData.clear();
+     // Restart from the beginning; without this reset an explicit rescan would
+     // leave the cursor wherever the previous pass stopped.
+     currentRow = 0;
+
+     List<Tuple> fetched = fetchSystemTable(scanNode.getTableDesc(), inSchema);
+     if (fetched != null) {
+       cachedData.addAll(fetched);
+     }
+     isLoaded = true;
+
+     tableStats = new TableStats();
+     tableStats.setNumRows(cachedData.size());
+   }
+
+   /**
+    * Drops the snapshot and all plan references; later next() calls return null.
+    */
+   @Override
+   public void close() throws IOException {
+     scanNode = null;
+     qual = null;
+     projector = null;
+     cachedData.clear();
+     currentRow = -1;
+     isClosed = true;
+   }
+
+   /**
+    * @return always 1.0
+    */
+   @Override
+   public float getProgress() {
+     return 1.0f;
+   }
+
+   /**
+    * Swaps the qualifier for its precompiled form when the scan node has one.
+    */
+   @Override
+   protected void compile() throws CompilationError {
+     if (scanNode.hasQual()) {
+       qual = context.getPrecompiledEval(inSchema, qual);
+     }
+   }
+
+   /**
+    * @return statistics holding the snapshot's row count; null until the snapshot
+    *         has been loaded for the first time
+    */
+   @Override
+   public TableStats getInputStats() {
+     return tableStats;
+   }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/021a6f0b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index ee99353..c413b65 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -525,7 +525,7 @@ public class TajoMasterClientService extends AbstractService {
List<ByteString> rows = queryResultScanner.getNextRows(request.getFetchRowNum());
SerializedResultSet.Builder resultSetBuilder = SerializedResultSet.newBuilder();
- resultSetBuilder.setSchema(queryResultScanner.getTableDesc().getLogicalSchema().getProto());
+ resultSetBuilder.setSchema(queryResultScanner.getLogicalSchema().getProto());
resultSetBuilder.addAllSerializedTuples(rows);
builder.setResultSet(resultSetBuilder.build());
http://git-wip-us.apache.org/repos/asf/tajo/blob/021a6f0b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
index 10701f9..2242445 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
@@ -41,7 +41,9 @@ import org.apache.tajo.engine.planner.physical.StoreTableExec;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
+import org.apache.tajo.master.NonForwardQueryResultFileScanner;
import org.apache.tajo.master.NonForwardQueryResultScanner;
+import org.apache.tajo.master.NonForwardQueryResultSystemScanner;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.master.exec.prehook.CreateTableHook;
import org.apache.tajo.master.exec.prehook.DistributedQueryHookManager;
@@ -104,6 +106,8 @@ public class QueryExecutor {
} else if (plan.isExplain()) { // explain query
execExplain(plan, response);
+ } else if (PlannerUtil.checkIfQueryTargetIsVirtualTable(plan)) {
+ execQueryOnVirtualTable(queryContext, session, sql, plan, response);
// Simple query indicates a form of 'select * from tb_name [LIMIT X];'.
} else if (PlannerUtil.checkIfSimpleQuery(plan)) {
@@ -183,6 +187,27 @@ public class QueryExecutor {
response.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
}
+ /**
+  * Answers a query over virtual (system) tables through a
+  * NonForwardQueryResultSystemScanner.
+  *
+  * The scanner is created for the plan, initialized and registered on the session;
+  * the response is then filled with the new query id, the row cap, the scanner's
+  * table description and an OK result code.
+  *
+  * @param queryContext not used by this method
+  * @param session      session that will own the created result scanner
+  * @param query        not used by this method
+  * @param plan         logical plan of the query; its LIMIT node, if any, sets the row cap
+  * @param response     response builder to populate
+  * @throws Exception if creating or initializing the scanner fails
+  */
+ public void execQueryOnVirtualTable(QueryContext queryContext, Session session, String query, LogicalPlan plan,
+     SubmitQueryResponse.Builder response) throws Exception {
+   int maxRow = Integer.MAX_VALUE;
+   if (plan.getRootBlock().hasNode(NodeType.LIMIT)) {
+     LimitNode limitNode = plan.getRootBlock().getNode(NodeType.LIMIT);
+     // Clamp instead of casting blindly: a LIMIT above Integer.MAX_VALUE would
+     // otherwise wrap around to a small or negative row cap.
+     maxRow = (int) Math.min(limitNode.getFetchFirstNum(), (long) Integer.MAX_VALUE);
+   }
+   QueryId queryId = QueryIdFactory.newQueryId(context.getResourceManager().getSeedQueryId());
+
+   NonForwardQueryResultScanner queryResultScanner =
+       new NonForwardQueryResultSystemScanner(context, plan, queryId, session.getSessionId(), maxRow);
+
+   queryResultScanner.init();
+   session.addNonForwardQueryResultScanner(queryResultScanner);
+
+   response.setQueryId(queryId.getProto());
+   response.setMaxRowNum(maxRow);
+   response.setTableDesc(queryResultScanner.getTableDesc().getProto());
+   response.setResultCode(ClientProtos.ResultCode.OK);
+ }
+
public void execSimpleQuery(QueryContext queryContext, Session session, String query, LogicalPlan plan,
SubmitQueryResponse.Builder response) throws Exception {
ScanNode scanNode = plan.getRootBlock().getNode(NodeType.SCAN);
@@ -202,7 +227,7 @@ public class QueryExecutor {
QueryId queryId = QueryIdFactory.newQueryId(context.getResourceManager().getSeedQueryId());
NonForwardQueryResultScanner queryResultScanner =
- new NonForwardQueryResultScanner(context.getConf(), session.getSessionId(), queryId, scanNode, desc, maxRow);
+ new NonForwardQueryResultFileScanner(context.getConf(), session.getSessionId(), queryId, scanNode, desc, maxRow);
queryResultScanner.init();
session.addNonForwardQueryResultScanner(queryResultScanner);
http://git-wip-us.apache.org/repos/asf/tajo/blob/021a6f0b/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java
new file mode 100644
index 0000000..bdd6dfc
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import static org.junit.Assert.*;
+import static org.hamcrest.CoreMatchers.*;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.benchmark.TPCH;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.master.TajoMaster.MasterContext;
+import org.apache.tajo.plan.LogicalOptimizer;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.LogicalPlanner;
+import org.apache.tajo.plan.logical.LimitNode;
+import org.apache.tajo.plan.logical.NodeType;
+import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.util.KeyValueSet;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.protobuf.ByteString;
+
+ /**
+  * Tests for NonForwardQueryResultSystemScanner, run against a local testing
+  * cluster that is loaded with nine TPC-H tables before the first test.
+  */
+ public class TestNonForwardQueryResultSystemScanner {
+
+   /**
+    * Matcher that accepts an iterable when at least one element satisfies the
+    * wrapped matcher. It is static because it needs no state from the test instance.
+    */
+   private static class CollectionMatcher<T> extends TypeSafeDiagnosingMatcher<Iterable<? extends T>> {
+
+     private final Matcher<? extends T> matcher;
+
+     public CollectionMatcher(Matcher<? extends T> matcher) {
+       this.matcher = matcher;
+     }
+
+     @Override
+     public void describeTo(Description description) {
+       description.appendText("a collection containing ").appendDescriptionOf(this.matcher);
+     }
+
+     @Override
+     protected boolean matchesSafely(Iterable<? extends T> item, Description mismatchDescription) {
+       boolean isFirst = true;
+       Iterator<? extends T> iterator = item.iterator();
+
+       while (iterator.hasNext()) {
+         T obj = iterator.next();
+         if (this.matcher.matches(obj)) {
+           return true;
+         }
+
+         // Collect one mismatch description per rejected element, comma separated.
+         if (!isFirst) {
+           mismatchDescription.appendText(", ");
+         }
+
+         this.matcher.describeMismatch(obj, mismatchDescription);
+         isFirst = false;
+       }
+       return false;
+     }
+
+   }
+
+   /**
+    * Wraps an element matcher into a matcher over iterables of such elements.
+    */
+   private <T> Matcher<Iterable<? extends T>> hasItem(Matcher<? extends T> matcher) {
+     return new CollectionMatcher<T>(matcher);
+   }
+
+   private static LocalTajoTestingUtility testUtil;
+   private static TajoTestingCluster testingCluster;
+   private static TajoConf conf;
+   private static MasterContext masterContext;
+
+   private static SQLAnalyzer analyzer;
+   private static LogicalPlanner logicalPlanner;
+   private static LogicalOptimizer logicalOptimizer;
+
+   /**
+    * Starts the local testing utility and registers the TPC-H tables, resolving each
+    * data file relative to the module directory first and to the project root second.
+    */
+   private static void setupTestingCluster() throws Exception {
+     testUtil = new LocalTajoTestingUtility();
+     String[] names, paths;
+     Schema[] schemas;
+
+     TPCH tpch = new TPCH();
+     tpch.loadSchemas();
+     tpch.loadQueries();
+
+     names = new String[] {"customer", "lineitem", "nation", "orders", "part", "partsupp",
+         "region", "supplier", "empty_orders"};
+     schemas = new Schema[names.length];
+     for (int i = 0; i < names.length; i++) {
+       schemas[i] = tpch.getSchema(names[i]);
+     }
+
+     File file;
+     paths = new String[names.length];
+     for (int i = 0; i < names.length; i++) {
+       file = new File("src/test/tpch/" + names[i] + ".tbl");
+       if (!file.exists()) {
+         file = new File(System.getProperty("user.dir") + "/tajo-core/src/test/tpch/" + names[i]
+             + ".tbl");
+       }
+       paths[i] = file.getAbsolutePath();
+     }
+
+     KeyValueSet opt = new KeyValueSet();
+     opt.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+     testUtil.setup(names, paths, schemas, opt);
+
+     testingCluster = testUtil.getTestingCluster();
+   }
+
+   @BeforeClass
+   public static void setUp() throws Exception {
+     setupTestingCluster();
+
+     conf = testingCluster.getConfiguration();
+     masterContext = testingCluster.getMaster().getContext();
+
+     GlobalEngine globalEngine = masterContext.getGlobalEngine();
+     analyzer = globalEngine.getAnalyzer();
+     logicalPlanner = globalEngine.getLogicalPlanner();
+     logicalOptimizer = globalEngine.getLogicalOptimizer();
+   }
+
+   @AfterClass
+   public static void tearDown() throws Exception {
+     boolean interrupted = false;
+     try {
+       Thread.sleep(2000);
+     } catch (InterruptedException e) {
+       // Remember the interrupt so it can be restored after the cluster is down.
+       interrupted = true;
+     }
+
+     try {
+       testUtil.shutdown();
+     } finally {
+       if (interrupted) {
+         Thread.currentThread().interrupt();
+       }
+     }
+   }
+
+   /**
+    * Builds a system scanner for {@code sql} with a fresh query id and a random
+    * session id.
+    */
+   private NonForwardQueryResultScanner getScanner(String sql) throws Exception {
+     QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId());
+     String sessionId = UUID.randomUUID().toString();
+
+     return getScanner(sql, queryId, sessionId);
+   }
+
+   /**
+    * Parses, plans and optimizes {@code sql}, then builds an uninitialized system
+    * scanner whose row cap is taken from the plan's LIMIT node when one is present.
+    */
+   private NonForwardQueryResultScanner getScanner(String sql, QueryId queryId, String sessionId) throws Exception {
+     QueryContext queryContext = LocalTajoTestingUtility.createDummyContext(conf);
+
+     Expr expr = analyzer.parse(sql);
+     LogicalPlan logicalPlan = logicalPlanner.createPlan(queryContext, expr);
+     logicalOptimizer.optimize(logicalPlan);
+
+     int maxRow = Integer.MAX_VALUE;
+     if (logicalPlan.getRootBlock().hasNode(NodeType.LIMIT)) {
+       LimitNode limitNode = logicalPlan.getRootBlock().getNode(NodeType.LIMIT);
+       // Clamp rather than cast so an oversized LIMIT cannot wrap around.
+       maxRow = (int) Math.min(limitNode.getFetchFirstNum(), (long) Integer.MAX_VALUE);
+     }
+
+     NonForwardQueryResultScanner queryResultScanner =
+         new NonForwardQueryResultSystemScanner(masterContext, logicalPlan, queryId,
+             sessionId, maxRow);
+
+     return queryResultScanner;
+   }
+
+   @Test
+   public void testInit() throws Exception {
+     QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId());
+     String sessionId = UUID.randomUUID().toString();
+     NonForwardQueryResultScanner queryResultScanner =
+         getScanner("SELECT SPACE_ID, SPACE_URI FROM INFORMATION_SCHEMA.TABLESPACE",
+             queryId, sessionId);
+
+     queryResultScanner.init();
+
+     assertThat(queryResultScanner.getQueryId(), is(notNullValue()));
+     assertThat(queryResultScanner.getLogicalSchema(), is(notNullValue()));
+     assertThat(queryResultScanner.getSessionId(), is(notNullValue()));
+     assertThat(queryResultScanner.getTableDesc(), is(notNullValue()));
+
+     assertThat(queryResultScanner.getQueryId(), is(queryId));
+     assertThat(queryResultScanner.getSessionId(), is(sessionId));
+
+     assertThat(queryResultScanner.getLogicalSchema().size(), is(2));
+     assertThat(queryResultScanner.getLogicalSchema().getColumn("space_id"), is(notNullValue()));
+   }
+
+   /**
+    * Decodes every serialized row into a tuple, preserving order.
+    */
+   private List<Tuple> getTupleList(RowStoreDecoder decoder, List<ByteString> bytes) {
+     List<Tuple> tuples = new ArrayList<Tuple>(bytes.size());
+
+     for (ByteString byteString: bytes) {
+       Tuple aTuple = decoder.toTuple(byteString.toByteArray());
+       tuples.add(aTuple);
+     }
+
+     return tuples;
+   }
+
+   /**
+    * Builds a tuple matcher that applies {@code matcher} to the Java value of the
+    * field at {@code fieldId}. Only TEXT, INT4 and INT8 datums are converted; a field
+    * of any other type never matches.
+    */
+   private <T> Matcher<Tuple> getTupleMatcher(final int fieldId, final Matcher<T> matcher) {
+     return new TypeSafeDiagnosingMatcher<Tuple>() {
+
+       @Override
+       public void describeTo(Description description) {
+         description.appendDescriptionOf(matcher);
+       }
+
+       @Override
+       protected boolean matchesSafely(Tuple item, Description mismatchDescription) {
+         Datum datum = item.get(fieldId);
+         Object itemValue = null;
+
+         if (datum.type() == Type.TEXT) {
+           itemValue = datum.asChars();
+         } else if (datum.type() == Type.INT4) {
+           itemValue = datum.asInt4();
+         } else if (datum.type() == Type.INT8) {
+           itemValue = datum.asInt8();
+         }
+
+         if (itemValue != null && matcher.matches(itemValue)) {
+           return true;
+         }
+
+         matcher.describeMismatch(itemValue, mismatchDescription);
+         return false;
+       }
+     };
+   }
+
+   @Test
+   public void testGetNextRowsForAggregateFunction() throws Exception {
+     NonForwardQueryResultScanner queryResultScanner =
+         getScanner("SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES");
+
+     queryResultScanner.init();
+
+     List<ByteString> rowBytes = queryResultScanner.getNextRows(100);
+
+     assertThat(rowBytes.size(), is(1));
+
+     RowStoreDecoder decoder = RowStoreUtil.createDecoder(queryResultScanner.getLogicalSchema());
+     List<Tuple> tuples = getTupleList(decoder, rowBytes);
+
+     assertThat(tuples.size(), is(1));
+     // Nine tables are registered by setupTestingCluster().
+     assertThat(tuples, hasItem(getTupleMatcher(0, is(9L))));
+   }
+
+   @Test
+   public void testGetNextRowsForTable() throws Exception {
+     NonForwardQueryResultScanner queryResultScanner =
+         getScanner("SELECT TABLE_NAME, TABLE_TYPE FROM INFORMATION_SCHEMA.TABLES");
+
+     queryResultScanner.init();
+
+     List<ByteString> rowBytes = queryResultScanner.getNextRows(100);
+
+     assertThat(rowBytes.size(), is(9));
+
+     RowStoreDecoder decoder = RowStoreUtil.createDecoder(queryResultScanner.getLogicalSchema());
+     List<Tuple> tuples = getTupleList(decoder, rowBytes);
+
+     assertThat(tuples.size(), is(9));
+     assertThat(tuples, hasItem(getTupleMatcher(0, is("lineitem"))));
+   }
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/021a6f0b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
index eebee6f..9002f28 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
@@ -37,6 +37,7 @@ import org.apache.tajo.algebra.WindowSpec;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.plan.LogicalPlan.QueryBlock;
@@ -1314,7 +1315,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
}
private void updatePhysicalInfo(TableDesc desc) {
- if (desc.getPath() != null) {
+ if (desc.getPath() != null && desc.getMeta().getStoreType() != StoreType.SYSTEM) {
try {
Path path = new Path(desc.getPath());
FileSystem fs = path.getFileSystem(new Configuration());
http://git-wip-us.apache.org/repos/asf/tajo/blob/021a6f0b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
index d813432..0fbd359 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
@@ -139,6 +139,27 @@ public class PlannerUtil {
(simpleOperator && noComplexComputation && isOneQueryBlock &&
noOrderBy && noGroupBy && noWhere && noJoin && singleRelation);
}
+
+ /**
+  * Tells whether a query reads exclusively from virtual tables, i.e. tables whose
+  * store type is {@code StoreType.SYSTEM}.
+  *
+  * The answer is true only when the plan is not a DDL plan, its root block has a
+  * scan node, and every scan node found under the root targets a SYSTEM table.
+  * This check will be removed once tajo storage supports catalog service access.
+  *
+  * @param plan logical plan to inspect
+  * @return true if every scanned relation of a non-DDL plan is a virtual table
+  */
+ public static boolean checkIfQueryTargetIsVirtualTable(LogicalPlan plan) {
+   LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+
+   boolean rootBlockHasScan = plan.getRootBlock().hasNode(NodeType.SCAN);
+   LogicalNode[] scanNodes = findAllNodes(rootNode, NodeType.SCAN);
+
+   // Starts out true only if there is something to scan; any non-SYSTEM scan clears it.
+   boolean allVirtual = scanNodes.length > 0;
+   for (int i = 0; i < scanNodes.length; i++) {
+     ScanNode scan = (ScanNode) scanNodes[i];
+     if (scan.getTableDesc().getMeta().getStoreType() != StoreType.SYSTEM) {
+       allVirtual = false;
+     }
+   }
+
+   if (checkIfDDLPlan(rootNode)) {
+     return false;
+   }
+   return rootBlockHasScan && allVirtual;
+ }
/**
* Checks whether the query has 'from clause' or not.