You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/08/14 08:48:06 UTC
[8/8] git commit: TAJO-91: Launch QueryMaster on NodeManager per
query. (hyoungjunkim via hyunsik)
TAJO-91: Launch QueryMaster on NodeManager per query. (hyoungjunkim via hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/9d020883
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/9d020883
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/9d020883
Branch: refs/heads/master
Commit: 9d02088397aca145aa77799e24469a983cc60974
Parents: a269372
Author: Hyunsik Choi <hy...@apache.org>
Authored: Wed Aug 14 13:37:18 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Wed Aug 14 13:38:51 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../org/apache/tajo/catalog/CatalogService.java | 4 +-
.../org/apache/tajo/catalog/FunctionDesc.java | 2 +-
.../src/main/proto/CatalogProtos.proto | 160 ++--
.../apache/tajo/catalog/TestFunctionDesc.java | 5 +-
.../org/apache/tajo/catalog/CatalogServer.java | 6 +-
.../src/main/java/org/apache/tajo/QueryId.java | 4 +-
.../java/org/apache/tajo/conf/TajoConf.java | 1 +
.../main/java/org/apache/tajo/util/Bytes.java | 14 +-
.../org/apache/tajo/util/CommonTestingUtil.java | 2 +-
.../java/org/apache/tajo/util/TajoIdUtils.java | 40 +-
tajo-core/tajo-core-backend/pom.xml | 23 +-
.../org/apache/tajo/benchmark/BenchmarkSet.java | 9 +
.../main/java/org/apache/tajo/cli/TajoCli.java | 33 +-
.../org/apache/tajo/client/QueryStatus.java | 15 +-
.../java/org/apache/tajo/client/TajoClient.java | 162 +++-
.../apache/tajo/engine/eval/EvalTreeUtil.java | 6 +-
.../apache/tajo/engine/planner/FromTable.java | 4 +
.../tajo/engine/query/QueryUnitRequestImpl.java | 4 +-
.../apache/tajo/engine/query/ResultSetImpl.java | 66 +-
.../ipc/protocolrecords/QueryUnitRequest.java | 7 +-
.../org/apache/tajo/master/ClientService.java | 411 ----------
.../org/apache/tajo/master/ContainerProxy.java | 429 ++++++++++
.../org/apache/tajo/master/GlobalEngine.java | 85 +-
.../org/apache/tajo/master/GlobalPlanner.java | 2 +-
.../main/java/org/apache/tajo/master/Query.java | 409 ----------
.../org/apache/tajo/master/QueryMaster.java | 465 -----------
.../java/org/apache/tajo/master/QueryUnit.java | 499 -----------
.../apache/tajo/master/QueryUnitAttempt.java | 344 --------
.../org/apache/tajo/master/Repartitioner.java | 582 -------------
.../java/org/apache/tajo/master/SubQuery.java | 766 -----------------
.../org/apache/tajo/master/SubQueryState.java | 28 -
.../java/org/apache/tajo/master/TajoMaster.java | 64 +-
.../tajo/master/TajoMasterClientService.java | 412 ++++++++++
.../tajo/master/TaskRunnerLauncherImpl.java | 487 ++---------
.../apache/tajo/master/TaskRunnerListener.java | 172 ----
.../apache/tajo/master/TaskSchedulerImpl.java | 25 +-
.../tajo/master/cluster/WorkerListener.java | 148 ----
.../master/event/SubQueryCompletedEvent.java | 2 +-
.../tajo/master/event/SubQuerySucceeEvent.java | 2 +-
.../event/TaskAttemptStatusUpdateEvent.java | 2 +-
.../tajo/master/event/TaskCompletionEvent.java | 2 +-
.../tajo/master/event/TaskFatalErrorEvent.java | 2 +-
.../tajo/master/event/TaskRequestEvent.java | 2 +-
.../apache/tajo/master/querymaster/Query.java | 413 ++++++++++
.../tajo/master/querymaster/QueryMaster.java | 817 +++++++++++++++++++
.../querymaster/QueryMasterClientService.java | 196 +++++
.../master/querymaster/QueryMasterManager.java | 353 ++++++++
.../querymaster/QueryMasterManagerService.java | 114 +++
.../master/querymaster/QueryMasterRunner.java | 152 ++++
.../tajo/master/querymaster/QueryUnit.java | 500 ++++++++++++
.../master/querymaster/QueryUnitAttempt.java | 344 ++++++++
.../tajo/master/querymaster/Repartitioner.java | 584 +++++++++++++
.../tajo/master/querymaster/SubQuery.java | 766 +++++++++++++++++
.../tajo/master/querymaster/SubQueryState.java | 28 +
.../tajo/master/rm/RMContainerAllocator.java | 31 +-
.../main/java/org/apache/tajo/worker/Task.java | 4 +-
.../java/org/apache/tajo/worker/TaskRunner.java | 61 +-
.../src/main/proto/CatalogProtos.proto | 160 ++--
.../src/main/proto/ClientProtocol.proto | 7 +-
.../src/main/proto/ClientProtos.proto | 136 +++
.../src/main/proto/MasterWorkerProtocol.proto | 36 -
.../src/main/proto/MasterWorkerProtos.proto | 118 ---
.../main/proto/QueryMasterClientProtocol.proto | 36 +
.../main/proto/QueryMasterManagerProtocol.proto | 50 ++
.../src/main/proto/QueryMasterProtocol.proto | 132 +++
.../main/proto/TajoMasterClientProtocol.proto | 64 ++
.../src/main/proto/tajo_protos.proto | 17 +-
.../src/main/resources/tajo-default.xml | 18 +
.../org/apache/tajo/BackendTestingUtil.java | 2 +-
.../apache/tajo/LocalTajoTestingUtility.java | 12 +
.../org/apache/tajo/MiniTajoYarnCluster.java | 5 +
.../org/apache/tajo/TajoTestingCluster.java | 63 +-
.../test/java/org/apache/tajo/TpchTestBase.java | 5 +-
.../org/apache/tajo/benchmark/TestTPCH.java | 45 +-
.../org/apache/tajo/client/TestTajoClient.java | 4 +-
.../engine/function/TestBuiltinFunctions.java | 58 +-
.../plan/global/TestGlobalQueryPlanner.java | 2 +-
.../global/TestGlobalQueryOptimizer.java | 10 +-
.../tajo/engine/query/TestGroupByQuery.java | 43 +-
.../apache/tajo/engine/query/TestJoinQuery.java | 134 +--
.../tajo/engine/query/TestNullValues.java | 34 +-
.../tajo/engine/query/TestResultSetImpl.java | 2 +-
.../tajo/engine/query/TestSelectQuery.java | 339 ++++----
.../apache/tajo/engine/query/TestSortQuery.java | 148 ++--
.../tajo/master/TestExecutionBlockCursor.java | 2 +-
.../apache/tajo/master/TestRepartitioner.java | 10 +-
.../org/apache/tajo/worker/TaskRunnerTest.java | 6 +-
.../src/test/resources/log4j.properties | 5 +-
tajo-core/tajo-core-pullserver/pom.xml | 3 +-
.../src/main/proto/CatalogProtos.proto | 160 ++--
tajo-project/pom.xml | 9 +-
tajo-rpc/pom.xml | 3 +-
.../org/apache/tajo/rpc/NettyServerBase.java | 52 +-
.../apache/tajo/rpc/ProtoBlockingRpcServer.java | 4 -
.../org/apache/tajo/rpc/TestNettyAsyncRpc.java | 115 ---
.../apache/tajo/rpc/TestNettyBlockingRpc.java | 110 ---
.../apache/tajo/rpc/TestProtoBlockingRpc.java | 65 +-
.../test/impl/DummyProtocolBlockingImpl.java | 2 +-
99 files changed, 6969 insertions(+), 5530 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 808d9cb..0e84b29 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,6 +15,9 @@ Release 0.2.0 - unreleased
IMPROVEMENTS
+ TAJO-91: Launch QueryMaster on NodeManager per query.
+ (hyoungjunkim via hyunsik)
+
TAJO-100: Port the parse error handling to the new parser. (jinho)
TAJO-121: Add LogicalPlanVisitor and Refactor LogicalOptimizer to use the
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogService.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogService.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogService.java
index c4eb535..d0b5f50 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogService.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogService.java
@@ -28,7 +28,7 @@ public interface CatalogService {
* Get a table description by name
* @param name table name
* @return a table description
- * @see TableDescImpl
+ * @see org.apache.tajo.catalog.TableDescImpl
* @throws Throwable
*/
TableDesc getTableDesc(String name);
@@ -49,7 +49,7 @@ public interface CatalogService {
/**
* Add a table via table description
- * @see TableDescImpl
+ * @see org.apache.tajo.catalog.TableDescImpl
* @throws Throwable
*/
boolean addTable(TableDesc desc);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java
index 8f678b9..73f861a 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java
@@ -67,7 +67,7 @@ public class FunctionDesc implements ProtoObject<FunctionDescProto>, Cloneable,
/**
*
* @return 함수 인스턴스
- * @throws InternalException
+ * @throws org.apache.tajo.exception.InternalException
*/
public Function newInstance() throws InternalException {
try {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
index c171c2b..6164553 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
+++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
@@ -25,9 +25,9 @@ option java_generate_equals_and_hash = true;
import "DataTypes.proto";
enum StoreType {
- MEM = 0;
- CSV = 1;
- RAW = 2;
+ MEM = 0;
+ CSV = 1;
+ RAW = 2;
RCFILE = 3;
ROWFILE = 4;
HCFILE = 5;
@@ -35,147 +35,147 @@ enum StoreType {
}
enum OrderType {
- ORDER_NONE = 0;
- ASC = 1;
- DSC = 2;
+ ORDER_NONE = 0;
+ ASC = 1;
+ DSC = 2;
}
enum CompressType {
- COMP_NONE = 0;
- NULL_SUPPRESS = 1;
- RUN_LENGTH = 2;
- BIT_VECTOR = 3;
- DICTIONARY = 4;
- SNAPPY = 5;
- LZ = 6;
+ COMP_NONE = 0;
+ NULL_SUPPRESS = 1;
+ RUN_LENGTH = 2;
+ BIT_VECTOR = 3;
+ DICTIONARY = 4;
+ SNAPPY = 5;
+ LZ = 6;
}
message ColumnMetaProto {
- required DataType dataType = 1;
- required bool compressed = 2;
- required bool sorted = 3;
- required bool contiguous = 4;
- required StoreType storeType = 5;
- required CompressType compType = 6;
- required int64 startRid = 7;
- required int32 recordNum = 8;
- required int32 offsetToIndex = 9;
+ required DataType dataType = 1;
+ required bool compressed = 2;
+ required bool sorted = 3;
+ required bool contiguous = 4;
+ required StoreType storeType = 5;
+ required CompressType compType = 6;
+ required int64 startRid = 7;
+ required int32 recordNum = 8;
+ required int32 offsetToIndex = 9;
}
message ColumnProto {
- required string columnName = 1;
- required DataType dataType = 2;
+ required string columnName = 1;
+ required DataType dataType = 2;
}
message SchemaProto {
- repeated ColumnProto fields = 1;
+ repeated ColumnProto fields = 1;
}
message KeyValueProto {
- required string key = 1;
- required string value = 2;
+ required string key = 1;
+ required string value = 2;
}
message KeyValueSetProto {
- repeated KeyValueProto keyval = 1;
+ repeated KeyValueProto keyval = 1;
}
message FragmentProto {
- required string id = 1;
- required string path = 2;
- required int64 startOffset = 3;
- required int64 length = 4;
- required TableProto meta = 5;
- optional TableStatProto stat = 6;
+ required string id = 1;
+ required string path = 2;
+ required int64 startOffset = 3;
+ required int64 length = 4;
+ required TableProto meta = 5;
+ optional TableStatProto stat = 6;
optional bool distCached = 7 [default = false];
}
message TableProto {
- required SchemaProto schema = 1;
- required StoreType storeType = 2;
- required KeyValueSetProto params = 3;
- optional TableStatProto stat = 4;
+ required SchemaProto schema = 1;
+ required StoreType storeType = 2;
+ required KeyValueSetProto params = 3;
+ optional TableStatProto stat = 4;
}
message TableDescProto {
- required string id = 1;
- required string path = 2;
- required TableProto meta = 3;
+ required string id = 1;
+ required string path = 2;
+ required TableProto meta = 3;
}
enum FunctionType {
- GENERAL = 0;
- AGGREGATION = 1;
+ GENERAL = 0;
+ AGGREGATION = 1;
}
message FunctionDescProto {
- required string signature = 1;
- required string className = 2;
- required FunctionType type = 3;
- repeated DataType parameterTypes = 4;
- required DataType returnType = 5;
+ required string signature = 1;
+ required string className = 2;
+ required FunctionType type = 3;
+ repeated DataType parameterTypes = 4;
+ required DataType returnType = 5;
}
message IndexDescProto {
- required string name = 1;
- required string tableId = 2;
- required ColumnProto column = 3;
- required IndexMethod indexMethod = 4;
- optional bool isUnique = 5 [default = false];
- optional bool isClustered = 6 [default = false];
- optional bool isAscending = 7 [default = false];
+ required string name = 1;
+ required string tableId = 2;
+ required ColumnProto column = 3;
+ required IndexMethod indexMethod = 4;
+ optional bool isUnique = 5 [default = false];
+ optional bool isClustered = 6 [default = false];
+ optional bool isAscending = 7 [default = false];
}
enum IndexMethod {
- TWO_LEVEL_BIN_TREE = 0;
- BTREE = 1;
- HASH = 2;
- BITMAP = 3;
+ TWO_LEVEL_BIN_TREE = 0;
+ BTREE = 1;
+ HASH = 2;
+ BITMAP = 3;
}
message GetAllTableNamesResponse {
- repeated string tableName = 1;
+ repeated string tableName = 1;
}
message GetIndexRequest {
- required string tableName = 1;
- required string columnName = 2;
+ required string tableName = 1;
+ required string columnName = 2;
}
message GetFunctionsResponse {
- repeated FunctionDescProto functionDesc = 1;
+ repeated FunctionDescProto functionDesc = 1;
}
message UnregisterFunctionRequest {
- required string signature = 1;
- repeated DataType parameterTypes = 2;
+ required string signature = 1;
+ repeated DataType parameterTypes = 2;
}
message GetFunctionMetaRequest {
- required string signature = 1;
- repeated DataType parameterTypes = 2;
+ required string signature = 1;
+ repeated DataType parameterTypes = 2;
}
message ContainFunctionRequest {
- required string signature = 1;
- repeated DataType parameterTypes = 2;
+ required string signature = 1;
+ repeated DataType parameterTypes = 2;
}
message TableStatProto {
- required int64 numRows = 1;
- required int64 numBytes = 2;
- optional int32 numBlocks = 3;
- optional int32 numPartitions = 4;
- optional int64 avgRows = 5;
- repeated ColumnStatProto colStat = 6;
+ required int64 numRows = 1;
+ required int64 numBytes = 2;
+ optional int32 numBlocks = 3;
+ optional int32 numPartitions = 4;
+ optional int64 avgRows = 5;
+ repeated ColumnStatProto colStat = 6;
}
message ColumnStatProto {
- required ColumnProto column = 1;
- optional int64 numDistVal = 2;
- optional int64 numNulls = 3;
- optional bytes minValue = 4;
- optional bytes maxValue = 5;
+ required ColumnProto column = 1;
+ optional int64 numDistVal = 2;
+ optional int64 numNulls = 3;
+ optional bytes minValue = 4;
+ optional bytes maxValue = 5;
}
enum StatType {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestFunctionDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestFunctionDesc.java b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestFunctionDesc.java
index 936e8c1..1ede050 100644
--- a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestFunctionDesc.java
+++ b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestFunctionDesc.java
@@ -22,7 +22,6 @@ import org.apache.tajo.catalog.function.GeneralFunction;
import org.apache.tajo.catalog.json.CatalogGsonHelper;
import org.apache.tajo.catalog.proto.CatalogProtos.FunctionDescProto;
import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
-import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
@@ -45,8 +44,8 @@ public class TestFunctionDesc {
private Integer y;
public TestSum() {
- super(new Column[] { new Column("arg1", TajoDataTypes.Type.INT4),
- new Column("arg2", TajoDataTypes.Type.INT4) });
+ super(new Column[] { new Column("arg1", org.apache.tajo.common.TajoDataTypes.Type.INT4),
+ new Column("arg2", org.apache.tajo.common.TajoDataTypes.Type.INT4) });
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
index c3d0d4a..e82afc8 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
@@ -134,7 +134,7 @@ public class CatalogServer extends AbstractService {
this.rpcServer.start();
this.bindAddress = this.rpcServer.getBindAddress();
- this.serverName = org.apache.tajo.util.NetUtils.getIpPortString(bindAddress);
+ this.serverName = NetUtils.getIpPortString(bindAddress);
conf.setVar(ConfVars.CATALOG_ADDRESS, serverName);
} catch (Exception e) {
LOG.error("Cannot start RPC Server of CatalogServer", e);
@@ -145,7 +145,9 @@ public class CatalogServer extends AbstractService {
}
public void stop() {
- this.rpcServer.shutdown();
+ if (rpcServer != null) {
+ this.rpcServer.shutdown();
+ }
LOG.info("Catalog Server (" + serverName + ") shutdown");
super.stop();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-common/src/main/java/org/apache/tajo/QueryId.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/QueryId.java b/tajo-common/src/main/java/org/apache/tajo/QueryId.java
index 4394d74..5dbbaca 100644
--- a/tajo-common/src/main/java/org/apache/tajo/QueryId.java
+++ b/tajo-common/src/main/java/org/apache/tajo/QueryId.java
@@ -18,7 +18,9 @@
package org.apache.tajo;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProtoOrBuilder;
@@ -112,7 +114,7 @@ public class QueryId implements Comparable<QueryId> {
return new ApplicationIdPBImpl(p);
}
- private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
+ public static ApplicationIdProto convertToProtoFormat(ApplicationId t) {
return ((ApplicationIdPBImpl)t).getProto();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 5ad16a2..fb7c268 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -66,6 +66,7 @@ public class TajoConf extends YarnConfiguration {
// Service Addresses
TASKRUNNER_LISTENER_ADDRESS("tajo.master.taskrunnerlistener.addr", "0.0.0.0:0"), // used internally
CLIENT_SERVICE_ADDRESS("tajo.master.clientservice.addr", "127.0.0.1:9004"),
+ QUERY_MASTER_MANAGER_SERVICE_ADDRESS("tajo.master.querymastermanager.addr", "127.0.0.1:9005"),
//////////////////////////////////
// Catalog Configuration
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-common/src/main/java/org/apache/tajo/util/Bytes.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/Bytes.java b/tajo-common/src/main/java/org/apache/tajo/util/Bytes.java
index 64ba1b8..33dd9d9 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/Bytes.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/Bytes.java
@@ -133,7 +133,7 @@ public class Bytes {
* Read byte-array written with a WritableableUtils.vint prefix.
* @param in Input to read from.
* @return byte array read off <code>in</code>
- * @throws IOException e
+ * @throws java.io.IOException e
*/
public static byte [] readByteArray(final DataInput in)
throws IOException {
@@ -164,7 +164,7 @@ public class Bytes {
* Write byte-array with a WritableableUtils.vint prefix.
* @param out output stream to be written to
* @param b array to write
- * @throws IOException e
+ * @throws java.io.IOException e
*/
public static void writeByteArray(final DataOutput out, final byte [] b)
throws IOException {
@@ -181,7 +181,7 @@ public class Bytes {
* @param b array
* @param offset offset into array
* @param length length past offset
- * @throws IOException e
+ * @throws java.io.IOException e
*/
public static void writeByteArray(final DataOutput out, final byte [] b,
final int offset, final int length)
@@ -975,7 +975,7 @@ public class Bytes {
interface Comparer<T> {
abstract public int compareTo(T buffer1, int offset1, int length1,
- T buffer2, int offset2, int length2);
+ T buffer2, int offset2, int length2);
}
@VisibleForTesting
@@ -985,7 +985,7 @@ public class Bytes {
/**
* Provides a lexicographical comparer implementation; either a Java
- * implementation or a faster implementation based on {@link Unsafe}.
+ * implementation or a faster implementation based on {@link sun.misc.Unsafe}.
*
* <p>Uses reflection to gracefully fall back to the Java implementation if
* {@code Unsafe} isn't available.
@@ -1224,7 +1224,7 @@ public class Bytes {
/**
* @param b bytes to hash
- * @return Runs {@link WritableComparator#hashBytes(byte[], int)} on the
+ * @return Runs {@link org.apache.hadoop.io.WritableComparator#hashBytes(byte[], int)} on the
* passed in array. This method is what {@link org.apache.hadoop.io.Text} and
* {@link ImmutableBytesWritable} use calculating hash code.
*/
@@ -1235,7 +1235,7 @@ public class Bytes {
/**
* @param b value
* @param length length of the value
- * @return Runs {@link WritableComparator#hashBytes(byte[], int)} on the
+ * @return Runs {@link org.apache.hadoop.io.WritableComparator#hashBytes(byte[], int)} on the
* passed in array. This method is what {@link org.apache.hadoop.io.Text} and
* {@link ImmutableBytesWritable} use calculating hash code.
*/
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-common/src/main/java/org/apache/tajo/util/CommonTestingUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/CommonTestingUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/CommonTestingUtil.java
index 7ff1c1f..f045197 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/CommonTestingUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/CommonTestingUtil.java
@@ -30,7 +30,7 @@ public class CommonTestingUtil {
*
* @param dir a local directory to be created
* @return the created path
- * @throws IOException
+ * @throws java.io.IOException
*/
public static Path getTestDir(String dir) throws IOException {
Path path = new Path(dir);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-common/src/main/java/org/apache/tajo/util/TajoIdUtils.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/TajoIdUtils.java b/tajo-common/src/main/java/org/apache/tajo/util/TajoIdUtils.java
index 2164d27..9dfbfbc 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/TajoIdUtils.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/TajoIdUtils.java
@@ -21,11 +21,20 @@ package org.apache.tajo.util;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
import org.apache.tajo.QueryId;
import org.apache.tajo.SubQueryId;
import org.apache.tajo.TajoIdProtos.SubQueryIdProto;
+import java.util.Iterator;
+
+import static org.apache.hadoop.yarn.util.StringHelper._split;
+
public class TajoIdUtils {
+ public static final String YARN_APPLICATION_PREFIX = "application";
+ public static final String YARN_CONTAINER_PREFIX = "container";
+ public static final String YARN_APPLICATION_ATTEMPT_PREFIX = "appattempt";
+
/** It is mainly for DDL statements which don's have any query id. */
public static final QueryId NullQueryId =
TajoIdUtils.createQueryId(BuilderUtils.newApplicationId(0, 0), 0);
@@ -45,7 +54,7 @@ public class TajoIdUtils {
String[] split = queryId.split(QueryId.SEPARATOR);
ApplicationId appId = BuilderUtils.newApplicationId(Long.valueOf(split[1]),
Integer.parseInt(split[2]));
- int idInt = Integer.parseInt(split[2]);
+ int idInt = Integer.parseInt(split[3]);
return newQueryId(appId, idInt);
}
@@ -80,4 +89,33 @@ public class TajoIdUtils {
SubQueryId subId = new SubQueryId(proto);
return subId;
}
+
+ public static ApplicationAttemptId toApplicationAttemptId(
+ String applicationAttmeptIdStr) {
+ //This methood from YARN.ConvertUtils
+ Iterator<String> it = _split(applicationAttmeptIdStr).iterator();
+ if (!it.next().equals(YARN_APPLICATION_ATTEMPT_PREFIX)) {
+ throw new IllegalArgumentException("Invalid AppAttemptId prefix: "
+ + applicationAttmeptIdStr);
+ }
+ try {
+ return toApplicationAttemptId(it);
+ } catch (NumberFormatException n) {
+ throw new IllegalArgumentException("Invalid AppAttemptId: "
+ + applicationAttmeptIdStr, n);
+ }
+ }
+
+ private static ApplicationAttemptId toApplicationAttemptId(
+ Iterator<String> it) throws NumberFormatException {
+ //This methood from YARN.ConvertUtils
+ ApplicationId appId = Records.newRecord(ApplicationId.class);
+ appId.setClusterTimestamp(Long.parseLong(it.next()));
+ appId.setId(Integer.parseInt(it.next()));
+ ApplicationAttemptId appAttemptId = Records
+ .newRecord(ApplicationAttemptId.class);
+ appAttemptId.setApplicationId(appId);
+ appAttemptId.setAttemptId(Integer.parseInt(it.next()));
+ return appAttemptId;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/pom.xml b/tajo-core/tajo-core-backend/pom.xml
index 99bb666..541b4d4 100644
--- a/tajo-core/tajo-core-backend/pom.xml
+++ b/tajo-core/tajo-core-backend/pom.xml
@@ -173,9 +173,11 @@
<argument>-Isrc/main/proto/</argument>
<argument>--java_out=target/generated-sources/proto</argument>
<argument>src/main/proto/tajo_protos.proto</argument>
- <argument>src/main/proto/MasterWorkerProtos.proto</argument>
- <argument>src/main/proto/MasterWorkerProtocol.proto</argument>
- <argument>src/main/proto/ClientProtocol.proto</argument>
+ <argument>src/main/proto/ClientProtos.proto</argument>
+ <argument>src/main/proto/QueryMasterClientProtocol.proto</argument>
+ <argument>src/main/proto/QueryMasterManagerProtocol.proto</argument>
+ <argument>src/main/proto/QueryMasterProtocol.proto</argument>
+ <argument>src/main/proto/TajoMasterClientProtocol.proto</argument>
</arguments>
</configuration>
<goals>
@@ -365,10 +367,17 @@
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
+ <!--
<dependency>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</dependency>
+ -->
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </dependency>
+
<dependency>
<groupId>jline</groupId>
<artifactId>jline</artifactId>
@@ -407,6 +416,14 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-report-plugin</artifactId>
+ <version>2.15</version>
+ <configuration>
+ <aggregate>true</aggregate>
+ </configuration>
+ </plugin>
</plugins>
</build>
</profile>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
index 42717d0..1664b1c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
@@ -52,8 +52,17 @@ public abstract class BenchmarkSet {
}
protected void loadQueries(String dir) throws IOException {
+ // TODO - this code dead??
File queryDir = new File(dir);
+ if(!queryDir.exists()) {
+ queryDir = new File(System.getProperty("user.dir") + "/tajo-core/tajo-core-backend/" + dir);
+ }
+
+ if(!queryDir.exists())
+ {
+ return;
+ }
int last;
String name, query;
for (String file : queryDir.list()) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
index ce4c72f..86d47f9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
@@ -28,12 +28,11 @@ import org.apache.tajo.QueryId;
import org.apache.tajo.TajoProtos.QueryState;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.client.ClientProtocol;
import org.apache.tajo.client.QueryStatus;
import org.apache.tajo.client.TajoClient;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.master.cluster.ServerName;
+import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.util.FileUtil;
import org.apache.tajo.util.TajoIdUtils;
@@ -265,7 +264,6 @@ public class TajoCli {
}
public int executeStatements(String line) throws Exception {
-
String stripped;
for (String statement : line.split(";")) {
stripped = StringUtils.chomp(statement);
@@ -284,14 +282,20 @@ public class TajoCli {
invokeCommand(cmds);
} else { // submit a query to TajoMaster
- ClientProtocol.SubmitQueryRespose response = client.executeQuery(stripped);
-
- if (response.getResultCode() == ClientProtocol.ResultCode.OK) {
- QueryId queryId = new QueryId(response.getQueryId());
- if (queryId.equals(TajoIdUtils.NullQueryId)) {
- sout.println("OK");
- } else {
- getQueryResult(queryId);
+ ClientProtos.SubmitQueryResponse response = client.executeQuery(stripped);
+ if (response.getResultCode() == ClientProtos.ResultCode.OK) {
+ QueryId queryId = null;
+ try {
+ queryId = new QueryId(response.getQueryId());
+ if (queryId.equals(TajoIdUtils.NullQueryId)) {
+ sout.println("OK");
+ } else {
+ getQueryResult(queryId);
+ }
+ } finally {
+ if(queryId != null) {
+ client.closeQuery(queryId);
+ }
}
} else {
if (response.hasErrorMessage()) {
@@ -318,8 +322,13 @@ public class TajoCli {
QueryStatus status;
while (true) {
+ // TODO - configurable
Thread.sleep(1000);
status = client.getQueryStatus(queryId);
+ if(status.getState() == QueryState.QUERY_MASTER_INIT || status.getState() == QueryState.QUERY_MASTER_LAUNCHED) {
+ continue;
+ }
+
if (status.getState() == QueryState.QUERY_RUNNING ||
status.getState() == QueryState.QUERY_SUCCEEDED) {
sout.println("Progress: " + (int)(status.getProgress() * 100.0f)
@@ -328,7 +337,7 @@ public class TajoCli {
sout.flush();
}
- if (status.getState() != QueryState.QUERY_RUNNING) {
+ if (status.getState() != QueryState.QUERY_RUNNING && status.getState() != QueryState.QUERY_NOT_ASSIGNED) {
break;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/QueryStatus.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/QueryStatus.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/QueryStatus.java
index cdde9de..c7122b3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/QueryStatus.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/QueryStatus.java
@@ -20,7 +20,7 @@ package org.apache.tajo.client;
import org.apache.tajo.QueryId;
import org.apache.tajo.TajoProtos.QueryState;
-import org.apache.tajo.client.ClientProtocol.GetQueryStatusResponse;
+import org.apache.tajo.ipc.ClientProtos.GetQueryStatusResponse;
public class QueryStatus {
private QueryId queryId;
@@ -31,6 +31,8 @@ public class QueryStatus {
private long finishTime;
private boolean hasResult;
private String errorText;
+ private String queryMasterHost;
+ private int queryMasterPort;
public QueryStatus(GetQueryStatusResponse proto) {
queryId = new QueryId(proto.getQueryId());
@@ -43,6 +45,17 @@ public class QueryStatus {
if (proto.hasErrorMessage()) {
errorText = proto.getErrorMessage();
}
+
+ queryMasterHost = proto.getQueryMasterHost();
+ queryMasterPort = proto.getQueryMasterPort();
+ }
+
+ public String getQueryMasterHost() {
+ return queryMasterHost;
+ }
+
+ public int getQueryMasterPort() {
+ return queryMasterPort;
}
public QueryId getQueryId() {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
index 8317065..5b4b064 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
@@ -28,10 +28,15 @@ import org.apache.tajo.TajoProtos.QueryState;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.client.ClientProtocol.*;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.engine.query.ResultSetImpl;
+import org.apache.tajo.ipc.TajoMasterClientProtocol;
+import org.apache.tajo.ipc.TajoMasterClientProtocol.*;
+import org.apache.tajo.ipc.QueryMasterClientProtocol;
+import org.apache.tajo.ipc.QueryMasterClientProtocol.*;
+import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.ipc.ClientProtos.*;
import org.apache.tajo.rpc.ProtoBlockingRpcClient;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
import org.apache.tajo.util.TajoIdUtils;
@@ -39,14 +44,25 @@ import org.apache.tajo.util.TajoIdUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.sql.ResultSet;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+/**
+ * TajoClient is ThreadSafe
+ */
public class TajoClient {
private final Log LOG = LogFactory.getLog(TajoClient.class);
private final TajoConf conf;
- private ProtoBlockingRpcClient client;
- private ClientProtocolService.BlockingInterface service;
+ private ProtoBlockingRpcClient tasjoMasterClient;
+ private TajoMasterClientProtocolService.BlockingInterface tajoMasterService;
+
+ private Map<QueryId, QueryMasterClientProtocolService.BlockingInterface> queryMasterConnectionMap =
+ new HashMap<QueryId, QueryMasterClientProtocolService.BlockingInterface>();
+
+ private Map<QueryId, ProtoBlockingRpcClient> queryMasterClientMap =
+ new HashMap<QueryId, ProtoBlockingRpcClient>();
public TajoClient(TajoConf conf) throws IOException {
this.conf = conf;
@@ -67,8 +83,8 @@ public class TajoClient {
private void connect(InetSocketAddress addr) throws IOException {
try {
- client = new ProtoBlockingRpcClient(ClientProtocol.class, addr);
- service = client.getStub();
+ tasjoMasterClient = new ProtoBlockingRpcClient(TajoMasterClientProtocol.class, addr);
+ tajoMasterService = tasjoMasterClient.getStub();
} catch (Exception e) {
throw new IOException(e);
}
@@ -78,11 +94,31 @@ public class TajoClient {
}
public void close() {
- client.close();
+ tasjoMasterClient.close();
+
+ for(ProtoBlockingRpcClient eachClient: queryMasterClientMap.values()) {
+ eachClient.close();
+ }
+ queryMasterClientMap.clear();
+ queryMasterConnectionMap.clear();
+ }
+
+ public void closeQuery(QueryId queryId) {
+ if(queryMasterClientMap.containsKey(queryId)) {
+ try {
+ queryMasterConnectionMap.get(queryId).killQuery(null, queryId.getProto());
+ } catch (Exception e) {
+ LOG.warn("Fail to close query:" + queryId + "," + e.getMessage(), e);
+ }
+ queryMasterClientMap.get(queryId).close();
+ LOG.info("Closed QueryMaster connection(" + queryId + "," + queryMasterClientMap.get(queryId).getRemoteAddress() + ")");
+ queryMasterClientMap.remove(queryId);
+ queryMasterConnectionMap.remove(queryId);
+ }
}
public boolean isConnected() {
- return client.isConnected();
+ return tasjoMasterClient.isConnected();
}
/**
@@ -91,11 +127,11 @@ public class TajoClient {
* In order to get the result, you should use {@link #getQueryResult(org.apache.tajo.QueryId)}
* or {@link #getQueryResultAndWait(org.apache.tajo.QueryId)}.
*/
- public SubmitQueryRespose executeQuery(String tql) throws ServiceException {
+ public ClientProtos.SubmitQueryResponse executeQuery(String tql) throws ServiceException {
QueryRequest.Builder builder = QueryRequest.newBuilder();
builder.setQuery(tql);
- return service.submitQuery(null, builder.build());
+ return tajoMasterService.submitQuery(null, builder.build());
}
/**
@@ -110,15 +146,12 @@ public class TajoClient {
throws ServiceException, IOException {
QueryRequest.Builder builder = QueryRequest.newBuilder();
builder.setQuery(sql);
- SubmitQueryRespose response = service.submitQuery(null, builder.build());
- if (response.getResultCode() == ResultCode.OK) {
- QueryId queryId = new QueryId(response.getQueryId());
- if (queryId.equals(TajoIdUtils.NullQueryId)) {
- return null;
- }
- return getQueryResultAndWait(queryId);
+ SubmitQueryResponse response = tajoMasterService.submitQuery(null, builder.build());
+ QueryId queryId = new QueryId(response.getQueryId());
+ if (queryId.equals(TajoIdUtils.NullQueryId)) {
+ return this.createNullResultSet(queryId);
} else {
- throw new ServiceException(response.getErrorMessage());
+ return this.getQueryResultAndWait(queryId);
}
}
@@ -127,38 +160,69 @@ public class TajoClient {
= GetQueryStatusRequest.newBuilder();
builder.setQueryId(queryId.getProto());
- GetQueryStatusResponse res = service.getQueryStatus(null,
- builder.build());
+ GetQueryStatusResponse res = null;
+ if(queryMasterConnectionMap.containsKey(queryId)) {
+ QueryMasterClientProtocolService.BlockingInterface queryMasterService = queryMasterConnectionMap.get(queryId);
+ res = queryMasterService.getQueryStatus(null, builder.build());
+ } else {
+ res = tajoMasterService.getQueryStatus(null, builder.build());
+ String queryMasterHost = res.getQueryMasterHost();
+ if(queryMasterHost != null && !queryMasterHost.isEmpty()) {
+ LOG.info("=========> connect to querymaster:" + queryMasterHost);
+ connectionToQueryMaster(queryId, queryMasterHost, res.getQueryMasterPort());
+ }
+ }
return new QueryStatus(res);
}
+ private void connectionToQueryMaster(QueryId queryId, String queryMasterHost, int queryMasterPort) {
+ try {
+ InetSocketAddress addr = NetUtils.createSocketAddr(queryMasterHost, queryMasterPort);
+ ProtoBlockingRpcClient client = new ProtoBlockingRpcClient(QueryMasterClientProtocol.class, addr);
+ QueryMasterClientProtocolService.BlockingInterface service = client.getStub();
+
+ queryMasterConnectionMap.put(queryId, service);
+ queryMasterClientMap.put(queryId, client);
+
+ LOG.debug("connected to Query Master (" +
+ org.apache.tajo.util.NetUtils.getIpPortString(addr) + ")");
+ } catch (Exception e) {
+ LOG.error(e.getMessage());
+ throw new RuntimeException(e);
+ }
+ }
+
private static boolean isQueryRunnning(QueryState state) {
return state == QueryState.QUERY_NEW ||
state == QueryState.QUERY_INIT ||
- state == QueryState.QUERY_RUNNING;
+ state == QueryState.QUERY_RUNNING ||
+ state == QueryState.QUERY_MASTER_LAUNCHED ||
+ state == QueryState.QUERY_MASTER_INIT ||
+ state == QueryState.QUERY_NOT_ASSIGNED;
}
public ResultSet getQueryResult(QueryId queryId)
throws ServiceException, IOException {
- if (queryId.equals(TajoIdUtils.NullQueryId)) {
- return null;
- }
+ if (queryId.equals(TajoIdUtils.NullQueryId)) {
+ return createNullResultSet(queryId);
+ }
TableDesc tableDesc = getResultDesc(queryId);
- return new ResultSetImpl(conf, tableDesc.getPath());
+ return new ResultSetImpl(this, queryId, conf, tableDesc.getPath());
}
public ResultSet getQueryResultAndWait(QueryId queryId)
throws ServiceException, IOException {
if (queryId.equals(TajoIdUtils.NullQueryId)) {
- return null;
+ return createNullResultSet(queryId);
}
QueryStatus status = getQueryStatus(queryId);
while(status != null && isQueryRunnning(status.getState())) {
try {
- Thread.sleep(500);
+// Thread.sleep(500);
+ Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
@@ -170,24 +234,34 @@ public class TajoClient {
if (status.hasResult()) {
return getQueryResult(queryId);
} else {
- return null;
+ return createNullResultSet(queryId);
}
} else {
- LOG.error(status.getErrorMessage());
+ LOG.warn("=====>Query failed:" + status.getState());
- return null;
+ //TODO throw SQLException(?)
+ return createNullResultSet(queryId);
}
}
+ public ResultSet createNullResultSet(QueryId queryId) throws IOException {
+ return new ResultSetImpl(this, queryId);
+ }
+
public TableDesc getResultDesc(QueryId queryId) throws ServiceException {
if (queryId.equals(TajoIdUtils.NullQueryId)) {
return null;
}
+ QueryMasterClientProtocolService.BlockingInterface queryMasterService = queryMasterConnectionMap.get(queryId);
+ if(queryMasterService == null) {
+ LOG.warn("No Connection to QueryMaster for " + queryId);
+ return null;
+ }
GetQueryResultRequest.Builder builder = GetQueryResultRequest.newBuilder();
builder.setQueryId(queryId.getProto());
- GetQueryResultResponse response = service.getQueryResult(null,
+ GetQueryResultResponse response = queryMasterService.getQueryResult(null,
builder.build());
return CatalogUtil.newTableDesc(response.getTableDesc());
@@ -198,14 +272,14 @@ public class TajoClient {
builder.setQuery(tql);
ResultCode resultCode =
- service.updateQuery(null, builder.build()).getResultCode();
+ tajoMasterService.updateQuery(null, builder.build()).getResultCode();
return resultCode == ResultCode.OK;
}
public boolean existTable(String name) throws ServiceException {
StringProto.Builder builder = StringProto.newBuilder();
builder.setValue(name);
- return service.existTable(null, builder.build()).getValue();
+ return tajoMasterService.existTable(null, builder.build()).getValue();
}
public TableDesc attachTable(String name, String path)
@@ -213,7 +287,7 @@ public class TajoClient {
AttachTableRequest.Builder builder = AttachTableRequest.newBuilder();
builder.setName(name);
builder.setPath(path);
- TableResponse res = service.attachTable(null, builder.build());
+ TableResponse res = tajoMasterService.attachTable(null, builder.build());
return CatalogUtil.newTableDesc(res.getTableDesc());
}
@@ -225,7 +299,7 @@ public class TajoClient {
public boolean detachTable(String name) throws ServiceException {
StringProto.Builder builder = StringProto.newBuilder();
builder.setValue(name);
- return service.detachTable(null, builder.build()).getValue();
+ return tajoMasterService.detachTable(null, builder.build()).getValue();
}
public TableDesc createTable(String name, Path path, TableMeta meta)
@@ -234,14 +308,14 @@ public class TajoClient {
builder.setName(name);
builder.setPath(path.toString());
builder.setMeta(meta.getProto());
- TableResponse res = service.createTable(null, builder.build());
+ TableResponse res = tajoMasterService.createTable(null, builder.build());
return CatalogUtil.newTableDesc(res.getTableDesc());
}
public boolean dropTable(String name) throws ServiceException {
StringProto.Builder builder = StringProto.newBuilder();
builder.setValue(name);
- return service.dropTable(null, builder.build()).getValue();
+ return tajoMasterService.dropTable(null, builder.build()).getValue();
}
/**
@@ -250,14 +324,14 @@ public class TajoClient {
*/
public List<String> getTableList() throws ServiceException {
GetTableListRequest.Builder builder = GetTableListRequest.newBuilder();
- GetTableListResponse res = service.getTableList(null, builder.build());
+ GetTableListResponse res = tajoMasterService.getTableList(null, builder.build());
return res.getTablesList();
}
public TableDesc getTableDesc(String tableName) throws ServiceException {
GetTableDescRequest.Builder build = GetTableDescRequest.newBuilder();
build.setTableName(tableName);
- TableResponse res = service.getTableDesc(null, build.build());
+ TableResponse res = tajoMasterService.getTableDesc(null, build.build());
if (res == null) {
return null;
} else {
@@ -272,7 +346,7 @@ public class TajoClient {
try {
/* send a kill to the TM */
- service.killQuery(null, queryId.getProto());
+ tajoMasterService.killQuery(null, queryId.getProto());
long currentTimeMillis = System.currentTimeMillis();
long timeKillIssued = currentTimeMillis;
while ((currentTimeMillis < timeKillIssued + 10000L) && (status.getState()
@@ -293,4 +367,14 @@ public class TajoClient {
return true;
}
+
+ public static void main(String[] args) throws Exception {
+ TajoClient client = new TajoClient(new TajoConf());
+
+ client.close();
+
+ synchronized(client) {
+ client.wait();
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
index bde2df5..973de99 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
@@ -98,7 +98,7 @@ public class EvalTreeUtil {
found.add(node);
}
}
-
+
public static Schema getSchemaByTargets(Schema inputSchema, Target [] targets)
throws InternalException {
Schema schema = new Schema();
@@ -282,7 +282,7 @@ public class EvalTreeUtil {
}
public static class VariableCounter implements EvalNodeVisitor {
- private final Map<EvalNode.Type, Integer> counter;
+ private final Map<Type, Integer> counter;
public VariableCounter() {
counter = Maps.newHashMap();
@@ -299,7 +299,7 @@ public class EvalTreeUtil {
}
}
- public Map<EvalNode.Type, Integer> getCounter() {
+ public Map<Type, Integer> getCounter() {
return counter;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/FromTable.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/FromTable.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/FromTable.java
index 04c1bb9..0e3a174 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/FromTable.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/FromTable.java
@@ -41,6 +41,10 @@ public class FromTable implements Cloneable, GsonObject {
this.alias = alias;
}
+ public TableDesc getTableDesc() {
+ return this.desc;
+ }
+
public final String getTableName() {
return desc.getId();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
index bb7caa1..7d430c5 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
@@ -19,9 +19,7 @@
package org.apache.tajo.engine.query;
import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.engine.MasterWorkerProtos.Fetch;
-import org.apache.tajo.engine.MasterWorkerProtos.QueryUnitRequestProto;
-import org.apache.tajo.engine.MasterWorkerProtos.QueryUnitRequestProtoOrBuilder;
+import org.apache.tajo.ipc.QueryMasterProtocol.*;
import org.apache.tajo.ipc.protocolrecords.QueryUnitRequest;
import org.apache.tajo.storage.Fragment;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java
index db049a0..e0db8e1 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java
@@ -24,9 +24,11 @@ package org.apache.tajo.engine.query;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
+import org.apache.tajo.QueryId;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.TableMetaImpl;
import org.apache.tajo.catalog.proto.CatalogProtos.TableProto;
+import org.apache.tajo.client.TajoClient;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.exception.UnsupportedException;
@@ -56,27 +58,36 @@ public class ResultSetImpl implements ResultSet {
private int curRow;
private long totalRow;
private boolean wasNull;
+ private TajoClient tajoClient;
+ QueryId queryId;
- public ResultSetImpl(Configuration conf, String path) throws IOException {
- this(conf, new Path(path));
+ public ResultSetImpl(TajoClient tajoClient, QueryId queryId) throws IOException {
+ this(tajoClient, queryId, null, null);
}
- public ResultSetImpl(Configuration conf, Path path) throws IOException {
+// public ResultSetImpl(TajoClient tajoClient, QueryId queryId, Configuration conf, String path) throws IOException {
+// this(tajoClient, queryId, conf, new Path(path));
+// }
+
+ public ResultSetImpl(TajoClient tajoClient, QueryId queryId, Configuration conf, Path path) throws IOException {
+ this.tajoClient = tajoClient;
+ this.queryId = queryId;
this.conf = conf;
- this.fs = path.getFileSystem(this.conf);
- // TODO - to be improved. It can be solved to get the query finish status
- // from master.
- try {
- this.meta = getMeta(this.conf, path);
- } catch (FileNotFoundException fnf) {
- this.totalRow = 0;
- init();
- return;
+ if(path != null) {
+ this.fs = path.getFileSystem(this.conf);
+ // TODO - to be improved. It can be solved to get the query finish status
+ // from master.
+ try {
+ this.meta = getMeta(this.conf, path);
+ } catch (FileNotFoundException fnf) {
+ this.totalRow = 0;
+ init();
+ return;
+ }
+ this.totalRow = meta.getStat() != null ? meta.getStat().getNumRows() : 0;
+ Collection<Fragment> frags = getFragmentsNG(meta, path);
+ scanner = new MergeScanner(conf, meta, frags);
}
- this.totalRow = meta.getStat() != null ? meta.getStat().getNumRows() : 0;
- Collection<Fragment> frags = getFragmentsNG(meta, path);
- scanner = new MergeScanner(conf, meta, frags);
- scanner.init();
init();
}
@@ -194,7 +205,9 @@ public class ResultSetImpl implements ResultSet {
@Override
public void beforeFirst() throws SQLException {
try {
- scanner.reset();
+ if(scanner != null) {
+ scanner.reset();
+ }
init();
} catch (IOException e) {
e.printStackTrace();
@@ -230,7 +243,17 @@ public class ResultSetImpl implements ResultSet {
@Override
public void close() throws SQLException {
try {
- this.scanner.close();
+ if(tajoClient != null) {
+ this.tajoClient.closeQuery(queryId);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ try {
+ if(scanner != null) {
+ this.scanner.close();
+ }
+ //TODO clean temp result file
cur = null;
curRow = -1;
} catch (IOException e) {
@@ -1232,6 +1255,9 @@ public class ResultSetImpl implements ResultSet {
*/
@Override
public boolean next() throws SQLException {
+ if(scanner == null) {
+ return false;
+ }
try {
if (totalRow <= 0)
return false;
@@ -2220,4 +2246,8 @@ public class ResultSetImpl implements ResultSet {
private void handleNull(Datum d) {
wasNull = (d instanceof NullDatum);
}
+
+ public boolean hasResult() {
+ return scanner != null;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java
index 880110d..bb4008f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java
@@ -23,14 +23,13 @@ package org.apache.tajo.ipc.protocolrecords;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.common.ProtoObject;
-import org.apache.tajo.engine.MasterWorkerProtos.Fetch;
-import org.apache.tajo.engine.MasterWorkerProtos.QueryUnitRequestProto;
+import org.apache.tajo.ipc.QueryMasterProtocol;
import org.apache.tajo.storage.Fragment;
import java.net.URI;
import java.util.List;
-public interface QueryUnitRequest extends ProtoObject<QueryUnitRequestProto> {
+public interface QueryUnitRequest extends ProtoObject<QueryMasterProtocol.QueryUnitRequestProto> {
public QueryUnitAttemptId getId();
public List<Fragment> getFragments();
@@ -40,7 +39,7 @@ public interface QueryUnitRequest extends ProtoObject<QueryUnitRequestProto> {
public boolean isInterQuery();
public void setInterQuery();
public void addFetch(String name, URI uri);
- public List<Fetch> getFetches();
+ public List<QueryMasterProtocol.Fetch> getFetches();
public boolean shouldDie();
public void setShouldDie();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ClientService.java
deleted file mode 100644
index c980dcf..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ClientService.java
+++ /dev/null
@@ -1,411 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
-import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.TajoProtos;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.exception.AlreadyExistsTableException;
-import org.apache.tajo.catalog.exception.NoSuchTableException;
-import org.apache.tajo.catalog.proto.CatalogProtos.TableDescProto;
-import org.apache.tajo.catalog.statistics.TableStat;
-import org.apache.tajo.client.ClientProtocol;
-import org.apache.tajo.client.ClientProtocol.*;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.engine.query.exception.SQLSyntaxError;
-import org.apache.tajo.master.TajoMaster.MasterContext;
-import org.apache.tajo.master.event.QueryEvent;
-import org.apache.tajo.master.event.QueryEventType;
-import org.apache.tajo.rpc.ProtoBlockingRpcServer;
-import org.apache.tajo.rpc.RemoteException;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
-import org.apache.tajo.util.TajoIdUtils;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Collection;
-
-public class ClientService extends AbstractService {
- private final static Log LOG = LogFactory.getLog(ClientService.class);
- private final MasterContext context;
- private final TajoConf conf;
- private final CatalogService catalog;
- private final ClientProtocolHandler clientHandler;
- private ProtoBlockingRpcServer server;
- private InetSocketAddress bindAddress;
-
- private final BoolProto BOOL_TRUE =
- BoolProto.newBuilder().setValue(true).build();
- private final BoolProto BOOL_FALSE =
- BoolProto.newBuilder().setValue(false).build();
-
- public ClientService(MasterContext context) {
- super(ClientService.class.getName());
- this.context = context;
- this.conf = context.getConf();
- this.catalog = context.getCatalog();
- this.clientHandler = new ClientProtocolHandler();
- }
-
- @Override
- public void start() {
-
- // start the rpc server
- String confClientServiceAddr = conf.getVar(ConfVars.CLIENT_SERVICE_ADDRESS);
- InetSocketAddress initIsa = NetUtils.createSocketAddr(confClientServiceAddr);
- try {
- server = new ProtoBlockingRpcServer(ClientProtocol.class, clientHandler, initIsa);
- } catch (Exception e) {
- LOG.error(e);
- }
- server.start();
- bindAddress = server.getBindAddress();
- this.conf.setVar(ConfVars.CLIENT_SERVICE_ADDRESS,
- org.apache.tajo.util.NetUtils.getIpPortString(bindAddress));
- LOG.info("Instantiated ClientService at " + this.bindAddress);
- super.start();
- }
-
- @Override
- public void stop() {
- server.shutdown();
- super.stop();
- }
-
- public InetSocketAddress getBindAddress() {
- return this.bindAddress;
- }
-
- public int getHttpPort() {
- return 0;
- }
-
- /////////////////////////////////////////////////////////////////////////////
- // ClientService
- /////////////////////////////////////////////////////////////////////////////
-
- public class ClientProtocolHandler implements ClientProtocolService.BlockingInterface {
- @Override
- public BoolProto updateSessionVariables(RpcController controller,
- UpdateSessionVariableRequest request)
- throws ServiceException {
- return null;
- }
-
- @Override
- public SubmitQueryRespose submitQuery(RpcController controller,
- QueryRequest request)
- throws ServiceException {
-
- QueryId queryId;
- SubmitQueryRespose.Builder build = SubmitQueryRespose.newBuilder();
- try {
- queryId = context.getGlobalEngine().executeQuery(request.getQuery());
- } catch (SQLSyntaxError e) {
- build.setResultCode(ResultCode.ERROR);
- build.setErrorMessage(e.getMessage());
- return build.build();
-
- } catch (Exception e) {
- build.setResultCode(ResultCode.ERROR);
- String msg = e.getMessage();
- if (msg == null) {
- msg = "Internal Error";
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.error(msg, e);
- } else {
- LOG.error(msg);
- }
- build.setErrorMessage(msg);
- return build.build();
- }
-
- LOG.info("Query " + queryId + " is submitted");
- build.setResultCode(ResultCode.OK);
- build.setQueryId(queryId.getProto());
-
- return build.build();
- }
-
- @Override
- public UpdateQueryResponse updateQuery(RpcController controller,
- QueryRequest request)
- throws ServiceException {
-
- UpdateQueryResponse.Builder builder = UpdateQueryResponse.newBuilder();
- try {
- context.getGlobalEngine().updateQuery(request.getQuery());
- builder.setResultCode(ResultCode.OK);
- return builder.build();
- } catch (Exception e) {
- builder.setResultCode(ResultCode.ERROR);
- if (e.getMessage() == null) {
- builder.setErrorMessage(ExceptionUtils.getStackTrace(e));
- }
- return builder.build();
- }
- }
-
- @Override
- public GetQueryResultResponse getQueryResult(RpcController controller,
- GetQueryResultRequest request)
- throws ServiceException {
- QueryId queryId = new QueryId(request.getQueryId());
- if (queryId.equals(TajoIdUtils.NullQueryId)) {
-
- }
- Query query = context.getQuery(queryId).getContext().getQuery();
-
- GetQueryResultResponse.Builder builder
- = GetQueryResultResponse.newBuilder();
- switch (query.getState()) {
- case QUERY_SUCCEEDED:
- builder.setTableDesc((TableDescProto) query.getResultDesc().getProto());
- break;
- case QUERY_FAILED:
- case QUERY_ERROR:
- builder.setErrorMessage("Query " + queryId + " is failed");
- default:
- builder.setErrorMessage("Query " + queryId + " is still running");
- }
-
- return builder.build();
- }
-
- @Override
- public GetQueryListResponse getQueryList(RpcController controller,
- GetQueryListRequest request)
- throws ServiceException {
- return null;
- }
-
- @Override
- public GetQueryStatusResponse getQueryStatus(RpcController controller,
- GetQueryStatusRequest request)
- throws ServiceException {
-
- GetQueryStatusResponse.Builder builder
- = GetQueryStatusResponse.newBuilder();
- QueryId queryId = new QueryId(request.getQueryId());
- builder.setQueryId(request.getQueryId());
-
- if (queryId.equals(TajoIdUtils.NullQueryId)) {
- builder.setResultCode(ResultCode.OK);
- builder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED);
- } else {
- Query query = context.getQuery(queryId).getContext().getQuery();
- if (query != null) {
- builder.setResultCode(ResultCode.OK);
- builder.setState(query.getState());
- builder.setProgress(query.getProgress());
- builder.setSubmitTime(query.getAppSubmitTime());
- builder.setInitTime(query.getInitializationTime());
- builder.setHasResult(!query.isCreateTableStmt());
- if (query.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
- builder.setFinishTime(query.getFinishTime());
- } else {
- builder.setFinishTime(System.currentTimeMillis());
- }
- } else {
- builder.setResultCode(ResultCode.ERROR);
- builder.setErrorMessage("No such query: " + queryId.toString());
- }
- }
-
- return builder.build();
- }
-
- @Override
- public BoolProto killQuery(RpcController controller,
- ApplicationAttemptIdProto request)
- throws ServiceException {
- QueryId queryId = new QueryId(request);
- QueryMaster query = context.getQuery(queryId);
- query.handle(new QueryEvent(queryId, QueryEventType.KILL));
-
- return BOOL_TRUE;
- }
-
- @Override
- public GetClusterInfoResponse getClusterInfo(RpcController controller,
- GetClusterInfoRequest request)
- throws ServiceException {
- return null;
- }
-
- @Override
- public BoolProto existTable(RpcController controller,
- StringProto tableNameProto)
- throws ServiceException {
- String tableName = tableNameProto.getValue();
- if (catalog.existsTable(tableName)) {
- return BOOL_TRUE;
- } else {
- return BOOL_FALSE;
- }
- }
-
- @Override
- public GetTableListResponse getTableList(RpcController controller,
- GetTableListRequest request)
- throws ServiceException {
- Collection<String> tableNames = catalog.getAllTableNames();
- GetTableListResponse.Builder builder = GetTableListResponse.newBuilder();
- builder.addAllTables(tableNames);
- return builder.build();
- }
-
- @Override
- public TableResponse getTableDesc(RpcController controller,
- GetTableDescRequest request)
- throws ServiceException {
- String name = request.getTableName();
- if (catalog.existsTable(name)) {
- return TableResponse.newBuilder()
- .setTableDesc((TableDescProto) catalog.getTableDesc(name).getProto())
- .build();
- } else {
- return null;
- }
- }
-
- @Override
- public TableResponse createTable(RpcController controller, CreateTableRequest request)
- throws ServiceException {
- Path path = new Path(request.getPath());
- TableMeta meta = new TableMetaImpl(request.getMeta());
- TableDesc desc;
- try {
- desc = context.getGlobalEngine().createTable(request.getName(), meta, path);
- } catch (Exception e) {
- return TableResponse.newBuilder().setErrorMessage(e.getMessage()).build();
- }
-
- return TableResponse.newBuilder().setTableDesc((TableDescProto) desc.getProto()).build();
- }
-
- @Override
- public BoolProto dropTable(RpcController controller,
- StringProto tableNameProto)
- throws ServiceException {
- context.getGlobalEngine().dropTable(tableNameProto.getValue());
- return BOOL_TRUE;
- }
-
- @Override
- public TableResponse attachTable(RpcController controller,
- AttachTableRequest request)
- throws ServiceException {
-
- TableDesc desc;
- if (catalog.existsTable(request.getName())) {
- throw new AlreadyExistsTableException(request.getName());
- }
-
- Path path = new Path(request.getPath());
-
- LOG.info(path.toUri());
-
- TableMeta meta;
- try {
- meta = TableUtil.getTableMeta(conf, path);
- } catch (IOException e) {
- throw new RemoteException(e);
- }
-
- FileSystem fs;
-
- // for legacy table structure
- Path tablePath = new Path(path, "data");
- try {
- fs = path.getFileSystem(conf);
- if (!fs.exists(tablePath)) {
- tablePath = path;
- }
- } catch (IOException e) {
- LOG.error(e);
- return null;
- }
-
- if (meta.getStat() == null) {
- long totalSize = 0;
- try {
- totalSize = calculateSize(tablePath);
- } catch (IOException e) {
- LOG.error("Cannot calculate the size of the relation", e);
- return null;
- }
-
- meta = new TableMetaImpl(meta.getProto());
- TableStat stat = new TableStat();
- stat.setNumBytes(totalSize);
- meta.setStat(stat);
- }
-
- desc = new TableDescImpl(request.getName(), meta, path);
- catalog.addTable(desc);
- LOG.info("Table " + desc.getId() + " is attached ("
- + meta.getStat().getNumBytes() + ")");
-
- return TableResponse.newBuilder().
- setTableDesc((TableDescProto) desc.getProto())
- .build();
- }
-
- @Override
- public BoolProto detachTable(RpcController controller,
- StringProto tableNameProto)
- throws ServiceException {
- String tableName = tableNameProto.getValue();
- if (!catalog.existsTable(tableName)) {
- throw new NoSuchTableException(tableName);
- }
-
- catalog.deleteTable(tableName);
-
- LOG.info("Table " + tableName + " is detached");
- return BOOL_TRUE;
- }
- }
-
- private long calculateSize(Path path) throws IOException {
- FileSystem fs = path.getFileSystem(conf);
- long totalSize = 0;
- for (FileStatus status : fs.listStatus(path)) {
- totalSize += status.getLen();
- }
-
- return totalSize;
- }
-}