You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hj...@apache.org on 2014/09/22 02:21:47 UTC
git commit: TAJO-910: Simple query (non-forwarded query) should be
supported against partition tables.
Repository: tajo
Updated Branches:
refs/heads/master cda6c897a -> 7510f886e
TAJO-910: Simple query (non-forwarded query) should be supported against partition tables.
Closes #138
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/7510f886
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/7510f886
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/7510f886
Branch: refs/heads/master
Commit: 7510f886e7f310dede3f9460ca40ad4bdd46e4cf
Parents: cda6c89
Author: HyoungJun Kim <ba...@babokim-MacBook-Pro.local>
Authored: Mon Sep 22 09:21:04 2014 +0900
Committer: HyoungJun Kim <ba...@babokim-MacBook-Pro.local>
Committed: Mon Sep 22 09:21:04 2014 +0900
----------------------------------------------------------------------
CHANGES | 3 +
.../java/org/apache/tajo/client/TajoClient.java | 117 +++++++++++--
.../org/apache/tajo/jdbc/FetchResultSet.java | 91 ++++++++++
.../org/apache/tajo/jdbc/TajoResultSetBase.java | 2 +-
tajo-client/src/main/proto/ClientProtos.proto | 29 +++-
.../main/proto/TajoMasterClientProtocol.proto | 4 +-
.../main/java/org/apache/tajo/SessionVars.java | 3 +
.../java/org/apache/tajo/conf/TajoConf.java | 5 +-
.../java/org/apache/tajo/benchmark/TPCH.java | 32 +++-
.../apache/tajo/engine/planner/PlannerUtil.java | 138 +++++++++++++++-
.../org/apache/tajo/master/GlobalEngine.java | 31 ++--
.../master/NonForwardQueryResultScanner.java | 165 +++++++++++++++++++
.../tajo/master/TajoMasterClientService.java | 69 +++++++-
.../org/apache/tajo/master/session/Session.java | 54 ++++++
.../tajo/master/session/SessionManager.java | 13 +-
.../apache/tajo/worker/TaskAttemptContext.java | 2 -
.../org/apache/tajo/TajoTestingCluster.java | 45 +++--
.../tajo/engine/planner/TestPlannerUtil.java | 62 +++++++
.../tajo/engine/query/TestNullValues.java | 32 +++-
.../tajo/engine/query/TestSelectQuery.java | 15 ++
.../org/apache/tajo/jdbc/TestResultSet.java | 10 +-
.../apache/tajo/master/TestGlobalPlanner.java | 68 +++++++-
.../queries/TestSelectQuery/customer_ddl.sql | 9 +
.../TestSelectQuery/insert_into_customer.sql | 11 ++
...testSimpleQueryWithLimitPartitionedTable.sql | 1 +
...tSimpleQueryWithLimitPartitionedTable.result | 12 ++
.../TestTajoCli/testHelpSessionVars.result | 1 +
27 files changed, 936 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index b5e4842..21bdd8c 100644
--- a/CHANGES
+++ b/CHANGES
@@ -31,6 +31,9 @@ Release 0.9.0 - unreleased
IMPROVEMENT
+ TAJO-910: Simple query (non-forwarded query) should be supported against
+ partition tables. (Hyoungjun Kim)
+
TAJO-1035: Add default TAJO_PULLSERVER_HEAPSIZE. (Hyoungjun Kim)
TAJO-1049: Remove the parallel degree limit up to the maximum cluster
http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
index ab3d874..3d61cce 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.tajo.QueryId;
import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.SessionVars;
import org.apache.tajo.TajoIdProtos;
import org.apache.tajo.TajoProtos.QueryState;
import org.apache.tajo.annotation.Nullable;
@@ -33,7 +34,10 @@ import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.partition.PartitionDesc;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.PartitionMethodProto;
import org.apache.tajo.cli.InvalidClientSessionException;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
@@ -43,12 +47,14 @@ import org.apache.tajo.ipc.QueryMasterClientProtocol;
import org.apache.tajo.ipc.QueryMasterClientProtocol.QueryMasterClientProtocolService;
import org.apache.tajo.ipc.TajoMasterClientProtocol;
import org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService;
+import org.apache.tajo.jdbc.FetchResultSet;
import org.apache.tajo.jdbc.SQLStates;
import org.apache.tajo.jdbc.TajoMemoryResultSet;
import org.apache.tajo.jdbc.TajoResultSet;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.rpc.ServerCallable;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueProto;
import org.apache.tajo.util.HAServiceUtil;
import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.util.NetUtils;
@@ -63,7 +69,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
-import static org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse.SerializedResultSet;
+import static org.apache.tajo.ipc.ClientProtos.SerializedResultSet;
@ThreadSafe
public class TajoClient implements Closeable {
@@ -204,6 +210,25 @@ public class TajoClient implements Closeable {
}
}
+ public void closeNonForwardQuery(final QueryId queryId) {
+ NettyClientBase tmClient = null;
+ try {
+ tmClient = connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, false);
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub();
+ checkSessionAndGet(tmClient);
+
+ QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
+
+ builder.setSessionId(getSessionId());
+ builder.setQueryId(queryId.getProto());
+ tajoMasterService.closeNonForwardQuery(null, builder.build());
+ } catch (Exception e) {
+ LOG.warn("Fail to close a TajoMaster connection (qid=" + queryId + ", msg=" + e.getMessage() + ")", e);
+ } finally {
+ connPool.closeConnection(tmClient);
+ }
+ }
+
private void checkSessionAndGet(NettyClientBase client) throws ServiceException {
if (sessionId == null) {
TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
@@ -518,23 +543,21 @@ public class TajoClient implements Closeable {
public static ResultSet createResultSet(TajoClient client, SubmitQueryResponse response) throws IOException {
if (response.hasTableDesc()) {
- TajoConf conf = new TajoConf(client.getConf());
- conf.setVar(ConfVars.USERNAME, response.getUserName());
- if (response.hasMaxRowNum()) {
- return new TajoResultSet(
- client,
- QueryIdFactory.NULL_QUERY_ID,
- conf,
- new TableDesc(response.getTableDesc()),
- response.getMaxRowNum());
- } else {
- return new TajoResultSet(
- client,
- QueryIdFactory.NULL_QUERY_ID,
- conf,
- new TableDesc(response.getTableDesc()));
+ // non-forward query
+ // select * from table1 [limit 10]
+ int fetchRowNum = client.getConf().getIntVar(ConfVars.$RESULT_SET_FETCH_ROWNUM);
+ if (response.hasSessionVariables()) {
+ for (KeyValueProto eachKeyValue: response.getSessionVariables().getKeyvalList()) {
+ if (eachKeyValue.getKey().equals(SessionVars.FETCH_ROWNUM.keyname())) {
+ fetchRowNum = Integer.parseInt(eachKeyValue.getValue());
+ }
+ }
}
+ TableDesc tableDesc = new TableDesc(response.getTableDesc());
+ return new FetchResultSet(client, tableDesc.getLogicalSchema(), new QueryId(response.getQueryId()), fetchRowNum);
} else {
+ // simple eval query
+ // select substr('abc', 1, 2)
SerializedResultSet serializedResultSet = response.getResultSet();
return new TajoMemoryResultSet(
new Schema(serializedResultSet.getSchema()),
@@ -606,6 +629,44 @@ public class TajoClient implements Closeable {
}
}
+ public TajoMemoryResultSet fetchNextQueryResult(final QueryId queryId, final int fetchRowNum) throws ServiceException {
+ try {
+ ServerCallable<SerializedResultSet> callable =
+ new ServerCallable<SerializedResultSet>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
+ public SerializedResultSet call(NettyClientBase client) throws ServiceException {
+ checkSessionAndGet(client);
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+ GetQueryResultDataRequest.Builder builder = GetQueryResultDataRequest.newBuilder();
+ builder.setSessionId(sessionId);
+ builder.setQueryId(queryId.getProto());
+ builder.setFetchRowNum(fetchRowNum);
+ try {
+ GetQueryResultDataResponse response = tajoMasterService.getQueryResultData(null, builder.build());
+ if (response.getResultCode() == ResultCode.ERROR) {
+ throw new ServiceException(response.getErrorTrace());
+ }
+
+ return response.getResultSet();
+ } catch (ServiceException e) {
+ abort();
+ throw e;
+ } catch (Throwable t) {
+ throw new ServiceException(t.getMessage(), t);
+ }
+ }
+ };
+
+ SerializedResultSet serializedResultSet = callable.withRetries();
+
+ return new TajoMemoryResultSet(
+ new Schema(serializedResultSet.getSchema()),
+ serializedResultSet.getSerializedTuplesList(),
+ serializedResultSet.getSerializedTuplesCount());
+ } catch (Exception e) {
+ throw new ServiceException(e.getMessage(), e);
+ }
+ }
+
public boolean updateQuery(final String sql) throws ServiceException {
return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(),
TajoMasterClientProtocol.class, false, true) {
@@ -747,6 +808,25 @@ public class TajoClient implements Closeable {
public TableDesc createExternalTable(final String tableName, final Schema schema, final Path path,
final TableMeta meta)
throws SQLException, ServiceException {
+ return createExternalTable(tableName, schema, path, meta, null);
+ }
+
+ /**
+ * Create an external table.
+ *
+ * @param tableName The table name to be created. This name is case sensitive. This name can be qualified or not.
+ * If the table name is not qualified, the current database in the session will be used.
+ * @param schema The schema
+ * @param path The external table location
+ * @param meta Table meta
+ * @param partitionMethodDesc Table partition description
+ * @return the created table description.
+ * @throws SQLException
+ * @throws ServiceException
+ */
+ public TableDesc createExternalTable(final String tableName, final Schema schema, final Path path,
+ final TableMeta meta, final PartitionMethodDesc partitionMethodDesc)
+ throws SQLException, ServiceException {
return new ServerCallable<TableDesc>(connPool, getTajoMasterAddr(),
TajoMasterClientProtocol.class, false, true) {
public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
@@ -760,6 +840,9 @@ public class TajoClient implements Closeable {
builder.setSchema(schema.getProto());
builder.setMeta(meta.getProto());
builder.setPath(path.toUri().toString());
+ if (partitionMethodDesc != null) {
+ builder.setPartition(partitionMethodDesc.getProto());
+ }
TableResponse res = tajoMasterService.createExternalTable(null, builder.build());
if (res.getResultCode() == ResultCode.OK) {
return CatalogUtil.newTableDesc(res.getTableDesc());
@@ -915,7 +998,7 @@ public class TajoClient implements Closeable {
checkSessionAndGet(tmClient);
- KillQueryRequest.Builder builder = KillQueryRequest.newBuilder();
+ QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
builder.setSessionId(sessionId);
builder.setQueryId(queryId.getProto());
tajoMasterService.killQuery(null, builder.build());
http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java
new file mode 100644
index 0000000..7ebce91
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.jdbc;
+
+import org.apache.tajo.QueryId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+import java.sql.SQLException;
+
+public class FetchResultSet extends TajoResultSetBase {
+ private TajoClient tajoClient;
+ private QueryId queryId;
+ private int fetchRowNum;
+ private TajoMemoryResultSet currentResultSet;
+ private boolean finished = false;
+
+ public FetchResultSet(TajoClient tajoClient, Schema schema, QueryId queryId, int fetchRowNum) {
+ this.tajoClient = tajoClient;
+ this.queryId = queryId;
+ this.fetchRowNum = fetchRowNum;
+ this.totalRow = Integer.MAX_VALUE;
+ this.schema = schema;
+ }
+
+ @Override
+ protected Tuple nextTuple() throws IOException {
+ if (finished) {
+ return null;
+ }
+
+ try {
+ Tuple tuple = null;
+ if (currentResultSet != null) {
+ currentResultSet.next();
+ tuple = currentResultSet.cur;
+ }
+ if (currentResultSet == null || tuple == null) {
+ if (currentResultSet != null) {
+ currentResultSet.close();
+ currentResultSet = null;
+ }
+ currentResultSet = tajoClient.fetchNextQueryResult(queryId, fetchRowNum);
+ if (currentResultSet == null) {
+ finished = true;
+ return null;
+ }
+
+ currentResultSet.next();
+ tuple = currentResultSet.cur;
+ }
+ if (tuple == null) {
+ if (currentResultSet != null) {
+ currentResultSet.close();
+ currentResultSet = null;
+ }
+ finished = true;
+ }
+ return tuple;
+ } catch (Throwable t) {
+ throw new IOException(t.getMessage(), t);
+ }
+ }
+
+ @Override
+ public void close() throws SQLException {
+ if (currentResultSet != null) {
+ currentResultSet.close();
+ currentResultSet = null;
+ }
+ tajoClient.closeNonForwardQuery(queryId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java
index d189c78..78d8bde 100644
--- a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java
+++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java
@@ -764,7 +764,7 @@ public abstract class TajoResultSetBase implements ResultSet {
return true;
}
} catch (IOException e) {
- throw new SQLException(e.getMessage());
+ throw new SQLException(e.getMessage(), e);
}
return false;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-client/src/main/proto/ClientProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/proto/ClientProtos.proto b/tajo-client/src/main/proto/ClientProtos.proto
index 0359685..23ae6dd 100644
--- a/tajo-client/src/main/proto/ClientProtos.proto
+++ b/tajo-client/src/main/proto/ClientProtos.proto
@@ -86,7 +86,7 @@ message GetQueryResultResponse {
required string tajoUserName = 3;
}
-message KillQueryRequest {
+message QueryIdRequest {
optional SessionIdProto sessionId = 1;
required QueryIdProto queryId = 2;
}
@@ -115,6 +115,12 @@ message GetQueryStatusRequest {
required QueryIdProto queryId = 2;
}
+message SerializedResultSet {
+ optional SchemaProto schema = 1;
+ optional int32 bytesNum = 2;
+ repeated bytes serializedTuples = 3;
+}
+
message SubmitQueryResponse {
required ResultCode resultCode = 1;
required QueryIdProto queryId = 2;
@@ -124,18 +130,14 @@ message SubmitQueryResponse {
optional string queryMasterHost = 5;
optional int32 queryMasterPort = 6;
- message SerializedResultSet {
- optional SchemaProto schema = 1;
- optional int32 bytesNum = 2;
- repeated bytes serializedTuples = 3;
- }
-
optional SerializedResultSet resultSet = 7;
optional TableDescProto tableDesc = 8;
optional int32 maxRowNum = 9;
optional string errorMessage = 10;
optional string errorTrace = 11;
+
+ optional KeyValueSetProto sessionVariables = 12;
}
message GetQueryStatusResponse {
@@ -152,6 +154,19 @@ message GetQueryStatusResponse {
optional int32 queryMasterPort = 12;
}
+message GetQueryResultDataRequest {
+ required SessionIdProto sessionId = 1;
+ required QueryIdProto queryId = 2;
+ required int32 fetchRowNum = 3;
+}
+
+message GetQueryResultDataResponse {
+ required ResultCode resultCode = 1;
+ required SerializedResultSet resultSet = 2;
+ optional string errorMessage = 3;
+ optional string errorTrace = 4;
+}
+
message GetClusterInfoRequest {
optional SessionIdProto sessionId = 1;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-client/src/main/proto/TajoMasterClientProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/proto/TajoMasterClientProtocol.proto b/tajo-client/src/main/proto/TajoMasterClientProtocol.proto
index 9495fb1..1afc069 100644
--- a/tajo-client/src/main/proto/TajoMasterClientProtocol.proto
+++ b/tajo-client/src/main/proto/TajoMasterClientProtocol.proto
@@ -42,13 +42,15 @@ service TajoMasterClientProtocolService {
rpc submitQuery(QueryRequest) returns (SubmitQueryResponse);
rpc updateQuery(QueryRequest) returns (UpdateQueryResponse);
rpc getQueryResult(GetQueryResultRequest) returns (GetQueryResultResponse);
+ rpc getQueryResultData(GetQueryResultDataRequest) returns (GetQueryResultDataResponse);
// Query And Resource Management APIs
rpc getQueryStatus(GetQueryStatusRequest) returns (GetQueryStatusResponse);
rpc getRunningQueryList(GetQueryListRequest) returns (GetQueryListResponse);
rpc getFinishedQueryList(GetQueryListRequest) returns (GetQueryListResponse);
- rpc killQuery(KillQueryRequest) returns (BoolProto);
+ rpc killQuery(QueryIdRequest) returns (BoolProto);
rpc getClusterInfo(GetClusterInfoRequest) returns (GetClusterInfoResponse);
+ rpc closeNonForwardQuery(QueryIdRequest) returns (BoolProto);
// Database Management APIs
rpc createDatabase(SessionedStringProto) returns (BoolProto);
http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
index c32fd43..cc875b2 100644
--- a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
+++ b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
@@ -115,6 +115,9 @@ public enum SessionVars implements ConfigKey {
ARITHABORT(ConfVars.$BEHAVIOR_ARITHMETIC_ABORT,
"If true, a running query will be terminated when an overflow or divide-by-zero occurs.", DEFAULT),
+ // ResultSet ----------------------------------------------------------------
+ FETCH_ROWNUM(ConfVars.$RESULT_SET_FETCH_ROWNUM, "Sets the number of rows at a time from Master", DEFAULT),
+
//-------------------------------------------------------------------------------
// Only for Unit Testing
//-------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index a089b54..b5a9b50 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -356,7 +356,10 @@ public class TajoConf extends Configuration {
$TEST_MIN_TASK_NUM("tajo.test.min-task-num", -1),
// Behavior Control ---------------------------------------------------------
- $BEHAVIOR_ARITHMETIC_ABORT("tajo.behavior.arithmetic-abort", false);
+ $BEHAVIOR_ARITHMETIC_ABORT("tajo.behavior.arithmetic-abort", false),
+
+ // ResultSet ---------------------------------------------------------
+ $RESULT_SET_FETCH_ROWNUM("tajo.resultset.fetch.rownum", 200)
;
public final String varname;
http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java b/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java
index abd9e7f..71d930f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java
+++ b/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java
@@ -26,7 +26,10 @@ import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.storage.StorageConstants;
@@ -40,6 +43,7 @@ public class TPCH extends BenchmarkSet {
public static final String LINEITEM = "lineitem";
public static final String CUSTOMER = "customer";
+ public static final String CUSTOMER_PARTS = "customer_parts";
public static final String NATION = "nation";
public static final String PART = "part";
public static final String REGION = "region";
@@ -54,6 +58,7 @@ public class TPCH extends BenchmarkSet {
static {
tableVolumes.put(LINEITEM, 759863287L);
tableVolumes.put(CUSTOMER, 24346144L);
+ tableVolumes.put(CUSTOMER_PARTS, 707L);
tableVolumes.put(NATION, 2224L);
tableVolumes.put(PART, 24135125L);
tableVolumes.put(REGION, 389L);
@@ -98,6 +103,16 @@ public class TPCH extends BenchmarkSet {
.addColumn("c_comment", Type.TEXT); // 7
schemas.put(CUSTOMER, customer);
+ Schema customerParts = new Schema()
+ .addColumn("c_custkey", Type.INT4) // 0
+ .addColumn("c_name", Type.TEXT) // 1
+ .addColumn("c_address", Type.TEXT) // 2
+ .addColumn("c_phone", Type.TEXT) // 3
+ .addColumn("c_acctbal", Type.FLOAT8) // 4
+ .addColumn("c_mktsegment", Type.TEXT) // 5
+ .addColumn("c_comment", Type.TEXT); // 6
+ schemas.put(CUSTOMER_PARTS, customerParts);
+
Schema nation = new Schema()
.addColumn("n_nationkey", Type.INT4) // 0
.addColumn("n_name", Type.TEXT) // 1
@@ -177,6 +192,7 @@ public class TPCH extends BenchmarkSet {
public void loadTables() throws ServiceException {
loadTable(LINEITEM);
loadTable(CUSTOMER);
+ loadTable(CUSTOMER_PARTS);
loadTable(NATION);
loadTable(PART);
loadTable(REGION);
@@ -187,12 +203,24 @@ public class TPCH extends BenchmarkSet {
}
- private void loadTable(String tableName) throws ServiceException {
+ public void loadTable(String tableName) throws ServiceException {
TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
meta.putOption(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+ PartitionMethodDesc partitionMethodDesc = null;
+ if (tableName.equals(CUSTOMER_PARTS)) {
+ Schema expressionSchema = new Schema();
+ expressionSchema.addColumn("c_nationkey", TajoDataTypes.Type.INT4);
+ partitionMethodDesc = new PartitionMethodDesc(
+ tajo.getCurrentDatabase(),
+ CUSTOMER_PARTS,
+ CatalogProtos.PartitionType.COLUMN,
+ "c_nationkey",
+ expressionSchema);
+ }
try {
- tajo.createExternalTable(tableName, getSchema(tableName), new Path(dataDir, tableName), meta);
+ tajo.createExternalTable(tableName, getSchema(tableName),
+ new Path(dataDir, tableName), meta, partitionMethodDesc);
} catch (SQLException s) {
throw new ServiceException(s);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index 3390758..c4cc254 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -21,21 +21,31 @@ package org.apache.tajo.engine.planner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.tajo.algebra.*;
import org.apache.tajo.annotation.Nullable;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.eval.*;
import org.apache.tajo.engine.exception.InvalidQueryException;
import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.engine.utils.SchemaUtil;
import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
import org.apache.tajo.util.TUtil;
+import java.io.IOException;
import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
public class PlannerUtil {
@@ -70,24 +80,43 @@ public class PlannerUtil {
// one block, without where clause, no group-by, no-sort, no-join
boolean isOneQueryBlock = plan.getQueryBlocks().size() == 1;
boolean simpleOperator = rootNode.getChild().getType() == NodeType.LIMIT
- || rootNode.getChild().getType() == NodeType.SCAN;
+ || rootNode.getChild().getType() == NodeType.SCAN || rootNode.getChild().getType() == NodeType.PARTITIONS_SCAN;
boolean noOrderBy = !plan.getRootBlock().hasNode(NodeType.SORT);
boolean noGroupBy = !plan.getRootBlock().hasNode(NodeType.GROUP_BY);
boolean noWhere = !plan.getRootBlock().hasNode(NodeType.SELECTION);
boolean noJoin = !plan.getRootBlock().hasNode(NodeType.JOIN);
- boolean singleRelation = plan.getRootBlock().hasNode(NodeType.SCAN)
- && PlannerUtil.getRelationLineage(plan.getRootBlock().getRoot()).length == 1;
+ boolean singleRelation =
+ (plan.getRootBlock().hasNode(NodeType.SCAN) || plan.getRootBlock().hasNode(NodeType.PARTITIONS_SCAN)) &&
+ PlannerUtil.getRelationLineage(plan.getRootBlock().getRoot()).length == 1;
boolean noComplexComputation = false;
if (singleRelation) {
ScanNode scanNode = plan.getRootBlock().getNode(NodeType.SCAN);
- if (!scanNode.getTableDesc().hasPartition() && scanNode.hasTargets()
- && scanNode.getTargets().length == scanNode.getInSchema().size()) {
+ if (scanNode == null) {
+ scanNode = plan.getRootBlock().getNode(NodeType.PARTITIONS_SCAN);
+ }
+ if (scanNode.hasTargets()) {
+ // If the number of columns in the select clause is s different from table schema,
+ // This query is not a simple query.
+ if (scanNode.getTableDesc().hasPartition()) {
+ // In the case of partitioned table, the actual number of columns is ScanNode.InSchema + partitioned columns
+ int numPartitionColumns = scanNode.getTableDesc().getPartitionMethod().getExpressionSchema().size();
+ if (scanNode.getTargets().length != scanNode.getInSchema().size() + numPartitionColumns) {
+ return false;
+ }
+ } else {
+ if (scanNode.getTargets().length != scanNode.getInSchema().size()) {
+ return false;
+ }
+ }
noComplexComputation = true;
for (int i = 0; i < scanNode.getTargets().length; i++) {
- noComplexComputation = noComplexComputation && scanNode.getTargets()[i].getEvalTree().getType() == EvalType.FIELD;
+ noComplexComputation =
+ noComplexComputation && scanNode.getTargets()[i].getEvalTree().getType() == EvalType.FIELD;
if (noComplexComputation) {
- noComplexComputation = noComplexComputation && scanNode.getTargets()[i].getNamedColumn().equals(scanNode.getInSchema().getColumn(i));
+ noComplexComputation = noComplexComputation &&
+ scanNode.getTargets()[i].getNamedColumn().equals(
+ scanNode.getTableDesc().getLogicalSchema().getColumn(i));
}
if (!noComplexComputation) {
return noComplexComputation;
@@ -97,7 +126,8 @@ public class PlannerUtil {
}
return !checkIfDDLPlan(rootNode) &&
- (simpleOperator && noComplexComputation && isOneQueryBlock && noOrderBy && noGroupBy && noWhere && noJoin && singleRelation);
+ (simpleOperator && noComplexComputation && isOneQueryBlock &&
+ noOrderBy && noGroupBy && noWhere && noJoin && singleRelation);
}
/**
@@ -762,4 +792,96 @@ public class PlannerUtil {
return explains.toString();
}
+
+ /**
+ * Listing table data file which is not empty.
+ * If the table is a partitioned table, return file list which has same partition key.
+ * @param tajoConf
+ * @param tableDesc
+ * @param fileIndex
+ * @param numResultFiles
+ * @return
+ * @throws IOException
+ */
+ public static FragmentProto[] getNonZeroLengthDataFiles(TajoConf tajoConf,TableDesc tableDesc,
+ int fileIndex, int numResultFiles) throws IOException {
+ FileSystem fs = tableDesc.getPath().getFileSystem(tajoConf);
+
+ List<FileStatus> nonZeroLengthFiles = new ArrayList<FileStatus>();
+ if (fs.exists(tableDesc.getPath())) {
+ getNonZeroLengthDataFiles(fs, tableDesc.getPath(), nonZeroLengthFiles, fileIndex, numResultFiles,
+ new AtomicInteger(0));
+ }
+
+ List<FileFragment> fragments = new ArrayList<FileFragment>();
+
+ //In the case of partitioned table, return same partition key data files.
+ int numPartitionColumns = 0;
+ if (tableDesc.hasPartition()) {
+ numPartitionColumns = tableDesc.getPartitionMethod().getExpressionSchema().getColumns().size();
+ }
+ String[] previousPartitionPathNames = null;
+ for (FileStatus eachFile: nonZeroLengthFiles) {
+ FileFragment fileFragment = new FileFragment(tableDesc.getName(), eachFile.getPath(), 0, eachFile.getLen(), null);
+
+ if (numPartitionColumns > 0) {
+ // finding partition key;
+ Path filePath = fileFragment.getPath();
+ Path parentPath = filePath;
+ String[] parentPathNames = new String[numPartitionColumns];
+ for (int i = 0; i < numPartitionColumns; i++) {
+ parentPath = parentPath.getParent();
+ parentPathNames[numPartitionColumns - i - 1] = parentPath.getName();
+ }
+
+ // If current partitionKey == previousPartitionKey, add to result.
+ if (previousPartitionPathNames == null) {
+ fragments.add(fileFragment);
+ } else if (previousPartitionPathNames != null && Arrays.equals(previousPartitionPathNames, parentPathNames)) {
+ fragments.add(fileFragment);
+ } else {
+ break;
+ }
+ previousPartitionPathNames = parentPathNames;
+ } else {
+ fragments.add(fileFragment);
+ }
+ }
+ return FragmentConvertor.toFragmentProtoArray(fragments.toArray(new FileFragment[]{}));
+ }
+
+ private static void getNonZeroLengthDataFiles(FileSystem fs, Path path, List<FileStatus> result,
+ int startFileIndex, int numResultFiles,
+ AtomicInteger currentFileIndex) throws IOException {
+ if (fs.isDirectory(path)) {
+ FileStatus[] files = fs.listStatus(path);
+ if (files != null && files.length > 0) {
+ for (FileStatus eachFile : files) {
+ if (result.size() >= numResultFiles) {
+ return;
+ }
+ if (eachFile.isDirectory()) {
+ getNonZeroLengthDataFiles(fs, eachFile.getPath(), result, startFileIndex, numResultFiles,
+ currentFileIndex);
+ } else if (eachFile.isFile() && eachFile.getLen() > 0) {
+ if (currentFileIndex.get() >= startFileIndex) {
+ result.add(eachFile);
+ }
+ currentFileIndex.incrementAndGet();
+ }
+ }
+ }
+ } else {
+ FileStatus fileStatus = fs.getFileStatus(path);
+ if (fileStatus != null && fileStatus.getLen() > 0) {
+ if (currentFileIndex.get() >= startFileIndex) {
+ result.add(fileStatus);
+ }
+ currentFileIndex.incrementAndGet();
+ if (result.size() >= numResultFiles) {
+ return;
+ }
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 2c62d42..504a792 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -27,10 +27,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.SessionVars;
-import org.apache.tajo.TajoConstants;
+import org.apache.tajo.*;
import org.apache.tajo.algebra.AlterTablespaceSetType;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.algebra.JsonHelper;
@@ -39,6 +36,7 @@ import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.exception.*;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.conf.TajoConf;
@@ -50,6 +48,7 @@ import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.*;
import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.engine.planner.physical.EvalExprExec;
+import org.apache.tajo.engine.planner.physical.SeqScanExec;
import org.apache.tajo.engine.planner.physical.StoreTableExec;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.ClientProtos;
@@ -71,7 +70,7 @@ import java.util.List;
import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto;
import static org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
-import static org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse.SerializedResultSet;
+import static org.apache.tajo.ipc.ClientProtos.SerializedResultSet;
public class GlobalEngine extends AbstractService {
/** Class Logger */
@@ -217,17 +216,27 @@ public class GlobalEngine extends AbstractService {
// Simple query indicates a form of 'select * from tb_name [LIMIT X];'.
} else if (PlannerUtil.checkIfSimpleQuery(plan)) {
ScanNode scanNode = plan.getRootBlock().getNode(NodeType.SCAN);
+ if (scanNode == null) {
+ scanNode = plan.getRootBlock().getNode(NodeType.PARTITIONS_SCAN);
+ }
TableDesc desc = scanNode.getTableDesc();
+ int maxRow = Integer.MAX_VALUE;
if (plan.getRootBlock().hasNode(NodeType.LIMIT)) {
LimitNode limitNode = plan.getRootBlock().getNode(NodeType.LIMIT);
- responseBuilder.setMaxRowNum((int) limitNode.getFetchFirstNum());
- } else {
- if (desc.getStats().getNumBytes() > 0 && desc.getStats().getNumRows() == 0) {
- responseBuilder.setMaxRowNum(Integer.MAX_VALUE);
- }
+ maxRow = (int) limitNode.getFetchFirstNum();
}
+ QueryId queryId = QueryIdFactory.newQueryId(context.getResourceManager().getSeedQueryId());
+
+ NonForwardQueryResultScanner queryResultScanner =
+ new NonForwardQueryResultScanner(context.getConf(), session.getSessionId(), queryId, scanNode, desc, maxRow);
+
+ queryResultScanner.init();
+ session.addNonForwardQueryResultScanner(queryResultScanner);
+
+ responseBuilder.setQueryId(queryId.getProto());
+ responseBuilder.setMaxRowNum(maxRow);
responseBuilder.setTableDesc(desc.getProto());
- responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+ responseBuilder.setSessionVariables(session.getProto().getVariables());
responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
// NonFromQuery indicates a form of 'select a, x+y;'
http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
new file mode 100644
index 0000000..f8c51fd
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.protobuf.ByteString;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.QueryUnitId;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.engine.planner.physical.SeqScanExec;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.master.session.Session;
+import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
+import org.apache.tajo.storage.StorageManagerFactory;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class NonForwardQueryResultScanner {
+ private static final int MAX_FILE_NUM_PER_SCAN = 100;
+
+ private QueryId queryId;
+ private String sessionId;
+ private SeqScanExec scanExec;
+ private TableDesc tableDesc;
+ private RowStoreEncoder rowEncoder;
+ private int maxRow;
+ private int currentNumRows;
+ private TaskAttemptContext taskContext;
+ private TajoConf tajoConf;
+ private ScanNode scanNode;
+
+ private int currentFileIndex = 0;
+
+ public NonForwardQueryResultScanner(TajoConf tajoConf, String sessionId,
+ QueryId queryId,
+ ScanNode scanNode,
+ TableDesc tableDesc,
+ int maxRow) throws IOException {
+ this.tajoConf = tajoConf;
+ this.sessionId = sessionId;
+ this.queryId = queryId;
+ this.scanNode = scanNode;
+ this.tableDesc = tableDesc;
+ this.maxRow = maxRow;
+
+ this.rowEncoder = RowStoreUtil.createEncoder(tableDesc.getLogicalSchema());
+ }
+
+ public void init() throws IOException {
+ initSeqScanExec();
+ }
+
+ private void initSeqScanExec() throws IOException {
+ FragmentProto[] fragments = PlannerUtil.getNonZeroLengthDataFiles(tajoConf, tableDesc,
+ currentFileIndex, MAX_FILE_NUM_PER_SCAN);
+ if (fragments != null && fragments.length > 0) {
+ this.taskContext = new TaskAttemptContext(
+ new QueryContext(tajoConf), null,
+ new QueryUnitAttemptId(new QueryUnitId(new ExecutionBlockId(queryId, 1), 0), 0),
+ fragments, null);
+
+ try {
+ // scanNode must be clone cause SeqScanExec change target in the case of a partitioned table.
+ scanExec = new SeqScanExec(taskContext,
+ StorageManagerFactory.getStorageManager(tajoConf), (ScanNode)scanNode.clone(), fragments);
+ } catch (CloneNotSupportedException e) {
+ throw new IOException(e.getMessage(), e);
+ }
+ scanExec.init();
+ currentFileIndex += fragments.length;
+ }
+ }
+
+ public QueryId getQueryId() {
+ return queryId;
+ }
+
+ public String getSessionId() {
+ return sessionId;
+ }
+
+ public void setScanExec(SeqScanExec scanExec) {
+ this.scanExec = scanExec;
+ }
+
+ public TableDesc getTableDesc() {
+ return tableDesc;
+ }
+
+ public void close() throws Exception {
+ if (scanExec != null) {
+ scanExec.close();
+ scanExec = null;
+ }
+ }
+
+ public List<ByteString> getNextRows(int fetchRowNum) throws IOException {
+ List<ByteString> rows = new ArrayList<ByteString>();
+ if (scanExec == null) {
+ return rows;
+ }
+ int rowCount = 0;
+
+ while (true) {
+ Tuple tuple = scanExec.next();
+ if (tuple == null) {
+ scanExec.close();
+ scanExec = null;
+
+ initSeqScanExec();
+ if (scanExec != null) {
+ tuple = scanExec.next();
+ }
+ if (tuple == null) {
+ if (scanExec != null ) {
+ scanExec.close();
+ scanExec = null;
+ }
+
+ break;
+ }
+ }
+ rows.add(ByteString.copyFrom((rowEncoder.toBytes(tuple))));
+ rowCount++;
+ currentNumRows++;
+ if (rowCount >= fetchRowNum) {
+ break;
+ }
+
+ if (currentNumRows >= maxRow) {
+ scanExec.close();
+ scanExec = null;
+ break;
+ }
+ }
+
+ return rows;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 738b643..65bde27 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -18,6 +18,7 @@
package org.apache.tajo.master;
+import com.google.protobuf.ByteString;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.commons.lang.exception.ExceptionUtils;
@@ -56,6 +57,7 @@ import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.ProtoUtil;
+import org.apache.tajo.util.StringUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -452,6 +454,7 @@ public class TajoMasterClientService extends AbstractService {
if (queryInProgress == null) {
queryInProgress = context.getQueryJobManager().getFinishedQuery(queryId);
}
+
if (queryInProgress != null) {
QueryInfo queryInfo = queryInProgress.getQueryInfo();
builder.setResultCode(ResultCode.OK);
@@ -468,14 +471,70 @@ public class TajoMasterClientService extends AbstractService {
builder.setFinishTime(System.currentTimeMillis());
}
} else {
- builder.setResultCode(ResultCode.ERROR);
- builder.setErrorMessage("No such query: " + queryId.toString());
+ Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+ if (session.getNonForwardQueryResultScanner(queryId) != null) {
+ builder.setResultCode(ResultCode.OK);
+ builder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED);
+ } else {
+ builder.setResultCode(ResultCode.ERROR);
+ builder.setErrorMessage("No such query: " + queryId.toString());
+ }
}
}
return builder.build();
} catch (Throwable t) {
- throw new ServiceException(t);
+ throw new ServiceException(t);
+ }
+ }
+
+ @Override
+ public GetQueryResultDataResponse getQueryResultData(RpcController controller, GetQueryResultDataRequest request)
+ throws ServiceException {
+ GetQueryResultDataResponse.Builder builder = GetQueryResultDataResponse.newBuilder();
+
+ try {
+ context.getSessionManager().touch(request.getSessionId().getId());
+ Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
+ QueryId queryId = new QueryId(request.getQueryId());
+ NonForwardQueryResultScanner queryResultScanner = session.getNonForwardQueryResultScanner(queryId);
+ if (queryResultScanner == null) {
+ throw new ServiceException("No NonForwardQueryResultScanner for " + queryId);
+ }
+
+ List<ByteString> rows = queryResultScanner.getNextRows(request.getFetchRowNum());
+ SerializedResultSet.Builder resultSetBuilder = SerializedResultSet.newBuilder();
+ resultSetBuilder.setSchema(queryResultScanner.getTableDesc().getLogicalSchema().getProto());
+ resultSetBuilder.addAllSerializedTuples(rows);
+
+ builder.setResultSet(resultSetBuilder.build());
+ builder.setResultCode(ResultCode.OK);
+
+ LOG.info("Send result to client for " +
+ request.getSessionId().getId() + "," + queryId + ", " + rows.size() + " rows");
+
+ } catch (Throwable t) {
+ LOG.error(t.getMessage(), t);
+ builder.setResultCode(ResultCode.ERROR);
+ String errorMessage = t.getMessage() == null ? t.getClass().getName() : t.getMessage();
+ builder.setErrorMessage(errorMessage);
+ builder.setErrorTrace(org.apache.hadoop.util.StringUtils.stringifyException(t));
+ }
+ return builder.build();
+ }
+
+ @Override
+ public BoolProto closeNonForwardQuery(RpcController controller, QueryIdRequest request) throws ServiceException {
+ try {
+ context.getSessionManager().touch(request.getSessionId().getId());
+ Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+ QueryId queryId = new QueryId(request.getQueryId());
+
+ session.closeNonForwardQueryResultScanner(queryId);
+ return BOOL_TRUE;
+ } catch (Throwable t) {
+ throw new ServiceException(t);
}
}
@@ -483,7 +542,7 @@ public class TajoMasterClientService extends AbstractService {
* It is invoked by TajoContainerProxy.
*/
@Override
- public BoolProto killQuery(RpcController controller, KillQueryRequest request) throws ServiceException {
+ public BoolProto killQuery(RpcController controller, QueryIdRequest request) throws ServiceException {
try {
context.getSessionManager().touch(request.getSessionId().getId());
QueryId queryId = new QueryId(request.getQueryId());
@@ -492,7 +551,7 @@ public class TajoMasterClientService extends AbstractService {
new QueryInfo(queryId)));
return BOOL_TRUE;
} catch (Throwable t) {
- t.printStackTrace();
+ LOG.error(t.getMessage(), t);
throw new ServiceException(t);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java b/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java
index 1f21e2a..5f44ecb 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java
@@ -19,7 +19,11 @@
package org.apache.tajo.master.session;
import com.google.common.collect.ImmutableMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.QueryId;
import org.apache.tajo.SessionVars;
+import org.apache.tajo.master.NonForwardQueryResultScanner;
import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.common.ProtoObject;
@@ -29,10 +33,13 @@ import java.util.Map;
import static org.apache.tajo.ipc.TajoWorkerProtocol.SessionProto;
public class Session implements SessionConstants, ProtoObject<SessionProto>, Cloneable {
+ private static final Log LOG = LogFactory.getLog(Session.class);
+
private final String sessionId;
private final String userName;
private String currentDatabase;
private final Map<String, String> sessionVariables;
+ private final Map<QueryId, NonForwardQueryResultScanner> nonForwardQueryMap = new HashMap<QueryId, NonForwardQueryResultScanner>();
// transient status
private volatile long lastAccessTime;
@@ -139,4 +146,51 @@ public class Session implements SessionConstants, ProtoObject<SessionProto>, Clo
newSession.sessionVariables.putAll(getAllVariables());
return newSession;
}
+
+ public NonForwardQueryResultScanner getNonForwardQueryResultScanner(QueryId queryId) {
+ synchronized (nonForwardQueryMap) {
+ return nonForwardQueryMap.get(queryId);
+ }
+ }
+
+ public void addNonForwardQueryResultScanner(NonForwardQueryResultScanner resultScanner) {
+ synchronized (nonForwardQueryMap) {
+ nonForwardQueryMap.put(resultScanner.getQueryId(), resultScanner);
+ }
+ }
+
+ public void closeNonForwardQueryResultScanner(QueryId queryId) {
+ NonForwardQueryResultScanner resultScanner;
+ synchronized (nonForwardQueryMap) {
+ resultScanner = nonForwardQueryMap.remove(queryId);
+ }
+
+ if (resultScanner != null) {
+ try {
+ resultScanner.close();
+ } catch (Exception e) {
+ LOG.error("NonForwardQueryResultScanne close error: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ public void close() {
+ try {
+ synchronized (nonForwardQueryMap) {
+ for (NonForwardQueryResultScanner eachQueryScanner: nonForwardQueryMap.values()) {
+ try {
+ eachQueryScanner.close();
+ } catch (Exception e) {
+ LOG.error("Error while closing NonForwardQueryResultScanner: " +
+ eachQueryScanner.getSessionId() + ", " + e.getMessage(), e);
+ }
+ }
+
+ nonForwardQueryMap.clear();
+ }
+ } catch (Throwable t) {
+ LOG.error(t.getMessage(), t);
+ throw new RuntimeException(t.getMessage(), t);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/main/java/org/apache/tajo/master/session/SessionManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionManager.java b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionManager.java
index 24df9d8..d701d03 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionManager.java
@@ -74,12 +74,15 @@ public class SessionManager extends CompositeService implements EventHandler<Ses
return sessionId;
}
- public void removeSession(String sessionId) {
+ public Session removeSession(String sessionId) {
if (sessions.containsKey(sessionId)) {
- sessions.remove(sessionId);
LOG.info("Session " + sessionId + " is removed.");
+ Session session = sessions.remove(sessionId);
+ session.close();
+ return session;
} else {
LOG.error("No such session id: " + sessionId);
+ return null;
}
}
@@ -132,8 +135,10 @@ public class SessionManager extends CompositeService implements EventHandler<Ses
}
if (event.getType() == SessionEventType.EXPIRE) {
- Session session = sessions.remove(event.getSessionId());
- LOG.info("[Expired] Session username=" + session.getUserName() + ",sessionid=" + event.getSessionId());
+ Session session = removeSession(event.getSessionId());
+ if (session != null) {
+ LOG.info("[Expired] Session username=" + session.getUserName() + ",sessionid=" + event.getSessionId());
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index 422ec2b..96208ac 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -118,8 +118,6 @@ public class TaskAttemptContext {
if (workerContext != null) {
this.hashShuffleAppenderManager = workerContext.getHashShuffleAppenderManager();
} else {
- // For TestCase
- LOG.warn("WorkerContext is null, so create HashShuffleAppenderManager created per a Task.");
try {
this.hashShuffleAppenderManager = new HashShuffleAppenderManager(queryContext.getConf());
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index c74a4ec..aec11f6 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -573,6 +573,39 @@ public class TajoTestingCluster {
LOG.info("Minicluster is down");
}
+ public static TajoClient newTajoClient() throws Exception {
+ TpchTestBase instance = TpchTestBase.getInstance();
+ TajoTestingCluster util = instance.getTestingCluster();
+ while(true) {
+ if(util.getMaster().isMasterRunning()) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+ TajoConf conf = util.getConfiguration();
+ return new TajoClient(conf);
+ }
+
+ public static ResultSet run(String[] names,
+ Schema[] schemas,
+ KeyValueSet tableOption,
+ String[][] tables,
+ String query,
+ TajoClient client) throws Exception {
+ TajoTestingCluster util = TpchTestBase.getInstance().getTestingCluster();
+
+ FileSystem fs = util.getDefaultFileSystem();
+ Path rootDir = util.getMaster().
+ getStorageManager().getWarehouseDir();
+ fs.mkdirs(rootDir);
+ for (int i = 0; i < names.length; i++) {
+ createTable(names[i], schemas[i], tableOption, tables[i]);
+ }
+ Thread.sleep(1000);
+ ResultSet res = client.executeQueryAndGetResult(query);
+ return res;
+ }
+
public static ResultSet run(String[] names,
Schema[] schemas,
KeyValueSet tableOption,
@@ -588,17 +621,9 @@ public class TajoTestingCluster {
}
TajoConf conf = util.getConfiguration();
TajoClient client = new TajoClient(conf);
+
try {
- FileSystem fs = util.getDefaultFileSystem();
- Path rootDir = util.getMaster().
- getStorageManager().getWarehouseDir();
- fs.mkdirs(rootDir);
- for (int i = 0; i < names.length; i++) {
- createTable(names[i], schemas[i], tableOption, tables[i]);
- }
- Thread.sleep(1000);
- ResultSet res = client.executeQueryAndGetResult(query);
- return res;
+ return run(names, schemas, tableOption, tables, query, client);
} finally {
client.close();
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
index b370be7..756dadc 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
@@ -18,11 +18,14 @@
package org.apache.tajo.engine.planner;
+import org.apache.commons.configuration.Configuration;
+import org.apache.hadoop.fs.*;
import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.TajoTestingCluster;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.common.TajoDataTypes.Type;
@@ -34,13 +37,18 @@ import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.KeyValueSet;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
@@ -318,4 +326,58 @@ public class TestPlannerUtil {
assertTrue(innerComparator.compare(t1, t2) < 0);
assertTrue(innerComparator.compare(t2, t1) > 0);
}
+
+ @Test
+ public void testGetNonZeroLengthDataFiles() throws Exception {
+ String queryFiles = ClassLoader.getSystemResource("queries").toString() + "/TestSelectQuery";
+ Path path = new Path(queryFiles);
+
+ TableDesc tableDesc = new TableDesc();
+ tableDesc.setName("Test");
+ tableDesc.setPath(path);
+
+ FileSystem fs = path.getFileSystem(util.getConfiguration());
+
+ List<Path> expectedFiles = new ArrayList<Path>();
+ RemoteIterator<LocatedFileStatus> files = fs.listFiles(path, true);
+ while (files.hasNext()) {
+ LocatedFileStatus file = files.next();
+ if (file.isFile() && file.getLen() > 0) {
+ expectedFiles.add(file.getPath());
+ }
+ }
+ int fileNum = expectedFiles.size() / 5;
+
+ int numResultFiles = 0;
+ for (int i = 0; i <= 5; i++) {
+ int start = i * fileNum;
+
+ FragmentProto[] fragments =
+ PlannerUtil.getNonZeroLengthDataFiles(util.getConfiguration(), tableDesc, start, fileNum);
+ assertNotNull(fragments);
+
+ numResultFiles += fragments.length;
+ int expectedSize = fileNum;
+ if (i == 5) {
+ //last
+ expectedSize = expectedFiles.size() - (fileNum * 5);
+ }
+
+ comparePath(expectedFiles, fragments, start, expectedSize);
+ }
+
+ assertEquals(expectedFiles.size(), numResultFiles);
+ }
+
+ private void comparePath(List<Path> expectedFiles, FragmentProto[] fragments,
+ int startIndex, int expectedSize) throws Exception {
+ assertEquals(expectedSize, fragments.length);
+
+ int index = 0;
+
+ for (int i = startIndex; i < startIndex + expectedSize; i++, index++) {
+ FileFragment fragment = FragmentConvertor.convert(util.getConfiguration(), StoreType.CSV, fragments[index]);
+ assertEquals(expectedFiles.get(i), fragment.getPath());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/test/java/org/apache/tajo/engine/query/TestNullValues.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestNullValues.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestNullValues.java
index de75ca7..a492d6e 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestNullValues.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestNullValues.java
@@ -21,6 +21,7 @@ package org.apache.tajo.engine.query;
import org.apache.tajo.IntegrationTest;
import org.apache.tajo.TajoTestingCluster;
import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.client.TajoClient;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.storage.StorageConstants;
import org.apache.tajo.util.KeyValueSet;
@@ -165,9 +166,11 @@ public class TestNullValues {
@Test
public final void testResultSetNullSimpleQuery() throws Exception {
String tableName = "nulltable5";
- ResultSet res = runNullTableQuery(tableName, "select col1, col2, col3, col4 from " + tableName);
+ ResultSet res = null;
+ TajoClient client = TajoTestingCluster.newTajoClient();
try {
+ res = runNullTableQuery(tableName, "select col1, col2, col3, col4 from " + tableName, client);
int numRows = 0;
String expected =
@@ -193,7 +196,11 @@ public class TestNullValues {
assertEquals(4, numRows);
assertEquals(expected, result);
} finally {
- res.close();
+ if (res != null) {
+ res.close();
+ }
+
+ client.close();
}
}
@@ -207,9 +214,11 @@ public class TestNullValues {
"col4 " +
"from " + tableName;
- ResultSet res = runNullTableQuery(tableName, query);
+ TajoClient client = TajoTestingCluster.newTajoClient();
+ ResultSet res = null;
try {
+ res = runNullTableQuery(tableName, query, client);
int numRows = 0;
String expected =
"null|99999|a|a|1.0|1.0|true\n" +
@@ -234,11 +243,15 @@ public class TestNullValues {
assertEquals(4, numRows);
assertEquals(expected, result);
} finally {
- res.close();
+ if (res != null) {
+ res.close();
+ }
+
+ client.close();
}
}
- private ResultSet runNullTableQuery(String tableName, String query) throws Exception {
+ private ResultSet runNullTableQuery(String tableName, String query, TajoClient client) throws Exception {
String [] table = new String[] {tableName};
Schema schema = new Schema();
schema.addColumn("col1", Type.INT4);
@@ -256,10 +269,11 @@ public class TestNullValues {
tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N");
- ResultSet res = TajoTestingCluster
- .run(table, schemas, tableOptions, new String[][]{data}, query);
-
- return res;
+ if (client == null) {
+ return TajoTestingCluster.run(table, schemas, tableOptions, new String[][]{data}, query);
+ } else {
+ return TajoTestingCluster.run(table, schemas, tableOptions, new String[][]{data}, query, client);
+ }
}
private void assertResultSetNull(ResultSet res, int numRows, boolean useName, int[] nullIndex) throws SQLException {
http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
index 17ce37b..41e2f3e 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
@@ -81,6 +81,21 @@ public class TestSelectQuery extends QueryTestCaseBase {
}
@Test
+ public final void testSimpleQueryWithLimitPartitionedTable() throws Exception {
+ // select * from customer_parts limit 10;
+ executeDDL("customer_ddl.sql", null);
+ for (int i = 0; i < 5; i++) {
+ executeFile("insert_into_customer.sql").close();
+ }
+
+ ResultSet res = executeQuery();
+ assertResultSet(res);
+ cleanupQuery(res);
+
+ executeString("DROP TABLE customer_parts PURGE").close();
+ }
+
+ @Test
public final void testExplainSelect() throws Exception {
// explain select l_orderkey, l_partkey from lineitem;
ResultSet res = executeQuery();
http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
index 38908db..2c06b88 100644
--- a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
+++ b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
@@ -32,6 +32,7 @@ import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.client.TajoClient;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.DatumFactory;
@@ -139,6 +140,7 @@ public class TestResultSet {
TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
ResultSet res = null;
+ TajoClient client = TajoTestingCluster.newTajoClient();
try {
String tableName = "datetimetable";
String query = "select col1, col2, col3 from " + tableName;
@@ -156,7 +158,7 @@ public class TestResultSet {
tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
res = TajoTestingCluster
- .run(table, schemas, tableOptions, new String[][]{data}, query);
+ .run(table, schemas, tableOptions, new String[][]{data}, query, client);
assertTrue(res.next());
@@ -212,7 +214,11 @@ public class TestResultSet {
} finally {
TajoConf.setCurrentTimeZone(tajoCurrentTimeZone);
TimeZone.setDefault(systemCurrentTimeZone);
- res.close();
+ if (res != null) {
+ res.close();
+ }
+
+ client.close();
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
index b6ac551..a92b751 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
@@ -25,6 +25,7 @@ import org.apache.tajo.TajoTestingCluster;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.benchmark.TPCH;
import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.common.TajoDataTypes;
@@ -32,10 +33,7 @@ import org.apache.tajo.engine.eval.BinaryEval;
import org.apache.tajo.engine.eval.EvalType;
import org.apache.tajo.engine.eval.FieldEval;
import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.engine.planner.LogicalOptimizer;
-import org.apache.tajo.engine.planner.LogicalPlan;
-import org.apache.tajo.engine.planner.LogicalPlanner;
-import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.*;
import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.engine.planner.global.ExecutionBlock;
import org.apache.tajo.engine.planner.global.GlobalPlanner;
@@ -55,6 +53,8 @@ import java.util.Map;
import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
public class TestGlobalPlanner {
private static Log LOG = LogFactory.getLog(TestGlobalPlanner.class);
@@ -80,10 +80,10 @@ public class TestGlobalPlanner {
// TPC-H Schema for Complex Queries
String [] tables = {
- "part", "supplier", "partsupp", "nation", "region", "lineitem", "orders", "customer"
+ "part", "supplier", "partsupp", "nation", "region", "lineitem", "orders", "customer", "customer_parts"
};
int [] volumes = {
- 100, 200, 50, 5, 5, 800, 300, 100
+ 100, 200, 50, 5, 5, 800, 300, 100, 707
};
tpch = new TPCH();
tpch.loadSchemas();
@@ -96,6 +96,19 @@ public class TestGlobalPlanner {
CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, tables[i]), tpch.getSchema(tables[i]), m,
CommonTestingUtil.getTestDir());
d.setStats(stats);
+
+ if (tables[i].equals(TPCH.CUSTOMER_PARTS)) {
+ Schema expressionSchema = new Schema();
+ expressionSchema.addColumn("c_nationkey", TajoDataTypes.Type.INT4);
+ PartitionMethodDesc partitionMethodDesc = new PartitionMethodDesc(
+ DEFAULT_DATABASE_NAME,
+ tables[i],
+ CatalogProtos.PartitionType.COLUMN,
+ "c_nationkey",
+ expressionSchema);
+
+ d.setPartitionMethod(partitionMethodDesc);
+ }
catalog.createTable(d);
}
@@ -174,10 +187,10 @@ public class TestGlobalPlanner {
visitChildExecutionBLock(plan, root, evalMap);
// Find required shuffleKey.
- Assert.assertTrue(evalMap.get(eval1).booleanValue());
+ assertTrue(evalMap.get(eval1).booleanValue());
// Find that ShuffleKeys only includes equi-join conditions
- Assert.assertFalse(evalMap.get(eval2).booleanValue());
+ assertFalse(evalMap.get(eval2).booleanValue());
}
private void visitChildExecutionBLock(MasterPlan plan, ExecutionBlock parentBlock,
@@ -290,4 +303,43 @@ public class TestGlobalPlanner {
public void testTPCH_Q5() throws Exception {
buildPlan(FileUtil.readTextFile(new File("benchmark/tpch/q5.sql")));
}
+
+ @Test
+ public void testCheckIfSimpleQuery() throws Exception {
+ MasterPlan plan = buildPlan("select * from customer");
+ assertTrue(PlannerUtil.checkIfSimpleQuery(plan.getLogicalPlan()));
+
+ //partition table
+ plan = buildPlan("select * from customer_parts");
+ assertTrue(PlannerUtil.checkIfSimpleQuery(plan.getLogicalPlan()));
+
+ plan = buildPlan("select * from customer where c_nationkey = 1");
+ assertFalse(PlannerUtil.checkIfSimpleQuery(plan.getLogicalPlan()));
+
+ plan = buildPlan("select * from customer_parts where c_nationkey = 1");
+ assertFalse(PlannerUtil.checkIfSimpleQuery(plan.getLogicalPlan()));
+
+ // same column order
+ plan = buildPlan("select c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment" +
+ " from customer");
+ assertTrue(PlannerUtil.checkIfSimpleQuery(plan.getLogicalPlan()));
+
+ plan = buildPlan("select c_custkey, c_name, c_address, c_phone, c_acctbal, c_mktsegment, c_comment, c_nationkey " +
+ " from customer_parts");
+ assertTrue(PlannerUtil.checkIfSimpleQuery(plan.getLogicalPlan()));
+
+ // different column order
+ plan = buildPlan("select c_name, c_custkey, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment" +
+ " from customer");
+ assertFalse(PlannerUtil.checkIfSimpleQuery(plan.getLogicalPlan()));
+
+ plan = buildPlan("select c_name, c_custkey, c_address, c_phone, c_acctbal, c_mktsegment, c_comment, c_nationkey " +
+ " from customer_parts");
+ assertFalse(PlannerUtil.checkIfSimpleQuery(plan.getLogicalPlan()));
+
+ plan = buildPlan("insert into customer_parts " +
+ " select c_name, c_custkey, c_address, c_phone, c_acctbal, c_mktsegment, c_comment, c_nationkey " +
+ " from customer");
+ assertFalse(PlannerUtil.checkIfSimpleQuery(plan.getLogicalPlan()));
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/test/resources/queries/TestSelectQuery/customer_ddl.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestSelectQuery/customer_ddl.sql b/tajo-core/src/test/resources/queries/TestSelectQuery/customer_ddl.sql
new file mode 100644
index 0000000..ca43710
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestSelectQuery/customer_ddl.sql
@@ -0,0 +1,9 @@
+CREATE TABLE customer_parts (
+ c_custkey INT4,
+ c_name TEXT,
+ c_address TEXT,
+ c_phone TEXT,
+ c_acctbal FLOAT8,
+ c_mktsegment TEXT,
+ c_comment TEXT
+) PARTITION BY COLUMN (c_nationkey INT4);
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/test/resources/queries/TestSelectQuery/insert_into_customer.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestSelectQuery/insert_into_customer.sql b/tajo-core/src/test/resources/queries/TestSelectQuery/insert_into_customer.sql
new file mode 100644
index 0000000..8767ba4
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestSelectQuery/insert_into_customer.sql
@@ -0,0 +1,11 @@
+INSERT INTO customer_parts
+ SELECT
+ c_custkey,
+ c_name,
+ c_address,
+ c_phone,
+ c_acctbal,
+ c_mktsegment,
+ c_comment,
+ c_nationkey
+ FROM customer;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/test/resources/queries/TestSelectQuery/testSimpleQueryWithLimitPartitionedTable.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestSelectQuery/testSimpleQueryWithLimitPartitionedTable.sql b/tajo-core/src/test/resources/queries/TestSelectQuery/testSimpleQueryWithLimitPartitionedTable.sql
new file mode 100644
index 0000000..42362b6
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestSelectQuery/testSimpleQueryWithLimitPartitionedTable.sql
@@ -0,0 +1 @@
+select * from customer_parts limit 10;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/test/resources/results/TestSelectQuery/testSimpleQueryWithLimitPartitionedTable.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestSelectQuery/testSimpleQueryWithLimitPartitionedTable.result b/tajo-core/src/test/resources/results/TestSelectQuery/testSimpleQueryWithLimitPartitionedTable.result
new file mode 100644
index 0000000..5704ccb
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestSelectQuery/testSimpleQueryWithLimitPartitionedTable.result
@@ -0,0 +1,12 @@
+c_custkey,c_name,c_address,c_phone,c_acctbal,c_mktsegment,c_comment,c_nationkey
+-------------------------------
+3,Customer#000000003,MG9kdTD2WBHm,11-719-748-3364,7498.12,AUTOMOBILE, deposits eat slyly ironic, even instructions. express foxes detect slyly. blithely even accounts abov,1
+3,Customer#000000003,MG9kdTD2WBHm,11-719-748-3364,7498.12,AUTOMOBILE, deposits eat slyly ironic, even instructions. express foxes detect slyly. blithely even accounts abov,1
+3,Customer#000000003,MG9kdTD2WBHm,11-719-748-3364,7498.12,AUTOMOBILE, deposits eat slyly ironic, even instructions. express foxes detect slyly. blithely even accounts abov,1
+3,Customer#000000003,MG9kdTD2WBHm,11-719-748-3364,7498.12,AUTOMOBILE, deposits eat slyly ironic, even instructions. express foxes detect slyly. blithely even accounts abov,1
+3,Customer#000000003,MG9kdTD2WBHm,11-719-748-3364,7498.12,AUTOMOBILE, deposits eat slyly ironic, even instructions. express foxes detect slyly. blithely even accounts abov,1
+2,Customer#000000002,XSTf4,NCwDVaWNe6tEgvwfmRchLXak,23-768-687-3665,121.65,AUTOMOBILE,l accounts. blithely ironic theodolites integrate boldly: caref,13
+2,Customer#000000002,XSTf4,NCwDVaWNe6tEgvwfmRchLXak,23-768-687-3665,121.65,AUTOMOBILE,l accounts. blithely ironic theodolites integrate boldly: caref,13
+2,Customer#000000002,XSTf4,NCwDVaWNe6tEgvwfmRchLXak,23-768-687-3665,121.65,AUTOMOBILE,l accounts. blithely ironic theodolites integrate boldly: caref,13
+2,Customer#000000002,XSTf4,NCwDVaWNe6tEgvwfmRchLXak,23-768-687-3665,121.65,AUTOMOBILE,l accounts. blithely ironic theodolites integrate boldly: caref,13
+2,Customer#000000002,XSTf4,NCwDVaWNe6tEgvwfmRchLXak,23-768-687-3665,121.65,AUTOMOBILE,l accounts. blithely ironic theodolites integrate boldly: caref,13
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
index 0b5e28f..e6b12b1 100644
--- a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
+++ b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
@@ -34,4 +34,5 @@ Available Session Variables:
\set NULL_CHAR [text value] - null char of text file output
\set CODEGEN [true or false] - Runtime code generation enabled (experiment)
\set ARITHABORT [true or false] - If true, a running query will be terminated when an overflow or divide-by-zero occurs.
+\set FETCH_ROWNUM [int value] - Sets the number of rows at a time from Master
\set DEBUG_ENABLED [true or false] - (debug only) debug mode enabled
\ No newline at end of file