You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/08/14 08:48:00 UTC
[2/8] TAJO-91: Launch QueryMaster on NodeManager per query.
(hyoungjunkim via hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto b/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto
new file mode 100644
index 0000000..2c5c2b6
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tajo.ipc";
+option java_outer_classname = "ClientProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "yarn_protos.proto";
+import "tajo_protos.proto";
+import "TajoIdProtos.proto";
+import "CatalogProtos.proto";
+import "PrimitiveProtos.proto";
+
+enum ResultCode { // Outcome flag carried by the resultCode field of UpdateQueryResponse, SubmitQueryResponse and GetQueryStatusResponse.
+ OK = 0;
+ ERROR = 1; // Each response that carries a resultCode also declares an optional errorMessage.
+}
+
+message UpdateSessionVariableRequest { // Argument of the updateSessionVariables rpc (declared in TajoMasterClientProtocol.proto and QueryMasterClientProtocol.proto).
+ optional SessionIdProto sessionId = 1;
+ repeated KeyValueProto setVariables = 2; // Key/value entries to set.
+ repeated string unsetVariables = 3; // Presumably the names of variables to remove -- TODO confirm.
+}
+
+message QueryRequest { // Argument of both the submitQuery and updateQuery rpcs of TajoMasterClientProtocolService.
+ optional SessionIdProto sessionId = 1;
+ required string query = 2; // The only required field of this message.
+ repeated KeyValueProto setVariables = 3;
+}
+
+message UpdateQueryResponse { // Returned by the updateQuery rpc of TajoMasterClientProtocolService.
+ required ResultCode resultCode = 1;
+ optional string errorMessage = 2;
+}
+
+message SubmitQueryResponse { // Returned by the submitQuery rpc of TajoMasterClientProtocolService.
+ required ResultCode resultCode = 1;
+ optional ApplicationAttemptIdProto queryId = 2; // Type is not defined in this file; presumably provided by the yarn_protos.proto import -- TODO confirm.
+ optional string errorMessage = 3;
+}
+
+message GetQueryResultRequest { // Argument of the getQueryResult rpc (declared in both client protocol services).
+ optional SessionIdProto sessionId = 1;
+ required ApplicationAttemptIdProto queryId = 2;
+}
+
+message GetQueryResultResponse { // Returned by the getQueryResult rpc; both fields are optional and there is no resultCode field.
+ optional TableDescProto tableDesc = 1;
+ optional string errorMessage = 2;
+}
+
+message GetQueryListRequest { // Argument of the getQueryList rpc of TajoMasterClientProtocolService.
+ optional SessionIdProto sessionId = 1;
+}
+
+message BriefQueryStatus { // Element type of GetQueryListResponse.queryList; all three fields are required.
+ required ApplicationAttemptIdProto queryId = 1;
+ required QueryState state = 2;
+ required int32 executionTime = 3; // NOTE(review): time unit is not stated here -- confirm.
+}
+
+message GetQueryListResponse { // Returned by the getQueryList rpc of TajoMasterClientProtocolService.
+ repeated BriefQueryStatus queryList = 1;
+}
+
+message GetQueryStatusRequest { // Argument of the getQueryStatus rpc (declared in both client protocol services).
+ optional SessionIdProto sessionId = 1;
+ required ApplicationAttemptIdProto queryId = 2;
+}
+
+message GetQueryStatusResponse { // Returned by the getQueryStatus rpc; only resultCode and queryId are required.
+ required ResultCode resultCode = 1;
+ required ApplicationAttemptIdProto queryId = 2;
+ optional QueryState state = 3;
+ optional float progress = 4; // NOTE(review): value range (0..1 vs. percent) is not stated here -- confirm.
+ optional int64 submitTime = 5; // NOTE(review): unit/epoch of the three *Time fields is not stated here -- confirm.
+ optional int64 initTime = 6;
+ optional int64 finishTime = 7;
+ optional bool hasResult = 8;
+ optional string errorMessage = 9;
+ optional string queryMasterHost = 10; // Presumably the host of the QueryMaster serving this query -- TODO confirm.
+ optional int32 queryMasterPort = 11;
+}
+
+message GetClusterInfoRequest { // Argument of the getClusterInfo rpc of TajoMasterClientProtocolService.
+ optional SessionIdProto sessionId = 1;
+}
+
+message GetClusterInfoResponse { // Returned by the getClusterInfo rpc of TajoMasterClientProtocolService.
+ repeated string serverName = 1; // Repeated field despite the singular name.
+}
+
+message GetTableListRequest { // Argument of the getTableList rpc of TajoMasterClientProtocolService.
+ optional SessionIdProto sessionId = 1;
+}
+
+message GetTableListResponse { // Returned by the getTableList rpc of TajoMasterClientProtocolService.
+ repeated string tables = 1; // Presumably table names -- TODO confirm.
+}
+
+message GetTableDescRequest { // Argument of the getTableDesc rpc, which answers with TableResponse.
+ optional SessionIdProto sessionId = 1;
+ required string tableName = 2;
+}
+
+message CreateTableRequest { // Argument of the createTable rpc; unlike the Get*Request messages it has no sessionId field.
+ required string name = 1;
+ required string path = 2;
+ required TableProto meta = 3;
+}
+
+message AttachTableRequest { // Argument of the attachTable rpc; same name/path fields as CreateTableRequest but without meta.
+ required string name = 1;
+ required string path = 2;
+}
+
+message TableResponse { // Shared return type of the getTableDesc, createTable and attachTable rpcs.
+ optional TableDescProto tableDesc = 1;
+ optional string errorMessage = 2;
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/proto/MasterWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/MasterWorkerProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/MasterWorkerProtocol.proto
deleted file mode 100644
index 1ddf24a..0000000
--- a/tajo-core/tajo-core-backend/src/main/proto/MasterWorkerProtocol.proto
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-option java_package = "org.apache.tajo.ipc";
-option java_outer_classname = "MasterWorkerProtocol";
-option java_generic_services = true;
-option java_generate_equals_and_hash = true;
-
-import "MasterWorkerProtos.proto";
-import "PrimitiveProtos.proto";
-import "TajoIdProtos.proto";
-import "tajo_protos.proto";
-import "yarn_protos.proto";
-
-service MasterWorkerProtocolService {
- rpc getTask(ContainerIdProto) returns (QueryUnitRequestProto);
- rpc statusUpdate (TaskStatusProto) returns (BoolProto);
- rpc ping (QueryUnitAttemptIdProto) returns (BoolProto);
- rpc fatalError(TaskFatalErrorReport) returns (BoolProto);
- rpc done (TaskCompletionReport) returns (BoolProto);
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/proto/MasterWorkerProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/MasterWorkerProtos.proto b/tajo-core/tajo-core-backend/src/main/proto/MasterWorkerProtos.proto
deleted file mode 100644
index ffffdcc..0000000
--- a/tajo-core/tajo-core-backend/src/main/proto/MasterWorkerProtos.proto
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-option java_package = "org.apache.tajo.engine";
-option java_outer_classname = "MasterWorkerProtos";
-option java_generic_services = false;
-option java_generate_equals_and_hash = true;
-
-import "CatalogProtos.proto";
-import "TajoIdProtos.proto";
-import "tajo_protos.proto";
-
-message TaskStatusProto {
- required QueryUnitAttemptIdProto id = 1;
- required string workerName = 2;
- required float progress = 3;
- required TaskAttemptState state = 4;
- optional StatSetProto stats = 5;
- optional TableStatProto resultStats = 6;
- repeated Partition partitions = 7;
-}
-
-message TaskCompletionReport {
- required QueryUnitAttemptIdProto id = 1;
- optional StatSetProto stats = 2;
- optional TableStatProto resultStats = 3;
- repeated Partition partitions = 4;
-}
-
-message TaskFatalErrorReport {
- required QueryUnitAttemptIdProto id = 1;
- optional string error_message = 2;
-}
-
-message QueryUnitRequestProto {
- required QueryUnitAttemptIdProto id = 1;
- repeated FragmentProto fragments = 2;
- required string outputTable = 3;
- required bool clusteredOutput = 4;
- required string serializedData = 5;
- optional bool interQuery = 6 [default = false];
- repeated Fetch fetches = 7;
- optional bool shouldDie = 8;
-}
-
-message Fetch {
- required string name = 1;
- required string urls = 2;
-}
-
-message QueryUnitResponseProto {
- required string id = 1;
- required QueryState status = 2;
-}
-
-message StatusReportProto {
- required int64 timestamp = 1;
- required string serverName = 2;
- repeated TaskStatusProto status = 3;
- repeated QueryUnitAttemptIdProto pings = 4;
-}
-
-message CommandRequestProto {
- repeated Command command = 1;
-}
-
-message CommandResponseProto {
-}
-
-message Command {
- required QueryUnitAttemptIdProto id = 1;
- required CommandType type = 2;
-}
-
-enum CommandType {
- PREPARE = 0;
- LAUNCH = 1;
- STOP = 2;
- FINALIZE = 3;
-}
-
-message Partition {
- required int32 partitionKey = 1;
- optional string fileName = 2;
-}
-
-message ServerStatusProto {
- message System {
- required int32 availableProcessors = 1;
- required int64 freeMemory = 2;
- required int64 maxMemory = 3;
- required int64 totalMemory = 4;
- }
- message Disk {
- required string absolutePath = 1;
- required int64 totalSpace = 2;
- required int64 freeSpace = 3;
- required int64 usableSpace = 4;
- }
- required System system = 1;
- repeated Disk disk = 2;
- required int32 taskNum = 3;
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/proto/QueryMasterClientProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/QueryMasterClientProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/QueryMasterClientProtocol.proto
new file mode 100644
index 0000000..9337078
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/proto/QueryMasterClientProtocol.proto
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tajo.ipc";
+option java_outer_classname = "QueryMasterClientProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "yarn_protos.proto";
+import "tajo_protos.proto";
+import "TajoIdProtos.proto";
+import "CatalogProtos.proto";
+import "PrimitiveProtos.proto";
+import "ClientProtos.proto";
+
+service QueryMasterClientProtocolService { // Declares four rpcs whose signatures also appear in TajoMasterClientProtocolService; message types come from ClientProtos.proto.
+ rpc updateSessionVariables(UpdateSessionVariableRequest) returns (BoolProto);
+ rpc getQueryResult(GetQueryResultRequest) returns (GetQueryResultResponse);
+ rpc getQueryStatus(GetQueryStatusRequest) returns (GetQueryStatusResponse);
+ rpc killQuery(ApplicationAttemptIdProto) returns (BoolProto); // Takes the bare query id rather than a dedicated request message.
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/proto/QueryMasterManagerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/QueryMasterManagerProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/QueryMasterManagerProtocol.proto
new file mode 100644
index 0000000..08fc5c9
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/proto/QueryMasterManagerProtocol.proto
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tajo.ipc";
+option java_outer_classname = "QueryMasterManagerProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "yarn_protos.proto";
+import "tajo_protos.proto";
+import "TajoIdProtos.proto";
+import "CatalogProtos.proto";
+import "PrimitiveProtos.proto";
+
+message QueryHeartbeat { // Argument of the queryHeartbeat rpc; every field except statusMessage is required.
+ required ApplicationAttemptIdProto queryId = 1;
+ required string queryMasterHost = 2;
+ required int32 queryMasterPort = 3;
+ required int32 queryMasterClientPort = 4; // Separate port from queryMasterPort; presumably the client-facing service port -- TODO confirm.
+ required QueryState state = 5;
+ optional string statusMessage = 6;
+}
+
+message QueryHeartbeatResponse { // Returned by the queryHeartbeat rpc.
+ message ResponseCommand { // Optional command payload: a command string plus zero or more string params.
+ required string command = 1;
+ repeated string params = 2;
+ }
+ required BoolProto heartbeatResult = 1; // A BoolProto message, not a scalar bool.
+ optional ResponseCommand responseCommand = 3; // Field number 2 is not used by this message.
+}
+
+service QueryMasterManagerProtocolService { // Single-rpc service exchanging QueryHeartbeat for QueryHeartbeatResponse.
+ rpc queryHeartbeat(QueryHeartbeat) returns (QueryHeartbeatResponse);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/proto/QueryMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/QueryMasterProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/QueryMasterProtocol.proto
new file mode 100644
index 0000000..b6a0602
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/proto/QueryMasterProtocol.proto
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tajo.ipc";
+option java_outer_classname = "QueryMasterProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "yarn_protos.proto";
+import "tajo_protos.proto";
+import "TajoIdProtos.proto";
+import "CatalogProtos.proto";
+import "PrimitiveProtos.proto";
+
+message TaskStatusProto { // Argument of the statusUpdate rpc; also the element type of StatusReportProto.status.
+ required QueryUnitAttemptIdProto id = 1;
+ required string workerName = 2;
+ required float progress = 3; // NOTE(review): value range is not stated here -- confirm.
+ required TaskAttemptState state = 4;
+ optional StatSetProto stats = 5;
+ optional TableStatProto resultStats = 6;
+ repeated Partition partitions = 7;
+}
+
+message TaskCompletionReport { // Argument of the done rpc; carries the same stats/resultStats/partitions fields as TaskStatusProto under different field numbers.
+ required QueryUnitAttemptIdProto id = 1;
+ optional StatSetProto stats = 2;
+ optional TableStatProto resultStats = 3;
+ repeated Partition partitions = 4;
+}
+
+message TaskFatalErrorReport { // Argument of the fatalError rpc.
+ required QueryUnitAttemptIdProto id = 1;
+ optional string error_message = 2; // snake_case name, unlike the camelCase fields used elsewhere in this file.
+}
+
+message QueryUnitRequestProto { // Returned by the getTask rpc.
+ required QueryUnitAttemptIdProto id = 1;
+ repeated FragmentProto fragments = 2;
+ required string outputTable = 3;
+ required bool clusteredOutput = 4;
+ required string serializedData = 5; // NOTE(review): what is serialized, and in which format, is not shown here -- confirm.
+ optional bool interQuery = 6 [default = false];
+ repeated Fetch fetches = 7;
+ optional bool shouldDie = 8; // No explicit default is declared, unlike interQuery.
+}
+
+message Fetch { // Element type of QueryUnitRequestProto.fetches.
+ required string name = 1;
+ required string urls = 2; // NOTE(review): a single string despite the plural name; how multiple URLs would be encoded is not shown here.
+}
+
+message QueryUnitResponseProto { // Not referenced by any rpc or message in this file.
+ required string id = 1; // A plain string here, whereas the other messages in this file use QueryUnitAttemptIdProto ids.
+ required QueryState status = 2;
+}
+
+message StatusReportProto { // Batches task statuses and pings; not referenced by any rpc in this file.
+ required int64 timestamp = 1; // NOTE(review): unit/epoch is not stated here -- confirm.
+ required string serverName = 2;
+ repeated TaskStatusProto status = 3;
+ repeated QueryUnitAttemptIdProto pings = 4;
+}
+
+message CommandRequestProto { // Wraps a list of Command entries; not referenced by any rpc in this file.
+ repeated Command command = 1;
+}
+
+message CommandResponseProto { // Intentionally empty message; not referenced by any rpc in this file.
+}
+
+message Command { // Element type of CommandRequestProto.command: a CommandType paired with a query unit attempt id.
+ required QueryUnitAttemptIdProto id = 1;
+ required CommandType type = 2;
+}
+
+enum CommandType { // Value set of Command.type.
+ PREPARE = 0;
+ LAUNCH = 1;
+ STOP = 2;
+ FINALIZE = 3;
+}
+
+message Partition { // Element type of the partitions fields of TaskStatusProto and TaskCompletionReport.
+ required int32 partitionKey = 1;
+ optional string fileName = 2;
+}
+
+message ServerStatusProto { // Server status snapshot: one System record, any number of Disk records, and a task count. Not referenced by any rpc in this file.
+ message System { // Processor count and memory figures.
+ required int32 availableProcessors = 1;
+ required int64 freeMemory = 2; // NOTE(review): unit of the memory fields is not stated here; presumably bytes -- confirm.
+ required int64 maxMemory = 3;
+ required int64 totalMemory = 4;
+ }
+ message Disk { // Space figures for one path.
+ required string absolutePath = 1;
+ required int64 totalSpace = 2; // NOTE(review): unit of the space fields is not stated here; presumably bytes -- confirm.
+ required int64 freeSpace = 3;
+ required int64 usableSpace = 4;
+ }
+ required System system = 1;
+ repeated Disk disk = 2;
+ required int32 taskNum = 3;
+}
+
+service QueryMasterProtocolService { // Carries the five rpcs of the MasterWorkerProtocolService removed by this commit, plus executeQuery.
+ // Calls issued by a Worker.
+ rpc getTask(ContainerIdProto) returns (QueryUnitRequestProto);
+ rpc statusUpdate (TaskStatusProto) returns (BoolProto);
+ rpc ping (QueryUnitAttemptIdProto) returns (BoolProto);
+ rpc fatalError(TaskFatalErrorReport) returns (BoolProto);
+ rpc done (TaskCompletionReport) returns (BoolProto);
+
+ // Calls issued by the QueryMasterManager.
+ rpc executeQuery(StringProto) returns (BoolProto); // NOTE(review): the content of the StringProto argument is not shown here -- confirm.
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/proto/TajoMasterClientProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoMasterClientProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoMasterClientProtocol.proto
new file mode 100644
index 0000000..ef7e711
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoMasterClientProtocol.proto
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tajo.ipc";
+option java_outer_classname = "TajoMasterClientProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "yarn_protos.proto";
+import "tajo_protos.proto";
+import "TajoIdProtos.proto";
+import "CatalogProtos.proto";
+import "PrimitiveProtos.proto";
+import "ClientProtos.proto";
+
+service TajoMasterClientProtocolService { // Client-facing rpcs; request/response messages come from ClientProtos.proto.
+ rpc updateSessionVariables(UpdateSessionVariableRequest) returns (BoolProto);
+ rpc submitQuery(QueryRequest) returns (SubmitQueryResponse); // Shares its QueryRequest argument type with updateQuery.
+ rpc updateQuery(QueryRequest) returns (UpdateQueryResponse);
+ rpc getQueryResult(GetQueryResultRequest) returns (GetQueryResultResponse);
+ rpc getQueryList(GetQueryListRequest) returns (GetQueryListResponse);
+ rpc getQueryStatus(GetQueryStatusRequest) returns (GetQueryStatusResponse);
+ rpc killQuery(ApplicationAttemptIdProto) returns (BoolProto);
+ rpc getClusterInfo(GetClusterInfoRequest) returns (GetClusterInfoResponse);
+ rpc existTable(StringProto) returns (BoolProto); // existTable, dropTable and detachTable take a bare StringProto (presumably the table name -- TODO confirm).
+ rpc getTableList(GetTableListRequest) returns (GetTableListResponse);
+ rpc getTableDesc(GetTableDescRequest) returns (TableResponse);
+ rpc createTable(CreateTableRequest) returns (TableResponse);
+ rpc dropTable(StringProto) returns (BoolProto);
+ rpc attachTable(AttachTableRequest) returns (TableResponse);
+ rpc detachTable(StringProto) returns (BoolProto);
+
+
+ // TODO - to be implemented
+ //
+ // authenticate
+ //
+ // getSessionVariableList
+ // dropTable (NOTE(review): an rpc with this name is already declared above -- confirm whether this entry is stale)
+ // detachTable (NOTE(review): an rpc with this name is already declared above -- confirm whether this entry is stale)
+ // createIndex
+ // dropIndex
+ // registerUDF
+ // dropUDF
+ // listUdfs
+ // getUDFDesc
+ // registerJars
+ // getListRegisteredJars
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/proto/tajo_protos.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/tajo_protos.proto b/tajo-core/tajo-core-backend/src/main/proto/tajo_protos.proto
index 3d22ff5..d337315 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/tajo_protos.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/tajo_protos.proto
@@ -22,13 +22,16 @@ option java_generic_services = false;
option java_generate_equals_and_hash = true;
enum QueryState {
- QUERY_NEW = 0;
- QUERY_INIT = 1;
- QUERY_RUNNING = 2;
- QUERY_SUCCEEDED = 3;
- QUERY_FAILED = 4;
- QUERY_KILLED = 5;
- QUERY_ERROR = 6;
+ QUERY_MASTER_INIT = 0; // NOTE(review): this hunk shifts every pre-existing value by two (e.g. QUERY_NEW 0 -> 2); enums travel as numbers, so peers built from the old definition would read different states -- confirm all components are upgraded together.
+ QUERY_MASTER_LAUNCHED = 1;
+ QUERY_NEW = 2;
+ QUERY_INIT = 3;
+ QUERY_RUNNING = 4;
+ QUERY_SUCCEEDED = 5;
+ QUERY_FAILED = 6;
+ QUERY_KILLED = 7;
+ QUERY_ERROR = 8;
+ QUERY_NOT_ASSIGNED = 9;
}
enum TaskAttemptState {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml b/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
index c003637..f3da922 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
+++ b/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
@@ -35,4 +35,22 @@
<name>tajo.task.localdir</name>
<value>/tmp/tajo-localdir</value>
</property>
+
+ <property>
+ <name>tajo.master.clientservice.addr</name>
+ <value>127.0.0.1:9004</value>
+ </property>
+
+ <property>
+ <name>tajo.master.querymastermanager.addr</name>
+ <value>127.0.0.1:9005</value>
+ </property>
+
+ <property>
+ <name>tajo.query.session.timeout</name>
+ <value>60000</value>
+ <description>ms</description>
+ </property>
+
+
</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
index b22e893..e629623 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
@@ -129,7 +129,7 @@ public class BackendTestingUtil {
PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
- return new ResultSetImpl(conf, new Path(workDir, "out"));
+ return new ResultSetImpl(null, null, conf, new Path(workDir, "out"));
}
public static Path createTmpTestDir() throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
index 6425192..5e8d11d 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
@@ -19,6 +19,8 @@
package org.apache.tajo;
import com.google.protobuf.ServiceException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.*;
@@ -30,6 +32,8 @@ import java.io.IOException;
import java.sql.ResultSet;
public class LocalTajoTestingUtility {
+ private static final Log LOG = LogFactory.getLog(LocalTajoTestingUtility.class);
+
private TajoTestingCluster util;
private TajoConf conf;
private TajoClient client;
@@ -38,6 +42,9 @@ public class LocalTajoTestingUtility {
String[] tablepaths,
Schema[] schemas,
Options option) throws Exception {
+ LOG.info("===================================================");
+ LOG.info("Starting Test Cluster.");
+ LOG.info("===================================================");
util = new TajoTestingCluster();
util.startMiniCluster(1);
@@ -60,6 +67,11 @@ public class LocalTajoTestingUtility {
CatalogProtos.StoreType.CSV, option);
client.createTable(names[i], tablePath, meta);
}
+
+ LOG.info("===================================================");
+ LOG.info("Test Cluster ready and test table created.");
+ LOG.info("===================================================");
+
}
public TajoTestingCluster getTestingCluster() {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java
index 8de0f0b..5b7267f 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java
@@ -108,6 +108,11 @@ public class MiniTajoYarnCluster extends MiniYARNCluster {
// for corresponding uberized tests.
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+ conf.setInt("yarn.nodemanager.delete.debug-delay-sec", 600);
+
+ // Disable virtual memory constraints for containers
+ conf.setBoolean("yarn.nodemanager.vmem-check-enabled", false);
+
super.init(conf);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 4911b48..fab2727 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.tajo.catalog.*;
@@ -154,7 +155,7 @@ public class TajoTestingCluster {
System.setProperty(MiniDFSCluster.PROP_TEST_BUILD_DATA,
this.clusterTestBuildDir.toString());
- MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+ MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(new HdfsConfiguration(conf));
builder.hosts(hosts);
builder.numDataNodes(servers);
builder.format(true);
@@ -230,6 +231,8 @@ public class TajoTestingCluster {
TajoConf c = getConfiguration();
c.setVar(ConfVars.TASKRUNNER_LISTENER_ADDRESS, "localhost:0");
c.setVar(ConfVars.CLIENT_SERVICE_ADDRESS, "localhost:0");
+ c.setVar(ConfVars.QUERY_MASTER_MANAGER_SERVICE_ADDRESS, "localhost:0");
+
c.setVar(ConfVars.CATALOG_ADDRESS, "localhost:0");
c.set(CatalogConstants.STORE_CLASS, "org.apache.tajo.catalog.store.MemStore");
c.set(CatalogConstants.JDBC_URI, "jdbc:derby:target/test-data/tcat/db");
@@ -392,13 +395,19 @@ public class TajoTestingCluster {
}
public void shutdownMiniCluster() throws IOException {
- LOG.info("Shutting down minicluster");
+ LOG.info("========================================");
+ LOG.info("Shutdown minicluster");
+ LOG.info("========================================");
shutdownMiniTajoCluster();
if(this.catalogServer != null) {
shutdownCatalogCluster();
}
+ if(this.yarnCluster != null) {
+ this.yarnCluster.stop();
+ }
+
if(this.dfsCluster != null) {
this.dfsCluster.shutdown();
}
@@ -442,38 +451,6 @@ public class TajoTestingCluster {
}
public static ResultSet run(String[] names,
- String[] tablepaths,
- Schema[] schemas,
- Options option,
- String query) throws Exception {
- TajoTestingCluster util = new TajoTestingCluster();
- util.startMiniCluster(1);
- TajoConf conf = util.getConfiguration();
- TajoClient client = new TajoClient(conf);
-
- FileSystem fs = util.getDefaultFileSystem();
- Path rootDir = util.getMaster().
- getStorageManager().getBaseDir();
- fs.mkdirs(rootDir);
- for (int i = 0; i < tablepaths.length; i++) {
- Path localPath = new Path(tablepaths[i]);
- Path tablePath = new Path(rootDir, names[i]);
- fs.mkdirs(tablePath);
- Path dataPath = new Path(tablePath, "data");
- fs.mkdirs(dataPath);
- Path dfsPath = new Path(dataPath, localPath.getName());
- fs.copyFromLocalFile(localPath, dfsPath);
- TableMeta meta = CatalogUtil.newTableMeta(schemas[i],
- CatalogProtos.StoreType.CSV, option);
- client.createTable(names[i], tablePath, meta);
- }
- Thread.sleep(1000);
- ResultSet res = client.executeQueryAndGetResult(query);
- util.shutdownMiniCluster();
- return res;
- }
-
- public static ResultSet run(String[] names,
Schema[] schemas,
Options option,
String[][] tables,
@@ -554,23 +531,5 @@ public class TajoTestingCluster {
TajoTestingCluster cluster2 = new TajoTestingCluster();
File f2 = cluster2.setupClusterTestBuildDir();
System.out.println("first setupClusterTestBuildDir of cluster2: " + f2);
- /*
- String [] names = {"table1"};
- String [][] tables = new String[1][];
- tables[0] = new String[] {"a,b,c", "b,c,d"};
-
- Schema [] schemas = new Schema[1];
- schemas[0] = new Schema()
- .addColumn("f1", CatalogProtos.DataType.STRING)
- .addColumn("f2", CatalogProtos.DataType.STRING)
- .addColumn("f3", CatalogProtos.DataType.STRING);
-
- ResultSet res = runInLocal(names, schemas, tables, "select f1 from table1");
- res.next();
- System.out.println(res.getString(0));
- res.next();
- System.out.println(res.getString(0));
- System.exit(0);
- */
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TpchTestBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TpchTestBase.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TpchTestBase.java
index bc3b91d..c761103 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TpchTestBase.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TpchTestBase.java
@@ -51,7 +51,7 @@ public class TpchTestBase {
testBase.setUp();
Runtime.getRuntime().addShutdownHook(new ShutdownHook());
} catch (Exception e) {
- LOG.error(e);
+ LOG.error(e.getMessage(), e);
}
}
@@ -75,6 +75,9 @@ public class TpchTestBase {
File file;
for (int i = 0; i < names.length; i++) {
file = new File("src/test/tpch/" + names[i] + ".tbl");
+ if(!file.exists()) {
+ file = new File(System.getProperty("user.dir") + "/tajo-core/tajo-core-backend/src/test/tpch/" + names[i] + ".tbl");
+ }
tables[i] = FileUtil.readTextFile(file).split("\n");
paths[i] = file.getAbsolutePath();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/benchmark/TestTPCH.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/benchmark/TestTPCH.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/benchmark/TestTPCH.java
index b47c635..b61dc46 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/benchmark/TestTPCH.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/benchmark/TestTPCH.java
@@ -53,15 +53,20 @@ public class TestTPCH {
ResultSet res = tpch.execute("select l_returnflag, l_linestatus, count(*) as count_order from lineitem " +
"group by l_returnflag, l_linestatus order by l_returnflag, l_linestatus");
- Map<String,Integer> result = Maps.newHashMap();
- result.put("NO", 3);
- result.put("RF", 2);
+ try {
+ Map<String,Integer> result = Maps.newHashMap();
+ result.put("NO", 3);
+ result.put("RF", 2);
- res.next();
- assertTrue(result.get(res.getString(1) + res.getString(2)) == res.getInt(3));
- res.next();
- assertTrue(result.get(res.getString(1) + res.getString(2)) == res.getInt(3));
- assertFalse(res.next());
+ assertNotNull(res);
+ assertTrue(res.next());
+ assertTrue(result.get(res.getString(1) + res.getString(2)) == res.getInt(3));
+ assertTrue(res.next());
+ assertTrue(result.get(res.getString(1) + res.getString(2)) == res.getInt(3));
+ assertFalse(res.next());
+ } finally {
+ res.close();
+ }
}
@Test
@@ -74,12 +79,16 @@ public class TestTPCH {
"join partsupp on s_suppkey = ps_suppkey " +
"join part on p_partkey = ps_partkey and p_type like '%BRASS' and p_size = 15");
- assertTrue(res.next());
- assertEquals("AMERICA", res.getString(10));
- String [] pType = res.getString(11).split(" ");
- assertEquals("BRASS", pType[pType.length - 1]);
- assertEquals(15, res.getInt(12));
- assertFalse(res.next());
+ try {
+ assertTrue(res.next());
+ assertEquals("AMERICA", res.getString(10));
+ String [] pType = res.getString(11).split(" ");
+ assertEquals("BRASS", pType[pType.length - 1]);
+ assertEquals(15, res.getInt(12));
+ assertFalse(res.next());
+ } finally {
+ res.close();
+ }
}
@Test
@@ -88,7 +97,11 @@ public class TestTPCH {
"case when p_type like 'PROMO%' then l_extendedprice else 0.0 end) / sum(l_extendedprice * (1 - l_discount)) "
+ "as promo_revenue from lineitem, part where l_partkey = p_partkey");
- res.next();
- assertEquals(33, res.getInt(1));
+ try {
+ assertTrue(res.next());
+ assertEquals(33, res.getInt(1));
+ } finally {
+ res.close();
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
index 67abc95..9f50fe3 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
@@ -62,7 +62,9 @@ public class TestTajoClient {
@AfterClass
public static void tearDown() throws Exception {
util.shutdownMiniCluster();
- tajo.close();
+ if(tajo != null) {
+ tajo.close();
+ }
}
private static Path writeTmpTable(String tableName) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/function/TestBuiltinFunctions.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/function/TestBuiltinFunctions.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/function/TestBuiltinFunctions.java
index 79c21ee..6f09fa6 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/function/TestBuiltinFunctions.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/function/TestBuiltinFunctions.java
@@ -49,22 +49,34 @@ public class TestBuiltinFunctions {
@Test
public void testMaxLong() throws Exception {
ResultSet res = tpch.execute("select max(l_orderkey) as max from lineitem");
- res.next();
- assertEquals(3, res.getInt(1));
+ try {
+ res.next();
+ assertEquals(3, res.getInt(1));
+ } finally {
+ res.close();
+ }
}
@Test
public void testMinLong() throws Exception {
ResultSet res = tpch.execute("select min(l_orderkey) as max from lineitem");
- res.next();
- assertEquals(1, res.getInt(1));
+ try {
+ res.next();
+ assertEquals(1, res.getInt(1));
+ } finally {
+ res.close();
+ }
}
@Test
public void testCount() throws Exception {
ResultSet res = tpch.execute("select count(*) as rownum from lineitem");
- res.next();
- assertEquals(5, res.getInt(1));
+ try {
+ res.next();
+ assertEquals(5, res.getInt(1));
+ } finally {
+ res.close();
+ }
}
@Test
@@ -76,32 +88,48 @@ public class TestBuiltinFunctions {
ResultSet res = tpch.execute("select l_orderkey, avg(l_discount) as revenue from lineitem group by l_orderkey");
- while(res.next()) {
- assertTrue(result.get(res.getLong(1)) == res.getFloat(2));
+ try {
+ while(res.next()) {
+ assertTrue(result.get(res.getLong(1)) == res.getFloat(2));
+ }
+ } finally {
+ res.close();
}
}
@Test
public void testAvgLong() throws Exception {
ResultSet res = tpch.execute("select avg(l_orderkey) as avg from lineitem");
- res.next();
- assertEquals(2, res.getLong(1));
+ try {
+ res.next();
+ assertEquals(2, res.getLong(1));
+ } finally {
+ res.close();
+ }
}
@Test
public void testAvgInt() throws Exception {
ResultSet res = tpch.execute("select avg(l_partkey) as avg from lineitem");
- res.next();
- System.out.println(res.getFloat(1));
- assertTrue(1.8f == res.getFloat(1));
+ try {
+ res.next();
+ System.out.println(res.getFloat(1));
+ assertTrue(1.8f == res.getFloat(1));
+ } finally {
+ res.close();
+ }
}
@Test
public void testRandom() throws Exception {
ResultSet res = tpch.execute("select l_orderkey, random(3) as rndnum from lineitem group by l_orderkey, rndnum");
- while(res.next()) {
- assertTrue(res.getInt(2) >= 0 && res.getInt(2) < 3);
+ try {
+ while(res.next()) {
+ assertTrue(res.getInt(2) >= 0 && res.getInt(2) < 3);
+ }
+ } finally {
+ res.close();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/plan/global/TestGlobalQueryPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/plan/global/TestGlobalQueryPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/plan/global/TestGlobalQueryPlanner.java
index aa2f77c..cf0d70a 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/plan/global/TestGlobalQueryPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/plan/global/TestGlobalQueryPlanner.java
@@ -100,7 +100,7 @@ public class TestGlobalQueryPlanner {
dispatcher.init(conf);
dispatcher.start();
- planner = new GlobalPlanner(conf, catalog, new StorageManager(conf),
+ planner = new GlobalPlanner(conf, new StorageManager(conf),
dispatcher.getEventHandler());
analyzer = new SQLAnalyzer();
logicalPlanner = new LogicalPlanner(catalog);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
index c2e232b..5c303c0 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
@@ -91,7 +91,7 @@ public class TestGlobalQueryOptimizer {
AsyncDispatcher dispatcher = new AsyncDispatcher();
- planner = new GlobalPlanner(conf, catalog, new StorageManager(conf),
+ planner = new GlobalPlanner(conf, new StorageManager(conf),
dispatcher.getEventHandler());
analyzer = new SQLAnalyzer();
logicalPlanner = new LogicalPlanner(catalog);
@@ -132,7 +132,7 @@ public class TestGlobalQueryOptimizer {
queryId = QueryIdFactory.newQueryId();
optimizer = new GlobalOptimizer();
}
-
+
@AfterClass
public static void terminate() throws IOException {
util.shutdownCatalogCluster();
@@ -147,7 +147,7 @@ public class TestGlobalQueryOptimizer {
MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) rootNode);
globalPlan = optimizer.optimize(globalPlan);
-
+
ExecutionBlock unit = globalPlan.getRoot();
StoreTableNode store = unit.getStoreTableNode();
assertEquals(ExprType.PROJECTION, store.getSubNode().getType());
@@ -156,14 +156,14 @@ public class TestGlobalQueryOptimizer {
SortNode sort = (SortNode) proj.getSubNode();
assertEquals(ExprType.SCAN, sort.getSubNode().getType());
ScanNode scan = (ScanNode) sort.getSubNode();
-
+
assertTrue(unit.hasChildBlock());
unit = unit.getChildBlock(scan);
store = unit.getStoreTableNode();
assertEquals(ExprType.SORT, store.getSubNode().getType());
sort = (SortNode) store.getSubNode();
assertEquals(ExprType.JOIN, sort.getSubNode().getType());
-
+
assertTrue(unit.hasChildBlock());
for (ScanNode prevscan : unit.getScanNodes()) {
ExecutionBlock prev = unit.getChildBlock(prevscan);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
index 61a9afe..cc497a8 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
@@ -50,37 +50,54 @@ public class TestGroupByQuery {
public final void testComplexParameter() throws Exception {
ResultSet res = tpch.execute(
"select sum(l_extendedprice*l_discount) as revenue from lineitem");
- assertTrue(res.next());
- assertTrue(12908 == (int) res.getDouble("revenue"));
- assertFalse(res.next());
+ try {
+ assertNotNull(res);
+ assertTrue(res.next());
+ assertTrue(12908 == (int) res.getDouble("revenue"));
+ assertFalse(res.next());
+ } finally {
+ res.close();
+ }
}
@Test
public final void testComplexParameter2() throws Exception {
ResultSet res = tpch.execute("select count(*) + max(l_orderkey) as merged from lineitem");
- res.next();
- assertEquals(8, res.getLong("merged"));
+ try {
+ assertTrue(res.next());
+ assertEquals(8, res.getLong("merged"));
+ } finally {
+ res.close();
+ }
}
//@Test
public final void testCube() throws Exception {
ResultSet res = tpch.execute(
"cube_test := select l_orderkey, l_partkey, sum(l_quantity) from lineitem group by cube(l_orderkey, l_partkey)");
- int count = 0;
- for (;res.next();) {
- count++;
+ try {
+ int count = 0;
+ for (;res.next();) {
+ count++;
+ }
+ assertEquals(11, count);
+ } finally {
+ res.close();
}
- assertEquals(11, count);
}
//@Test
// TODO - to fix the limit processing and then enable it
public final void testGroupByLimit() throws Exception {
ResultSet res = tpch.execute("select l_orderkey from lineitem limit 2");
- int count = 0;
- for (;res.next();) {
- count++;
+ try {
+ int count = 0;
+ for (;res.next();) {
+ count++;
+ }
+ assertEquals(2, count);
+ } finally {
+ res.close();
}
- assertEquals(2, count);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
index e11666c..233e9ee 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
@@ -56,24 +56,32 @@ public class TestJoinQuery {
public final void testCrossJoin() throws Exception {
ResultSet res = tpch.execute("select n_name, r_name, n_regionkey, r_regionkey from nation, region");
- int cnt = 0;
- while(res.next()) {
- cnt++;
+ try {
+ int cnt = 0;
+ while(res.next()) {
+ cnt++;
+ }
+ // TODO - to check their joined contents
+ assertEquals(25 * 5, cnt);
+ } finally {
+ res.close();
}
- // TODO - to check their joined contents
- assertEquals(25 * 5, cnt);
}
@Test
public final void testCrossJoinWithExplicitJoinQual() throws Exception {
ResultSet res = tpch.execute(
"select n_name, r_name, n_regionkey, r_regionkey from nation, region where n_regionkey = r_regionkey");
- int cnt = 0;
- while(res.next()) {
- cnt++;
+ try {
+ int cnt = 0;
+ while(res.next()) {
+ cnt++;
+ }
+ // TODO - to check their joined contents
+ assertEquals(25, cnt);
+ } finally {
+ res.close();
}
- // TODO - to check their joined contents
- assertEquals(25, cnt);
}
@Test
@@ -81,49 +89,57 @@ public class TestJoinQuery {
ResultSet res = tpch.execute(FileUtil
.readTextFile(new File("src/test/queries/tpch_q2_simplified.tql")));
- Object [][] result = new Object[3][3];
-
- int tupleId = 0;
- int colId = 0;
- result[tupleId][colId++] = 4032.68f;
- result[tupleId][colId++] = "Supplier#000000002";
- result[tupleId++][colId] = "ETHIOPIA";
-
- colId = 0;
- result[tupleId][colId++] = 4641.08f;
- result[tupleId][colId++] = "Supplier#000000004";
- result[tupleId++][colId] = "MOROCCO";
-
- colId = 0;
- result[tupleId][colId++] = 4192.4f;
- result[tupleId][colId++] = "Supplier#000000003";
- result[tupleId][colId] = "ARGENTINA";
-
- Map<Float, Object[]> resultSet =
- Maps.newHashMap();
- for (Object [] t : result) {
- resultSet.put((Float) t[0], t);
+ try {
+ Object [][] result = new Object[3][3];
+
+ int tupleId = 0;
+ int colId = 0;
+ result[tupleId][colId++] = 4032.68f;
+ result[tupleId][colId++] = "Supplier#000000002";
+ result[tupleId++][colId] = "ETHIOPIA";
+
+ colId = 0;
+ result[tupleId][colId++] = 4641.08f;
+ result[tupleId][colId++] = "Supplier#000000004";
+ result[tupleId++][colId] = "MOROCCO";
+
+ colId = 0;
+ result[tupleId][colId++] = 4192.4f;
+ result[tupleId][colId++] = "Supplier#000000003";
+ result[tupleId][colId] = "ARGENTINA";
+
+ Map<Float, Object[]> resultSet =
+ Maps.newHashMap();
+ for (Object [] t : result) {
+ resultSet.put((Float) t[0], t);
+ }
+
+ for (int i = 0; i < 3; i++) {
+ res.next();
+ Object [] resultTuple = resultSet.get(res.getFloat("s_acctbal"));
+ assertEquals(resultTuple[0], res.getFloat("s_acctbal"));
+ assertEquals(resultTuple[1], res.getString("s_name"));
+ assertEquals(resultTuple[2], res.getString("n_name"));
+ }
+
+ assertFalse(res.next());
+ } finally {
+ res.close();
}
-
- for (int i = 0; i < 3; i++) {
- res.next();
- Object [] resultTuple = resultSet.get(res.getFloat("s_acctbal"));
- assertEquals(resultTuple[0], res.getFloat("s_acctbal"));
- assertEquals(resultTuple[1], res.getString("s_name"));
- assertEquals(resultTuple[2], res.getString("n_name"));
- }
-
- assertFalse(res.next());
}
@Test
public void testJoinRefEval() throws Exception {
- ResultSet res = tpch.execute("select r_regionkey, n_regionkey, (r_regionkey + n_regionkey) as plus from region join nation on r_regionkey = n_regionkey");
- int r, n;
- while(res.next()) {
- r = res.getInt(1);
- n = res.getInt(2);
- assertEquals(r + n, res.getInt(3));
+ ResultSet res = tpch.execute("select r_regionkey, n_regionkey, (r_regionkey + n_regionkey) as plus from region, nation where r_regionkey = n_regionkey");
+ try {
+ int r, n;
+ while(res.next()) {
+ r = res.getInt(1);
+ n = res.getInt(2);
+ assertEquals(r + n, res.getInt(3));
+ }
+ } finally {
+ res.close();
}
}
@@ -137,16 +153,18 @@ public class TestJoinQuery {
"else 'zero' " +
"end as cond from region, nation where r_regionkey = n_regionkey");
-
-
- Map<Integer, String> result = Maps.newHashMap();
- result.put(0, "zero");
- result.put(1, "one");
- result.put(2, "two");
- result.put(3, "three");
- result.put(4, "four");
- while(res.next()) {
- assertEquals(result.get(res.getInt(1)), res.getString(3));
+ try {
+ Map<Integer, String> result = Maps.newHashMap();
+ result.put(0, "zero");
+ result.put(1, "one");
+ result.put(2, "two");
+ result.put(3, "three");
+ result.put(4, "four");
+ while(res.next()) {
+ assertEquals(result.get(res.getInt(1)), res.getString(3));
+ }
+ } finally {
+ res.close();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestNullValues.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestNullValues.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestNullValues.java
index 74ec2b5..2d478dd 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestNullValues.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestNullValues.java
@@ -55,9 +55,13 @@ public class TestNullValues {
ResultSet res = TajoTestingCluster
.run(table, schemas, opts, new String[][]{data},
"select * from nulltable1 where col2 is null");
- assertTrue(res.next());
- assertEquals(2, res.getInt(1));
- assertFalse(res.next());
+ try {
+ assertTrue(res.next());
+ assertEquals(2, res.getInt(1));
+ assertFalse(res.next());
+ } finally {
+ res.close();
+ }
}
@Test
@@ -77,11 +81,15 @@ public class TestNullValues {
ResultSet res = TajoTestingCluster
.run(table, schemas, opts, new String[][]{data},
"select * from nulltable2 where col2 is not null");
- assertTrue(res.next());
- assertEquals(1, res.getInt(1));
- assertTrue(res.next());
- assertEquals(3, res.getInt(1));
- assertFalse(res.next());
+ try {
+ assertTrue(res.next());
+ assertEquals(1, res.getInt(1));
+ assertTrue(res.next());
+ assertEquals(3, res.getInt(1));
+ assertFalse(res.next());
+ } finally {
+ res.close();
+ }
}
@Test
@@ -108,8 +116,12 @@ public class TestNullValues {
ResultSet res = TajoTestingCluster
.run(table, schemas, opts, new String[][]{data},
"select * from nulltable3 where col1 is null and col2 is null and col3 is null and col4 = 43578");
- assertTrue(res.next());
- assertEquals(43578, res.getLong(4));
- assertFalse(res.next());
+ try {
+ assertTrue(res.next());
+ assertEquals(43578, res.getLong(4));
+ assertFalse(res.next());
+ } finally {
+ res.close();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java
index 7a04a40..a16d0f3 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java
@@ -100,7 +100,7 @@ public class TestResultSetImpl {
@Test
public void test() throws IOException, SQLException {
- ResultSetImpl rs = new ResultSetImpl(conf, sm.getTablePath("score"));
+ ResultSetImpl rs = new ResultSetImpl(null, null, conf, sm.getTablePath("score"));
ResultSetMetaData meta = rs.getMetaData();
assertNotNull(meta);
Schema schema = scoreMeta.getSchema();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
index a47f5bd..2a189f8 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
@@ -48,90 +48,106 @@ public class TestSelectQuery {
@Test
public final void testSelect() throws Exception {
ResultSet res = tpch.execute("select l_orderkey, l_partkey from lineitem");
- res.next();
- assertEquals(1, res.getInt(1));
- assertEquals(1, res.getInt(2));
-
- res.next();
- assertEquals(1, res.getInt(1));
- assertEquals(1, res.getInt(2));
- assertEquals(1, res.getInt(2));
-
- res.next();
- assertEquals(2, res.getInt(1));
- assertEquals(2, res.getInt(2));
+ try {
+ res.next();
+ assertEquals(1, res.getInt(1));
+ assertEquals(1, res.getInt(2));
+
+ res.next();
+ assertEquals(1, res.getInt(1));
+ assertEquals(1, res.getInt(2));
+ assertEquals(1, res.getInt(2));
+
+ res.next();
+ assertEquals(2, res.getInt(1));
+ assertEquals(2, res.getInt(2));
+ } finally {
+ res.close();
+ }
}
@Test
public final void testSelect2() throws Exception {
ResultSet res = tpch.execute("select l_orderkey, l_partkey, l_orderkey + l_partkey as plus from lineitem");
- res.next();
- assertEquals(1, res.getInt(1));
- assertEquals(1, res.getInt(2));
- assertEquals(2, res.getInt(3));
-
- res.next();
- assertEquals(1, res.getInt(1));
- assertEquals(1, res.getInt(2));
- assertEquals(2, res.getInt(3));
-
- res.next();
- assertEquals(2, res.getInt(1));
- assertEquals(2, res.getInt(2));
- assertEquals(4, res.getInt(3));
+ try {
+ res.next();
+ assertEquals(1, res.getInt(1));
+ assertEquals(1, res.getInt(2));
+ assertEquals(2, res.getInt(3));
+
+ res.next();
+ assertEquals(1, res.getInt(1));
+ assertEquals(1, res.getInt(2));
+ assertEquals(2, res.getInt(3));
+
+ res.next();
+ assertEquals(2, res.getInt(1));
+ assertEquals(2, res.getInt(2));
+ assertEquals(4, res.getInt(3));
+ } finally {
+ res.close();
+ }
}
@Test
public final void testSelect3() throws Exception {
ResultSet res = tpch.execute("select l_orderkey + l_partkey as plus from lineitem");
- res.next();
- assertEquals(2, res.getInt(1));
+ try {
+ res.next();
+ assertEquals(2, res.getInt(1));
- res.next();
- assertEquals(2, res.getInt(1));
+ res.next();
+ assertEquals(2, res.getInt(1));
- res.next();
- assertEquals(4, res.getInt(1));
+ res.next();
+ assertEquals(4, res.getInt(1));
+ } finally {
+ res.close();
+ }
}
@Test
public final void testSelectAsterik() throws Exception {
ResultSet res = tpch.execute("select * from lineitem");
- res.next();
- assertEquals(1, res.getInt(1));
- assertEquals(1, res.getInt(2));
- assertEquals(7706, res.getInt(3));
- assertEquals(1, res.getInt(4));
- assertTrue(17 == res.getFloat(5));
- assertTrue(21168.23f == res.getFloat(6));
- assertTrue(0.04f == res.getFloat(7));
- assertTrue(0.02f == res.getFloat(8));
- assertEquals("N",res.getString(9));
- assertEquals("O",res.getString(10));
- assertEquals("1996-03-13",res.getString(11));
- assertEquals("1996-02-12",res.getString(12));
- assertEquals("1996-03-22",res.getString(13));
- assertEquals("DELIVER IN PERSON",res.getString(14));
- assertEquals("TRUCK",res.getString(15));
- assertEquals("egular courts above the",res.getString(16));
-
- res.next();
- assertEquals(1, res.getInt(1));
- assertEquals(1, res.getInt(2));
- assertEquals(7311, res.getInt(3));
- assertEquals(2, res.getInt(4));
- assertTrue(36 == res.getFloat(5));
- assertTrue(45983.16f == res.getFloat(6));
- assertTrue(0.09f == res.getFloat(7));
- assertTrue(0.06f == res.getFloat(8));
- assertEquals("N",res.getString(9));
- assertEquals("O",res.getString(10));
- assertEquals("1996-04-12",res.getString(11));
- assertEquals("1996-02-28",res.getString(12));
- assertEquals("1996-04-20",res.getString(13));
- assertEquals("TAKE BACK RETURN",res.getString(14));
- assertEquals("MAIL",res.getString(15));
- assertEquals("ly final dependencies: slyly bold",res.getString(16));
+ try {
+ res.next();
+ assertEquals(1, res.getInt(1));
+ assertEquals(1, res.getInt(2));
+ assertEquals(7706, res.getInt(3));
+ assertEquals(1, res.getInt(4));
+ assertTrue(17 == res.getFloat(5));
+ assertTrue(21168.23f == res.getFloat(6));
+ assertTrue(0.04f == res.getFloat(7));
+ assertTrue(0.02f == res.getFloat(8));
+ assertEquals("N",res.getString(9));
+ assertEquals("O",res.getString(10));
+ assertEquals("1996-03-13",res.getString(11));
+ assertEquals("1996-02-12",res.getString(12));
+ assertEquals("1996-03-22",res.getString(13));
+ assertEquals("DELIVER IN PERSON",res.getString(14));
+ assertEquals("TRUCK",res.getString(15));
+ assertEquals("egular courts above the",res.getString(16));
+
+ res.next();
+ assertEquals(1, res.getInt(1));
+ assertEquals(1, res.getInt(2));
+ assertEquals(7311, res.getInt(3));
+ assertEquals(2, res.getInt(4));
+ assertTrue(36 == res.getFloat(5));
+ assertTrue(45983.16f == res.getFloat(6));
+ assertTrue(0.09f == res.getFloat(7));
+ assertTrue(0.06f == res.getFloat(8));
+ assertEquals("N",res.getString(9));
+ assertEquals("O",res.getString(10));
+ assertEquals("1996-04-12",res.getString(11));
+ assertEquals("1996-02-28",res.getString(12));
+ assertEquals("1996-04-20",res.getString(13));
+ assertEquals("TAKE BACK RETURN",res.getString(14));
+ assertEquals("MAIL",res.getString(15));
+ assertEquals("ly final dependencies: slyly bold",res.getString(16));
+ } finally {
+ res.close();
+ }
}
@Test
@@ -145,21 +161,29 @@ public class TestSelectQuery {
ResultSet res = tpch.execute(
"select distinct l_orderkey, l_linenumber from lineitem");
- int cnt = 0;
- while(res.next()) {
- assertTrue(result1.contains(res.getInt(1) + "," + res.getInt(2)));
- cnt++;
+ try {
+ int cnt = 0;
+ while(res.next()) {
+ assertTrue(result1.contains(res.getInt(1) + "," + res.getInt(2)));
+ cnt++;
+ }
+ assertEquals(5, cnt);
+ } finally {
+ res.close();
}
- assertEquals(5, cnt);
res = tpch.execute("select distinct l_orderkey from lineitem");
- Set<Integer> result2 = Sets.newHashSet(1,2,3);
- cnt = 0;
- while (res.next()) {
- assertTrue(result2.contains(res.getInt(1)));
- cnt++;
+ try {
+ Set<Integer> result2 = Sets.newHashSet(1,2,3);
+ int cnt = 0;
+ while (res.next()) {
+ assertTrue(result2.contains(res.getInt(1)));
+ cnt++;
+ }
+ assertEquals(3,cnt);
+ } finally {
+ res.close();
}
- assertEquals(3,cnt);
}
@Test
@@ -169,12 +193,16 @@ public class TestSelectQuery {
ResultSet res = tpch.execute(
"SELECT n_name FROM nation WHERE n_name LIKE '%IA'");
- int cnt = 0;
- while(res.next()) {
- assertTrue(result.contains(res.getString(1)));
- cnt++;
+ try {
+ int cnt = 0;
+ while(res.next()) {
+ assertTrue(result.contains(res.getString(1)));
+ cnt++;
+ }
+ assertEquals(result.size(), cnt);
+ } finally {
+ res.close();
}
- assertEquals(result.size(), cnt);
}
@Test
@@ -183,21 +211,29 @@ public class TestSelectQuery {
ResultSet res = tpch.execute(
"select l_orderkey from lineitem where l_shipdate <= '1996-03-22'");
- int cnt = 0;
- while(res.next()) {
- assertTrue(result.contains(res.getInt(1)));
- cnt++;
+ try {
+ int cnt = 0;
+ while(res.next()) {
+ assertTrue(result.contains(res.getInt(1)));
+ cnt++;
+ }
+ assertEquals(3, cnt);
+ } finally {
+ res.close();
}
- assertEquals(3, cnt);
}
@Test
public final void testRealValueCompare() throws Exception {
ResultSet res = tpch.execute("select ps_supplycost from partsupp where ps_supplycost = 771.64");
- res.next();
- assertTrue(771.64f == res.getFloat(1));
- assertFalse(res.next());
+ try {
+ res.next();
+ assertTrue(771.64f == res.getFloat(1));
+ assertFalse(res.next());
+ } finally {
+ res.close();
+ }
}
@Test
@@ -211,19 +247,23 @@ public class TestSelectQuery {
"else 'zero' " +
"end as cond from region");
- Map<Integer, String> result = Maps.newHashMap();
- result.put(0, "zero");
- result.put(1, "one");
- result.put(2, "two");
- result.put(3, "three");
- result.put(4, "four");
- int cnt = 0;
- while(res.next()) {
- assertEquals(result.get(res.getInt(1)), res.getString(2));
- cnt++;
+ try {
+ Map<Integer, String> result = Maps.newHashMap();
+ result.put(0, "zero");
+ result.put(1, "one");
+ result.put(2, "two");
+ result.put(3, "three");
+ result.put(4, "four");
+ int cnt = 0;
+ while(res.next()) {
+ assertEquals(result.get(res.getInt(1)), res.getString(2));
+ cnt++;
+ }
+
+ assertEquals(5, cnt);
+ } finally {
+ res.close();
}
-
- assertEquals(5, cnt);
}
@Test
@@ -235,75 +275,98 @@ public class TestSelectQuery {
"when r_regionkey = 4 then 'four' " +
"end as cond from region");
- Map<Integer, String> result = Maps.newHashMap();
- result.put(0, "NULL");
- result.put(1, "one");
- result.put(2, "two");
- result.put(3, "three");
- result.put(4, "four");
- int cnt = 0;
- while(res.next()) {
- assertEquals(result.get(res.getInt(1)), res.getString(2));
- cnt++;
+ try {
+ Map<Integer, String> result = Maps.newHashMap();
+ result.put(0, "NULL");
+ result.put(1, "one");
+ result.put(2, "two");
+ result.put(3, "three");
+ result.put(4, "four");
+ int cnt = 0;
+ while(res.next()) {
+ assertEquals(result.get(res.getInt(1)), res.getString(2));
+ cnt++;
+ }
+
+ assertEquals(5, cnt);
+ } finally {
+ res.close();
}
-
- assertEquals(5, cnt);
}
@Test
public final void testNotEqual() throws Exception {
ResultSet res = tpch.execute(
"select l_orderkey from lineitem where l_orderkey != 1");
- assertTrue(res.next());
- assertEquals(2, res.getInt(1));
- assertTrue(res.next());
- assertEquals(3, res.getInt(1));
- assertTrue(res.next());
- assertEquals(3, res.getInt(1));
- assertFalse(res.next());
+ try {
+ assertTrue(res.next());
+ assertEquals(2, res.getInt(1));
+ assertTrue(res.next());
+ assertEquals(3, res.getInt(1));
+ assertTrue(res.next());
+ assertEquals(3, res.getInt(1));
+ assertFalse(res.next());
+ } finally {
+ res.close();
+ }
}
@Test
public final void testUnion1() throws Exception {
ResultSet res = tpch.execute(
"select o_custkey as num from orders union select c_custkey as num from customer");
- int count = 0;
- for (;res.next();) {
- count++;
+ try {
+ int count = 0;
+ for (;res.next();) {
+ count++;
+ }
+ assertEquals(6, count);
+ } finally {
+ res.close();
}
- assertEquals(6, count);
}
@Test
public final void testUnion2() throws Exception {
ResultSet res = tpch.execute(
"select l_orderkey from lineitem l1 union select l_orderkey from lineitem l2");
- int count = 0;
- for (;res.next();) {
- count++;
+ try {
+ int count = 0;
+ for (;res.next();) {
+ count++;
+ }
+ } finally {
+ res.close();
}
- assertEquals(10, count);
}
@Test
public final void testCreateAfterSelect() throws Exception {
ResultSet res = tpch.execute(
"create table orderkeys as select l_orderkey from lineitem");
- int count = 0;
- for (;res.next();) {
- count++;
+ try {
+ int count = 0;
+ for (;res.next();) {
+ count++;
+ }
+ assertEquals(count, 5);
+ } finally {
+ res.close();
}
- assertEquals(count, 5);
}
//@Test
// TODO - fix and enable this unit test
public final void testLimit() throws Exception {
ResultSet res = tpch.execute("select l_orderkey from lineitem limit 3");
- int count = 0;
- for (;res.next();) {
- count++;
+ try {
+ int count = 0;
+ for (;res.next();) {
+ count++;
+ }
+ assertEquals(3, count);
+ } finally {
+ res.close();
}
- assertEquals(3, count);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
index 14344fe..b211bc8 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
@@ -50,70 +50,86 @@ public class TestSortQuery {
public final void testSort() throws Exception {
ResultSet res = tpch.execute(
"select l_linenumber, l_orderkey from lineitem order by l_orderkey");
- int cnt = 0;
- Long prev = null;
- while(res.next()) {
- if (prev == null) {
- prev = res.getLong(2);
- } else {
- assertTrue(prev <= res.getLong(2));
- prev = res.getLong(2);
+ try {
+ int cnt = 0;
+ Long prev = null;
+ while(res.next()) {
+ if (prev == null) {
+ prev = res.getLong(2);
+ } else {
+ assertTrue(prev <= res.getLong(2));
+ prev = res.getLong(2);
+ }
+ cnt++;
}
- cnt++;
- }
- assertEquals(5, cnt);
+ assertEquals(5, cnt);
+ } finally {
+ res.close();
+ }
}
@Test
public final void testSortWithAliasKey() throws Exception {
ResultSet res = tpch.execute(
"select l_linenumber, l_orderkey as sortkey from lineitem order by sortkey");
- int cnt = 0;
- Long prev = null;
- while(res.next()) {
- if (prev == null) {
- prev = res.getLong(2);
- } else {
- assertTrue(prev <= res.getLong(2));
- prev = res.getLong(2);
+ try {
+ int cnt = 0;
+ Long prev = null;
+ while(res.next()) {
+ if (prev == null) {
+ prev = res.getLong(2);
+ } else {
+ assertTrue(prev <= res.getLong(2));
+ prev = res.getLong(2);
+ }
+ cnt++;
}
- cnt++;
- }
- assertEquals(5, cnt);
+ assertEquals(5, cnt);
+ } finally {
+ res.close();
+ }
}
@Test
public final void testSortDesc() throws Exception {
ResultSet res = tpch.execute(
"select l_linenumber, l_orderkey from lineitem order by l_orderkey desc");
- int cnt = 0;
- Long prev = null;
- while(res.next()) {
- if (prev == null) {
- prev = res.getLong(2);
- } else {
- assertTrue(prev >= res.getLong(2));
- prev = res.getLong(2);
+ try {
+ int cnt = 0;
+ Long prev = null;
+ while(res.next()) {
+ if (prev == null) {
+ prev = res.getLong(2);
+ } else {
+ assertTrue(prev >= res.getLong(2));
+ prev = res.getLong(2);
+ }
+ cnt++;
}
- cnt++;
- }
- assertEquals(5, cnt);
+ assertEquals(5, cnt);
+ } finally {
+ res.close();
+ }
}
@Test
public final void testTopK() throws Exception {
ResultSet res = tpch.execute(
"select l_orderkey, l_linenumber from lineitem order by l_orderkey desc limit 3");
- assertTrue(res.next());
- assertEquals(3, res.getLong(1));
- assertTrue(res.next());
- assertEquals(3, res.getLong(1));
- assertTrue(res.next());
- assertEquals(2, res.getLong(1));
- assertFalse(res.next());
+ try {
+ assertTrue(res.next());
+ assertEquals(3, res.getLong(1));
+ assertTrue(res.next());
+ assertEquals(3, res.getLong(1));
+ assertTrue(res.next());
+ assertEquals(2, res.getLong(1));
+ assertFalse(res.next());
+ } finally {
+ res.close();
+ }
}
@Test
@@ -121,19 +137,23 @@ public class TestSortQuery {
ResultSet res = tpch.execute("select max(l_quantity), l_orderkey "
+ "from lineitem group by l_orderkey order by l_orderkey");
- int cnt = 0;
- Long prev = null;
- while(res.next()) {
- if (prev == null) {
- prev = res.getLong(1);
- } else {
- assertTrue(prev <= res.getLong(1));
- prev = res.getLong(1);
+ try {
+ int cnt = 0;
+ Long prev = null;
+ while(res.next()) {
+ if (prev == null) {
+ prev = res.getLong(1);
+ } else {
+ assertTrue(prev <= res.getLong(1));
+ prev = res.getLong(1);
+ }
+ cnt++;
}
- cnt++;
- }
- assertEquals(3, cnt);
+ assertEquals(3, cnt);
+ } finally {
+ res.close();
+ }
}
@Test
@@ -141,18 +161,22 @@ public class TestSortQuery {
ResultSet res = tpch.execute("select max(l_quantity) as max_quantity, l_orderkey "
+ "from lineitem group by l_orderkey order by max_quantity");
- int cnt = 0;
- Long prev = null;
- while(res.next()) {
- if (prev == null) {
- prev = res.getLong(1);
- } else {
- assertTrue(prev <= res.getLong(1));
- prev = res.getLong(1);
+ try {
+ int cnt = 0;
+ Long prev = null;
+ while(res.next()) {
+ if (prev == null) {
+ prev = res.getLong(1);
+ } else {
+ assertTrue(prev <= res.getLong(1));
+ prev = res.getLong(1);
+ }
+ cnt++;
}
- cnt++;
- }
- assertEquals(3, cnt);
+ assertEquals(3, cnt);
+ } finally {
+ res.close();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
index 69b8bac..a67dd26 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
@@ -72,7 +72,7 @@ public class TestExecutionBlockCursor {
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.start();
- planner = new GlobalPlanner(conf, catalog, sm, dispatcher.getEventHandler());
+ planner = new GlobalPlanner(conf, sm, dispatcher.getEventHandler());
}
public static void tearDown() {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestRepartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestRepartitioner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestRepartitioner.java
index 55b9a91..a8924dd 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestRepartitioner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestRepartitioner.java
@@ -18,14 +18,16 @@
package org.apache.tajo.master;
-import org.apache.tajo.TestTajoIds;
-import org.jboss.netty.handler.codec.http.QueryStringDecoder;
-import org.junit.Test;
import org.apache.tajo.QueryId;
import org.apache.tajo.SubQueryId;
+import org.apache.tajo.TestTajoIds;
import org.apache.tajo.master.ExecutionBlock.PartitionType;
+import org.apache.tajo.master.querymaster.QueryUnit;
+import org.apache.tajo.master.querymaster.Repartitioner;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.util.TajoIdUtils;
+import org.jboss.netty.handler.codec.http.QueryStringDecoder;
+import org.junit.Test;
import java.net.URI;
import java.util.*;
@@ -48,7 +50,7 @@ public class TestRepartitioner {
Collection<URI> uris = Repartitioner.
createHashFetchURL(hostName + ":" + port, sid, partitionId,
- PartitionType.HASH, intermediateEntries);
+ PartitionType.HASH, intermediateEntries);
List<String> taList = TUtil.newList();
for (URI uri : uris) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TaskRunnerTest.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TaskRunnerTest.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TaskRunnerTest.java
index e61f721..05a269e 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TaskRunnerTest.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TaskRunnerTest.java
@@ -29,7 +29,7 @@ import org.apache.tajo.QueryConf;
import org.apache.tajo.QueryId;
import org.apache.tajo.SubQueryId;
import org.apache.tajo.TestTajoIds;
-import org.apache.tajo.ipc.MasterWorkerProtocol.MasterWorkerProtocolService;
+import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
import org.apache.tajo.rpc.ProtoAsyncRpcClient;
import org.apache.tajo.util.TajoIdUtils;
@@ -46,8 +46,8 @@ public class TaskRunnerTest {
ProtoAsyncRpcClient mockClient = mock(ProtoAsyncRpcClient.class);
mockClient.close();
- MasterWorkerProtocolService.Interface mockMaster =
- mock(MasterWorkerProtocolService.Interface.class);
+ QueryMasterProtocolService.Interface mockMaster =
+ mock(QueryMasterProtocolService.Interface.class);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
q1.getApplicationId(), q1.getAttemptId());
ContainerId cId = BuilderUtils.newContainerId(appAttemptId, 1);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/log4j.properties b/tajo-core/tajo-core-backend/src/test/resources/log4j.properties
index ad07100..145703c 100644
--- a/tajo-core/tajo-core-backend/src/test/resources/log4j.properties
+++ b/tajo-core/tajo-core-backend/src/test/resources/log4j.properties
@@ -19,7 +19,11 @@
# log4j configuration used during build and unit tests
log4j.rootLogger=info,stdout
-log4j.threshhold=DEBUG
+# Repository-wide threshold; log4j 1.2 only reads the key spelled "log4j.threshold".
+log4j.threshold=INFO
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+
+log4j.logger.org.apache.hadoop=WARN
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-pullserver/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-pullserver/pom.xml b/tajo-core/tajo-core-pullserver/pom.xml
index 2035a4e..1ce6aba 100644
--- a/tajo-core/tajo-core-pullserver/pom.xml
+++ b/tajo-core/tajo-core-pullserver/pom.xml
@@ -59,11 +59,10 @@
<scope>compile</scope>
</dependency>
<dependency>
- <groupId>org.jboss.netty</groupId>
+ <groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<scope>compile</scope>
</dependency>
-
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-storage/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/proto/CatalogProtos.proto b/tajo-core/tajo-core-storage/src/main/proto/CatalogProtos.proto
index 96b35b3..6164553 100644
--- a/tajo-core/tajo-core-storage/src/main/proto/CatalogProtos.proto
+++ b/tajo-core/tajo-core-storage/src/main/proto/CatalogProtos.proto
@@ -25,9 +25,9 @@ option java_generate_equals_and_hash = true;
import "DataTypes.proto";
enum StoreType {
- MEM = 0;
- CSV = 1;
- RAW = 2;
+ MEM = 0;
+ CSV = 1;
+ RAW = 2;
RCFILE = 3;
ROWFILE = 4;
HCFILE = 5;
@@ -35,147 +35,147 @@ enum StoreType {
}
enum OrderType {
- ORDER_NONE = 0;
- ASC = 1;
- DSC = 2;
+ ORDER_NONE = 0;
+ ASC = 1;
+ DSC = 2;
}
enum CompressType {
- COMP_NONE = 0;
- NULL_SUPPRESS = 1;
- RUN_LENGTH = 2;
- BIT_VECTOR = 3;
- DICTIONARY = 4;
- SNAPPY = 5;
- LZ = 6;
+ COMP_NONE = 0;
+ NULL_SUPPRESS = 1;
+ RUN_LENGTH = 2;
+ BIT_VECTOR = 3;
+ DICTIONARY = 4;
+ SNAPPY = 5;
+ LZ = 6;
}
message ColumnMetaProto {
- required DataType dataType = 1;
- required bool compressed = 2;
- required bool sorted = 3;
- required bool contiguous = 4;
- required StoreType storeType = 5;
- required CompressType compType = 6;
- required int64 startRid = 7;
- required int32 recordNum = 8;
- required int32 offsetToIndex = 9;
+ required DataType dataType = 1;
+ required bool compressed = 2;
+ required bool sorted = 3;
+ required bool contiguous = 4;
+ required StoreType storeType = 5;
+ required CompressType compType = 6;
+ required int64 startRid = 7;
+ required int32 recordNum = 8;
+ required int32 offsetToIndex = 9;
}
message ColumnProto {
- required string columnName = 1;
- required DataType dataType = 2;
+ required string columnName = 1;
+ required DataType dataType = 2;
}
message SchemaProto {
- repeated ColumnProto fields = 1;
+ repeated ColumnProto fields = 1;
}
message KeyValueProto {
- required string key = 1;
- required string value = 2;
+ required string key = 1;
+ required string value = 2;
}
message KeyValueSetProto {
- repeated KeyValueProto keyval = 1;
+ repeated KeyValueProto keyval = 1;
}
message FragmentProto {
- required string id = 1;
- required string path = 2;
- required int64 startOffset = 3;
- required int64 length = 4;
- required TableProto meta = 5;
- optional TableStatProto stat = 6;
+ required string id = 1;
+ required string path = 2;
+ required int64 startOffset = 3;
+ required int64 length = 4;
+ required TableProto meta = 5;
+ optional TableStatProto stat = 6;
optional bool distCached = 7 [default = false];
}
message TableProto {
- required SchemaProto schema = 1;
- required StoreType storeType = 2;
- required KeyValueSetProto params = 3;
- optional TableStatProto stat = 4;
+ required SchemaProto schema = 1;
+ required StoreType storeType = 2;
+ required KeyValueSetProto params = 3;
+ optional TableStatProto stat = 4;
}
message TableDescProto {
- required string id = 1;
- required string path = 2;
- required TableProto meta = 3;
+ required string id = 1;
+ required string path = 2;
+ required TableProto meta = 3;
}
enum FunctionType {
- GENERAL = 0;
- AGGREGATION = 1;
+ GENERAL = 0;
+ AGGREGATION = 1;
}
message FunctionDescProto {
- required string signature = 1;
- required string className = 2;
- required FunctionType type = 3;
- repeated Type parameterTypes = 4;
- required Type returnType = 5;
+ required string signature = 1;
+ required string className = 2;
+ required FunctionType type = 3;
+ repeated DataType parameterTypes = 4;
+ required DataType returnType = 5;
}
message IndexDescProto {
- required string name = 1;
- required string tableId = 2;
- required ColumnProto column = 3;
- required IndexMethod indexMethod = 4;
- optional bool isUnique = 5 [default = false];
- optional bool isClustered = 6 [default = false];
- optional bool isAscending = 7 [default = false];
+ required string name = 1;
+ required string tableId = 2;
+ required ColumnProto column = 3;
+ required IndexMethod indexMethod = 4;
+ optional bool isUnique = 5 [default = false];
+ optional bool isClustered = 6 [default = false];
+ optional bool isAscending = 7 [default = false];
}
enum IndexMethod {
- TWO_LEVEL_BIN_TREE = 0;
- BTREE = 1;
- HASH = 2;
- BITMAP = 3;
+ TWO_LEVEL_BIN_TREE = 0;
+ BTREE = 1;
+ HASH = 2;
+ BITMAP = 3;
}
message GetAllTableNamesResponse {
- repeated string tableName = 1;
+ repeated string tableName = 1;
}
message GetIndexRequest {
- required string tableName = 1;
- required string columnName = 2;
+ required string tableName = 1;
+ required string columnName = 2;
}
message GetFunctionsResponse {
- repeated FunctionDescProto functionDesc = 1;
+ repeated FunctionDescProto functionDesc = 1;
}
message UnregisterFunctionRequest {
- required string signature = 1;
- repeated Type parameterTypes = 2;
+ required string signature = 1;
+ repeated DataType parameterTypes = 2;
}
message GetFunctionMetaRequest {
- required string signature = 1;
- repeated Type parameterTypes = 2;
+ required string signature = 1;
+ repeated DataType parameterTypes = 2;
}
message ContainFunctionRequest {
- required string signature = 1;
- repeated Type parameterTypes = 2;
+ required string signature = 1;
+ repeated DataType parameterTypes = 2;
}
message TableStatProto {
- required int64 numRows = 1;
- required int64 numBytes = 2;
- optional int32 numBlocks = 3;
- optional int32 numPartitions = 4;
- optional int64 avgRows = 5;
- repeated ColumnStatProto colStat = 6;
+ required int64 numRows = 1;
+ required int64 numBytes = 2;
+ optional int32 numBlocks = 3;
+ optional int32 numPartitions = 4;
+ optional int64 avgRows = 5;
+ repeated ColumnStatProto colStat = 6;
}
message ColumnStatProto {
- required ColumnProto column = 1;
- optional int64 numDistVal = 2;
- optional int64 numNulls = 3;
- optional bytes minValue = 4;
- optional bytes maxValue = 5;
+ required ColumnProto column = 1;
+ optional int64 numDistVal = 2;
+ optional int64 numNulls = 3;
+ optional bytes minValue = 4;
+ optional bytes maxValue = 5;
}
enum StatType {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-project/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml
index b03880e..068623a 100644
--- a/tajo-project/pom.xml
+++ b/tajo-project/pom.xml
@@ -36,7 +36,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<tajo.version>0.2.0-SNAPSHOT</tajo.version>
- <hadoop.version>2.0.3-alpha</hadoop.version>
+ <hadoop.version>2.0.5-alpha</hadoop.version>
</properties>
<licenses>
@@ -749,11 +749,18 @@
<version>1.9.5-rc1</version>
<scope>test</scope>
</dependency>
+ <!--
<dependency>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
<version>3.2.4.Final</version>
</dependency>
+ -->
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ <version>3.6.6.Final</version>
+ </dependency>
</dependencies>
</dependencyManagement>
<profiles>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-rpc/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-rpc/pom.xml b/tajo-rpc/pom.xml
index 2c28282..6875291 100644
--- a/tajo-rpc/pom.xml
+++ b/tajo-rpc/pom.xml
@@ -135,8 +135,9 @@
<artifactId>protobuf-java</artifactId>
</dependency>
<dependency>
- <groupId>org.jboss.netty</groupId>
+ <groupId>io.netty</groupId>
<artifactId>netty</artifactId>
+ <scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>