You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/08/26 14:29:17 UTC
[8/8] git commit: TAJO-127: Implement Tajo Resource Manager.
(hyoungjunkim via hyunsik)
TAJO-127: Implement Tajo Resource Manager. (hyoungjunkim via hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/d48f2667
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/d48f2667
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/d48f2667
Branch: refs/heads/master
Commit: d48f2667b822564260b6e5c9705a41e5dcd1c4fc
Parents: 8b8b668
Author: Hyunsik Choi <hy...@apache.org>
Authored: Mon Aug 26 21:18:23 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Mon Aug 26 21:21:15 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 2 +
pom.xml | 9 +
.../org/apache/tajo/catalog/CatalogServer.java | 4 +-
.../java/org/apache/tajo/ExecutionBlockId.java | 97 +++
.../src/main/java/org/apache/tajo/QueryId.java | 180 +----
.../java/org/apache/tajo/QueryIdFactory.java | 96 ++-
.../org/apache/tajo/QueryUnitAttemptId.java | 144 +---
.../main/java/org/apache/tajo/QueryUnitId.java | 162 +---
.../main/java/org/apache/tajo/SubQueryId.java | 166 ----
.../java/org/apache/tajo/conf/TajoConf.java | 3 +-
.../main/java/org/apache/tajo/util/TUtil.java | 6 +-
.../java/org/apache/tajo/util/TajoIdUtils.java | 100 +--
tajo-common/src/main/proto/TajoIdProtos.proto | 11 +-
.../org/apache/tajo/datum/TestBitDatum.java | 2 +-
tajo-core/tajo-core-backend/pom.xml | 5 +-
.../src/main/java/log4j.properties | 3 +
.../main/java/org/apache/tajo/cli/TajoCli.java | 15 +-
.../java/org/apache/tajo/client/TajoClient.java | 24 +-
.../tajo/engine/query/QueryUnitRequestImpl.java | 4 +-
.../apache/tajo/engine/query/ResultSetImpl.java | 41 +-
.../ipc/protocolrecords/QueryUnitRequest.java | 6 +-
.../org/apache/tajo/master/ContainerProxy.java | 409 +---------
.../org/apache/tajo/master/ExecutionBlock.java | 12 +-
.../org/apache/tajo/master/GlobalEngine.java | 244 ++----
.../org/apache/tajo/master/GlobalPlanner.java | 38 +-
.../apache/tajo/master/TajoAsyncDispatcher.java | 234 ++++++
.../apache/tajo/master/TajoContainerProxy.java | 163 ++++
.../java/org/apache/tajo/master/TajoMaster.java | 118 +--
.../tajo/master/TajoMasterClientService.java | 101 ++-
.../apache/tajo/master/TajoMasterService.java | 170 ++++
.../tajo/master/TaskRunnerGroupEvent.java | 12 +-
.../tajo/master/TaskRunnerLauncherImpl.java | 171 ----
.../apache/tajo/master/TaskSchedulerImpl.java | 71 +-
.../apache/tajo/master/YarnContainerProxy.java | 446 ++++++++++
.../tajo/master/YarnTaskRunnerLauncherImpl.java | 208 +++++
.../master/event/ContainerAllocationEvent.java | 20 +-
.../event/GrouppedContainerAllocatorEvent.java | 6 +-
.../tajo/master/event/QueryStartEvent.java | 50 ++
.../tajo/master/event/QuerySubQueryEvent.java | 12 +-
.../master/event/SubQueryCompletedEvent.java | 14 +-
.../event/SubQueryContainerAllocationEvent.java | 4 +-
.../apache/tajo/master/event/SubQueryEvent.java | 8 +-
.../tajo/master/event/SubQuerySucceeEvent.java | 4 +-
.../tajo/master/event/SubQueryTaskEvent.java | 2 +-
.../event/TaskAttemptStatusUpdateEvent.java | 2 +-
.../tajo/master/event/TaskCompletionEvent.java | 2 +-
.../tajo/master/event/TaskFatalErrorEvent.java | 2 +-
.../tajo/master/event/TaskRequestEvent.java | 11 +-
.../tajo/master/event/TaskScheduleEvent.java | 2 +-
.../tajo/master/event/TaskSchedulerEvent.java | 12 +-
.../apache/tajo/master/querymaster/Query.java | 54 +-
.../master/querymaster/QueryInProgress.java | 285 +++++++
.../tajo/master/querymaster/QueryInfo.java | 127 +++
.../tajo/master/querymaster/QueryJobEvent.java | 43 +
.../master/querymaster/QueryJobManager.java | 172 ++++
.../tajo/master/querymaster/QueryMaster.java | 809 ++++---------------
.../querymaster/QueryMasterClientService.java | 197 -----
.../master/querymaster/QueryMasterManager.java | 353 --------
.../querymaster/QueryMasterManagerService.java | 116 ---
.../master/querymaster/QueryMasterRunner.java | 13 +-
.../master/querymaster/QueryMasterTask.java | 445 ++++++++++
.../tajo/master/querymaster/QueryUnit.java | 12 +-
.../master/querymaster/QueryUnitAttempt.java | 13 +-
.../tajo/master/querymaster/Repartitioner.java | 22 +-
.../tajo/master/querymaster/SubQuery.java | 48 +-
.../tajo/master/rm/RMContainerAllocator.java | 208 -----
.../tajo/master/rm/TajoWorkerContainer.java | 120 +++
.../tajo/master/rm/TajoWorkerContainerId.java | 47 ++
.../master/rm/TajoWorkerResourceManager.java | 394 +++++++++
.../apache/tajo/master/rm/WorkerResource.java | 194 +++++
.../tajo/master/rm/WorkerResourceManager.java | 60 ++
.../org/apache/tajo/master/rm/WorkerStatus.java | 25 +
.../master/rm/YarnRMContainerAllocator.java | 236 ++++++
.../tajo/master/rm/YarnTajoResourceManager.java | 337 ++++++++
.../apache/tajo/util/ApplicationIdUtils.java | 41 +
.../apache/tajo/webapp/StaticHttpServer.java | 26 +-
.../tajo/worker/AbstractResourceAllocator.java | 79 ++
.../apache/tajo/worker/ResourceAllocator.java | 27 +
.../tajo/worker/TajoResourceAllocator.java | 346 ++++++++
.../java/org/apache/tajo/worker/TajoWorker.java | 424 ++++++++++
.../tajo/worker/TajoWorkerClientService.java | 210 +++++
.../tajo/worker/TajoWorkerManagerService.java | 234 ++++++
.../main/java/org/apache/tajo/worker/Task.java | 43 +-
.../java/org/apache/tajo/worker/TaskRunner.java | 256 +++---
.../apache/tajo/worker/TaskRunnerManager.java | 108 +++
.../tajo/worker/YarnResourceAllocator.java | 106 +++
.../retriever/AdvancedDataRetriever.java | 10 +-
.../src/main/proto/ClientProtocol.proto | 12 +-
.../src/main/proto/ClientProtos.proto | 10 +-
.../main/proto/QueryMasterClientProtocol.proto | 2 +-
.../main/proto/QueryMasterManagerProtocol.proto | 50 --
.../src/main/proto/QueryMasterProtocol.proto | 132 ---
.../src/main/proto/TajoIdProtos.proto | 11 +-
.../main/proto/TajoMasterClientProtocol.proto | 5 +-
.../src/main/proto/TajoMasterProtocol.proto | 98 +++
.../src/main/proto/TajoWorkerProtocol.proto | 137 ++++
.../src/main/resources/log4j.properties | 3 +
.../src/main/resources/tajo-default.xml | 68 ++
.../src/test/java/log4j.properties | 3 +
.../apache/tajo/LocalTajoTestingUtility.java | 13 +-
.../org/apache/tajo/MiniTajoYarnCluster.java | 3 -
.../org/apache/tajo/TajoTestingCluster.java | 120 +--
.../org/apache/tajo/TestQueryIdFactory.java | 7 +-
.../test/java/org/apache/tajo/TestTajoIds.java | 78 +-
.../test/java/org/apache/tajo/TpchTestBase.java | 17 +-
.../plan/global/TestGlobalQueryPlanner.java | 1 -
.../global/TestGlobalQueryOptimizer.java | 15 +-
.../planner/physical/TestPhysicalPlanner.java | 2 -
.../apache/tajo/master/TestRepartitioner.java | 7 +-
.../org/apache/tajo/worker/TaskRunnerTest.java | 18 +-
.../tajo/worker/TestRangeRetrieverHandler.java | 2 -
.../worker/dataserver/TestHttpDataServer.java | 12 +-
.../src/test/resources/tajo-default.xml | 6 +
tajo-core/tajo-core-pullserver/pom.xml | 11 +-
.../tajo/pullserver/HttpDataServerHandler.java | 9 +-
.../tajo/pullserver/PullServerAuxService.java | 15 +-
.../tajo/pullserver/TajoPullServerService.java | 652 +++++++++++++++
.../retriever/AdvancedDataRetriever.java | 23 +-
.../java/org/apache/tajo/storage/CSVFile.java | 187 ++---
.../org/apache/tajo/storage/MergeScanner.java | 8 +-
tajo-dist/src/main/bin/start-tajo.sh | 8 +
tajo-dist/src/main/bin/stop-tajo.sh | 11 +-
tajo-dist/src/main/bin/tajo | 4 +
tajo-dist/src/main/bin/tajo-config.sh | 15 +-
tajo-dist/src/main/bin/tajo-daemon.sh | 12 +-
tajo-dist/src/main/bin/tajo-daemons.sh | 68 ++
tajo-dist/src/main/conf/tajo-env.sh | 13 +-
tajo-dist/src/main/conf/workers | 1 +
.../org/apache/tajo/rpc/NettyServerBase.java | 1 -
.../apache/tajo/rpc/ProtoAsyncRpcClient.java | 5 +-
.../apache/tajo/rpc/ProtoAsyncRpcServer.java | 3 +-
.../apache/tajo/rpc/ProtoBlockingRpcClient.java | 12 +-
.../java/org/apache/tajo/util/NetUtils.java | 2 +-
133 files changed, 7812 insertions(+), 3948 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index de7d746..c6b3f84 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,6 +15,8 @@ Release 0.2.0 - unreleased
IMPROVEMENTS
+ TAJO-127: Implement Tajo Resource Manager. (hyoungjunkim via hyunsik)
+
TAJO-84: Task scheduling with considering disk load balance. (jinho)
TAJO-123: Clean up the logical plan's json format. (hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index e1e7b7d..07b05a1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -91,6 +91,14 @@
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-report-plugin</artifactId>
+ <version>2.15</version>
+ <configuration>
+ <aggregate>true</aggregate>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.5.1</version>
<configuration>
@@ -297,6 +305,7 @@
<numUnapprovedLicenses>0</numUnapprovedLicenses>
<excludes>
<exclude>CHANGES.txt</exclude>
+ <exclude>**/workers</exclude>
<exclude>**/*.tql</exclude>
<exclude>**/*.sql</exclude>
<exclude>**/*.schema</exclude>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
index dd36c3e..f0a0584 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
@@ -128,7 +128,9 @@ public class CatalogServer extends AbstractService {
// Creation of a HSA will force a resolve.
InetSocketAddress initIsa = NetUtils.createSocketAddr(serverAddr);
try {
- this.rpcServer = new ProtoBlockingRpcServer(CatalogProtocol.class, handler, initIsa);
+ this.rpcServer = new ProtoBlockingRpcServer(
+ CatalogProtocol.class,
+ handler, initIsa);
this.rpcServer.start();
this.bindAddress = NetUtils.getConnectAddress(this.rpcServer.getListenAddress());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-common/src/main/java/org/apache/tajo/ExecutionBlockId.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/ExecutionBlockId.java b/tajo-common/src/main/java/org/apache/tajo/ExecutionBlockId.java
new file mode 100644
index 0000000..2dc4441
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/ExecutionBlockId.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo;
+
+public class ExecutionBlockId implements Comparable<ExecutionBlockId> {
+ public static final String EB_ID_PREFIX = "eb";
+ private QueryId queryId;
+ private int id;
+
+ public ExecutionBlockId(QueryId queryId, int id) {
+ this.queryId = queryId;
+ this.id = id;
+ }
+
+ public ExecutionBlockId(TajoIdProtos.ExecutionBlockIdProto proto) {
+ this(new QueryId(proto.getQueryId()), proto.getId());
+ }
+
+// public ExecutionBlockId(String idStr) {
+// String[] tokens = idStr.split(QueryId.SEPARATOR);
+// if(tokens.length < 3) {
+// throw new RuntimeException("Wrong ExecutionBlockId format[" + idStr + "]");
+// }
+//
+// this.queryId = new QueryId(tokens);
+// this.id = Integer.parseInt(tokens[3]);
+// }
+
+ @Override
+ public String toString() {
+ return EB_ID_PREFIX + QueryId.SEPARATOR + toStringNoPrefix();
+ }
+
+ @Override
+ public int compareTo(ExecutionBlockId executionBlockId) {
+ int result = queryId.compareTo(executionBlockId.queryId);
+ if (result == 0) {
+ return id - executionBlockId.id;
+ } else {
+ return result;
+ }
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null) {
+ return false;
+ }
+ if (this == obj) {
+ return true;
+ }
+ if(!(obj instanceof ExecutionBlockId)) {
+ return false;
+ }
+ return compareTo((ExecutionBlockId)obj) == 0;
+ }
+
+ @Override
+ public int hashCode() {
+ return toString().hashCode();
+ }
+
+ public TajoIdProtos.ExecutionBlockIdProto getProto() {
+ return TajoIdProtos.ExecutionBlockIdProto.newBuilder()
+ .setQueryId(queryId.getProto())
+ .setId(id)
+ .build();
+ }
+
+ public QueryId getQueryId() {
+ return queryId;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public String toStringNoPrefix() {
+ return queryId.toStringNoPrefix() + QueryId.SEPARATOR + QueryIdFactory.EB_ID_FORMAT.format(id);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-common/src/main/java/org/apache/tajo/QueryId.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/QueryId.java b/tajo-common/src/main/java/org/apache/tajo/QueryId.java
index 5dbbaca..af1ae56 100644
--- a/tajo-common/src/main/java/org/apache/tajo/QueryId.java
+++ b/tajo-common/src/main/java/org/apache/tajo/QueryId.java
@@ -18,172 +18,72 @@
package org.apache.tajo;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
-import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
-import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProtoOrBuilder;
-import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
-
-import java.text.NumberFormat;
-
-/**
- * QueryId represents a unique identifier of a query.
- */
public class QueryId implements Comparable<QueryId> {
- public static final String PREFIX = "q";
- public static final String SEPARATOR = "_";
-
- ApplicationAttemptIdProto proto = ApplicationAttemptIdProto
- .getDefaultInstance();
- ApplicationAttemptIdProto.Builder builder = null;
- boolean viaProto = false;
+ public static String SEPARATOR = "_";
+ public static final String QUERY_ID_PREFIX = "q";
- private ApplicationId applicationId = null;
+ private String id;
+ private int seq;
- public QueryId() {
- builder = ApplicationAttemptIdProto.newBuilder();
+ public QueryId(String id, int seq) {
+ this.id = id;
+ this.seq = seq;
}
- public QueryId(ApplicationAttemptIdProto proto) {
- this.proto = proto;
- viaProto = true;
+ public QueryId(TajoIdProtos.QueryIdProto queryId) {
+ this(queryId.getId(), queryId.getSeq());
}
- public synchronized ApplicationAttemptIdProto getProto() {
- mergeLocalToProto();
- proto = viaProto ? proto : builder.build();
- viaProto = true;
- return proto;
+ public String getId() {
+ return id;
}
- private synchronized void mergeLocalToBuilder() {
- if (this.applicationId != null
- && !((ApplicationIdPBImpl) applicationId).getProto().equals(
- builder.getApplicationId())) {
- builder.setApplicationId(convertToProtoFormat(this.applicationId));
- }
+ public int getSeq() {
+ return seq;
}
- private synchronized void mergeLocalToProto() {
- if (viaProto)
- maybeInitBuilder();
- mergeLocalToBuilder();
- proto = builder.build();
- viaProto = true;
+ @Override
+ public String toString() {
+ return QUERY_ID_PREFIX + SEPARATOR + toStringNoPrefix();
}
- private synchronized void maybeInitBuilder() {
- if (viaProto || builder == null) {
- builder = ApplicationAttemptIdProto.newBuilder(proto);
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null) {
+ return false;
}
- viaProto = false;
- }
-
- public synchronized int getAttemptId() {
- ApplicationAttemptIdProtoOrBuilder p = viaProto ? proto : builder;
- return (p.getAttemptId());
- }
-
- public synchronized void setAttemptId(int attemptId) {
- maybeInitBuilder();
- builder.setAttemptId((attemptId));
- }
-
- public synchronized ApplicationId getApplicationId() {
- ApplicationAttemptIdProtoOrBuilder p = viaProto ? proto : builder;
- if (this.applicationId != null) {
- return this.applicationId;
+ if (this == obj) {
+ return true;
}
- if (!p.hasApplicationId()) {
- return null;
+ if(!(obj instanceof QueryId)) {
+ return false;
}
- this.applicationId = convertFromProtoFormat(p.getApplicationId());
- return this.applicationId;
- }
-
- public synchronized void setApplicationId(ApplicationId appId) {
- maybeInitBuilder();
- if (appId == null)
- builder.clearApplicationId();
- this.applicationId = appId;
- }
-
- private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
- return new ApplicationIdPBImpl(p);
+ return compareTo((QueryId)obj) == 0;
}
- public static ApplicationIdProto convertToProtoFormat(ApplicationId t) {
- return ((ApplicationIdPBImpl)t).getProto();
+ @Override
+ public int hashCode() {
+ return toString().hashCode();
}
@Override
public int compareTo(QueryId queryId) {
- int compVal = getApplicationId().compareTo(queryId.getApplicationId());
- if (compVal != 0) {
- return compVal;
+ int result = id.compareTo(queryId.id);
+ if (result == 0) {
+ return seq - queryId.seq;
} else {
- return getAttemptId() - queryId.getAttemptId();
+ return result;
}
}
- static final ThreadLocal<NumberFormat> appIdFormat =
- new ThreadLocal<NumberFormat>() {
- @Override
- public NumberFormat initialValue() {
- NumberFormat fmt = NumberFormat.getInstance();
- fmt.setGroupingUsed(false);
- fmt.setMinimumIntegerDigits(4);
- return fmt;
- }
- };
-
- static final ThreadLocal<NumberFormat> attemptIdFormat =
- new ThreadLocal<NumberFormat>() {
- @Override
- public NumberFormat initialValue() {
- NumberFormat fmt = NumberFormat.getInstance();
- fmt.setGroupingUsed(false);
- fmt.setMinimumIntegerDigits(6);
- return fmt;
- }
- };
-
- @Override
- public int hashCode() {
- // Generated by eclipse.
- final int prime = 31;
- int result = 1;
- ApplicationId appId = getApplicationId();
- result = prime * result + appId.hashCode();
- result = prime * result + getAttemptId();
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- QueryId other = (QueryId) obj;
- if (!this.getApplicationId().equals(other.getApplicationId()))
- return false;
- if (this.getAttemptId() != other.getAttemptId())
- return false;
- return true;
+ public TajoIdProtos.QueryIdProto getProto() {
+ return TajoIdProtos.QueryIdProto.newBuilder()
+ .setId(id)
+ .setSeq(seq)
+ .build();
}
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder(PREFIX).append(SEPARATOR)
- .append(this.getApplicationId().getClusterTimestamp()).append(SEPARATOR)
- .append(appIdFormat.get().format(this.getApplicationId().getId()))
- .append(SEPARATOR)
- .append(attemptIdFormat.get().format(getAttemptId()));
- return sb.toString();
+ public String toStringNoPrefix() {
+ return id + SEPARATOR + QueryIdFactory.ID_FORMAT.format(seq);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-common/src/main/java/org/apache/tajo/QueryIdFactory.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/QueryIdFactory.java b/tajo-common/src/main/java/org/apache/tajo/QueryIdFactory.java
index b1a6ab4..90533e3 100644
--- a/tajo-common/src/main/java/org/apache/tajo/QueryIdFactory.java
+++ b/tajo-common/src/main/java/org/apache/tajo/QueryIdFactory.java
@@ -18,39 +18,97 @@
package org.apache.tajo;
-import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.tajo.util.TajoIdUtils;
+import java.text.DecimalFormat;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
public class QueryIdFactory {
- private static AtomicInteger nextId =
- new AtomicInteger(-1);
-
- public static void reset() {
- nextId.set(-1);
+ public static final QueryId NULL_QUERY_ID = newQueryId(TajoIdUtils.MASTER_ID_FORMAT.format(0), 0);
+
+ public static DecimalFormat ID_FORMAT = new DecimalFormat("0000");
+
+ public static DecimalFormat EB_ID_FORMAT = new DecimalFormat("000000");
+
+ public static DecimalFormat QU_ID_FORMAT = new DecimalFormat("000000");
+
+ public static DecimalFormat ATTEMPT_ID_FORMAT = new DecimalFormat("00");
+
+ private static Map<String, AtomicInteger> queryNexIdMap = new HashMap<String, AtomicInteger>();
+
+ private static AtomicInteger nextId = new AtomicInteger(0);
+
+ /*
+ * <ul>
+ * <li>QueryId == q_{masterId}_{seq}</li>
+ * <li>masterId == tajoMasterId or YarnResourceManagerId</li>
+ * <li>seq = TajoSeq or YarnSeq</li>
+ * </ul>
+ */
+ public synchronized static QueryId newQueryId(String seedQueryId) {
+ AtomicInteger queryNextId = queryNexIdMap.get(seedQueryId);
+ if(queryNextId == null) {
+ queryNextId = new AtomicInteger(0);
+ queryNexIdMap.put(seedQueryId, queryNextId);
+ }
+ if(isYarnId(seedQueryId)) {
+ String[] tokens = seedQueryId.split("_");
+ return new QueryId(tokens[1], Integer.parseInt(tokens[2]));
+ } else {
+ int seq = queryNextId.incrementAndGet();
+ if(seq >= 10000) {
+ queryNextId.set(0);
+ seq = queryNextId.incrementAndGet();
+ }
+
+ return new QueryId(seedQueryId, seq);
+ }
+ }
+
+ public synchronized static QueryId newQueryId(long timestamp, int seq) {
+ return new QueryId(String.valueOf(timestamp), seq);
}
+ /**
+ * for test
+ * @return
+ */
public synchronized static QueryId newQueryId() {
- int idInt = nextId.incrementAndGet();
- return TajoIdUtils.createQueryId(BuilderUtils.newApplicationId(
- System.currentTimeMillis(), idInt), idInt);
+ return newQueryId(TajoIdUtils.MASTER_ID_FORMAT.format(0));
}
-
- public synchronized static SubQueryId newSubQueryId(QueryId queryId) {
- return TajoIdUtils.createSubQueryId(queryId, nextId.incrementAndGet());
+
+ public synchronized static QueryId newQueryId(String seedQueryId, int seq) {
+ if(isYarnId(seedQueryId)) {
+ String[] tokens = seedQueryId.split("_");
+ return new QueryId(tokens[1], Integer.parseInt(tokens[2]));
+ } else {
+ return new QueryId(seedQueryId, seq);
+ }
}
-
- public synchronized static QueryUnitId newQueryUnitId(SubQueryId subQueryId) {
- return new QueryUnitId(subQueryId, nextId.incrementAndGet());
+
+ private static boolean isYarnId(String id) {
+ return id.startsWith("application");
+ }
+
+ public synchronized static ExecutionBlockId newExecutionBlockId(QueryId queryId) {
+ return new ExecutionBlockId(queryId, nextId.incrementAndGet());
+ }
+
+ public synchronized static ExecutionBlockId newExecutionBlockId(QueryId queryId, int id) {
+ return new ExecutionBlockId(queryId, id);
+ }
+
+ public synchronized static QueryUnitId newQueryUnitId(ExecutionBlockId executionBlockId) {
+ return new QueryUnitId(executionBlockId, nextId.incrementAndGet());
}
- public synchronized static QueryUnitId newQueryUnitId(SubQueryId subQueryId, int taskId) {
- return new QueryUnitId(subQueryId, taskId);
+ public synchronized static QueryUnitId newQueryUnitId(ExecutionBlockId executionBlockId, int id) {
+ return new QueryUnitId(executionBlockId, id);
}
- public synchronized static QueryUnitAttemptId newQueryUnitAttemptId(
- final QueryUnitId queryUnitId, final int attemptId) {
+ public synchronized static QueryUnitAttemptId newQueryUnitAttemptId(QueryUnitId queryUnitId, final int attemptId) {
return new QueryUnitAttemptId(queryUnitId, attemptId);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-common/src/main/java/org/apache/tajo/QueryUnitAttemptId.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/QueryUnitAttemptId.java b/tajo-common/src/main/java/org/apache/tajo/QueryUnitAttemptId.java
index 3ef436e..98ba5d1 100644
--- a/tajo-common/src/main/java/org/apache/tajo/QueryUnitAttemptId.java
+++ b/tajo-common/src/main/java/org/apache/tajo/QueryUnitAttemptId.java
@@ -18,139 +18,75 @@
package org.apache.tajo;
-import com.google.common.base.Objects;
-import org.apache.tajo.TajoIdProtos.QueryUnitAttemptIdProto;
-import org.apache.tajo.TajoIdProtos.QueryUnitAttemptIdProtoOrBuilder;
-import org.apache.tajo.common.ProtoObject;
+public class QueryUnitAttemptId implements Comparable<QueryUnitAttemptId> {
+ public static final String QUA_ID_PREFIX = "ta";
-import java.text.NumberFormat;
+ private QueryUnitId queryUnitId;
+ private int id;
-public class QueryUnitAttemptId implements Comparable<QueryUnitAttemptId>, ProtoObject<QueryUnitAttemptIdProto> {
- private static final String PREFIX="ta";
-
- private static final NumberFormat format = NumberFormat.getInstance();
- static {
- format.setGroupingUsed(false);
- format.setMinimumIntegerDigits(2);
+ public QueryUnitId getQueryUnitId() {
+ return queryUnitId;
}
- private QueryUnitId queryUnitId = null;
- private int id = -1;
- private String finalId = null;
-
- private QueryUnitAttemptIdProto proto =
- QueryUnitAttemptIdProto.getDefaultInstance();
- private QueryUnitAttemptIdProto.Builder builder = null;
- private boolean viaProto = false;
-
- public QueryUnitAttemptId() {
- builder = QueryUnitAttemptIdProto.newBuilder();
+ public int getId() {
+ return id;
}
- public QueryUnitAttemptId(final QueryUnitId queryUnitId, final int id) {
- this.queryUnitId = queryUnitId;
+ public void setId(int id) {
this.id = id;
}
- public QueryUnitAttemptId(QueryUnitAttemptIdProto proto) {
- this.proto = proto;
- viaProto = true;
- }
-
- public QueryUnitAttemptId(final String finalId) {
- this.finalId = finalId;
- int i = finalId.lastIndexOf(QueryId.SEPARATOR);
- this.queryUnitId = new QueryUnitId(finalId.substring(0, i));
- this.id = Integer.valueOf(finalId.substring(i+1));
- }
-
- public int getId() {
- QueryUnitAttemptIdProtoOrBuilder p = viaProto ? proto : builder;
- if (this.id != -1) {
- return this.id;
- }
- if (!p.hasId()) {
- return -1;
- }
- this.id = p.getId();
- return id;
- }
-
- public QueryUnitId getQueryUnitId() {
- QueryUnitAttemptIdProtoOrBuilder p = viaProto ? proto : builder;
- if (this.queryUnitId != null) {
- return this.queryUnitId;
- }
- if (!p.hasId()) {
- return null;
- }
- this.queryUnitId = new QueryUnitId(p.getQueryUnitId());
- return queryUnitId;
+ public QueryUnitAttemptId(QueryUnitId queryUnitId, int id) {
+ this.queryUnitId = queryUnitId;
+ this.id = id;
}
- public QueryId getQueryId() {
- return this.getQueryUnitId().getQueryId();
+ public QueryUnitAttemptId(TajoIdProtos.QueryUnitAttemptIdProto proto) {
+ this(new QueryUnitId(proto.getQueryUnitId()), proto.getId());
}
- public SubQueryId getSubQueryId() {
- return this.getQueryUnitId().getSubQueryId();
+ public TajoIdProtos.QueryUnitAttemptIdProto getProto() {
+ return TajoIdProtos.QueryUnitAttemptIdProto.newBuilder()
+ .setQueryUnitId(queryUnitId.getProto())
+ .setId(id)
+ .build();
}
@Override
- public final String toString() {
- if (finalId == null) {
- StringBuilder sb = new StringBuilder(PREFIX);
- SubQueryId subQueryId = getQueryUnitId().getSubQueryId();
- QueryId appId = subQueryId.getQueryId();
- sb.append(QueryId.SEPARATOR).append(appId.getApplicationId().getClusterTimestamp())
- .append(QueryId.SEPARATOR).append(QueryId.appIdFormat.get().format(appId.getApplicationId().getId()))
- .append(QueryId.SEPARATOR).append(QueryId.attemptIdFormat.get().format(appId.getAttemptId()))
- .append(QueryId.SEPARATOR).append(SubQueryId.subQueryIdFormat.get().format(subQueryId.getId()))
- .append(QueryId.SEPARATOR).append(QueryUnitId.queryUnitIdFormat.get().format(getQueryUnitId().getId()))
- .append(QueryId.SEPARATOR).append(format.format(getId()));
- finalId = sb.toString();
+ public int compareTo(QueryUnitAttemptId queryUnitAttemptId) {
+ int result = queryUnitId.compareTo(queryUnitAttemptId.queryUnitId);
+ if (result == 0) {
+ return id - queryUnitAttemptId.id;
+ } else {
+ return result;
}
- return this.finalId;
}
@Override
- public final boolean equals(final Object o) {
- if (o instanceof QueryUnitAttemptId) {
- QueryUnitAttemptId other = (QueryUnitAttemptId) o;
- return this.toString().equals(other.toString());
+ public boolean equals(Object obj) {
+ if (obj == null) {
+ return false;
+ }
+ if (this == obj) {
+ return true;
}
- return false;
+ if(!(obj instanceof QueryUnitAttemptId)) {
+ return false;
+ }
+ return compareTo((QueryUnitAttemptId)obj) == 0;
}
@Override
public int hashCode() {
- return Objects.hashCode(getQueryUnitId(), getId());
+ return toString().hashCode();
}
@Override
- public int compareTo(QueryUnitAttemptId o) {
- return this.getId() - o.getId();
- }
-
- private void mergeLocalToBuilder() {
- if (builder == null) {
- builder = QueryUnitAttemptIdProto.newBuilder(proto);
- }
- if (this.queryUnitId != null) {
- builder.setQueryUnitId(queryUnitId.getProto());
- }
- if (this.id != -1) {
- builder.setId(id);
- }
+ public String toString() {
+ return QUA_ID_PREFIX + QueryId.SEPARATOR + toStringNoPrefix();
}
- @Override
- public QueryUnitAttemptIdProto getProto() {
- if (!viaProto) {
- mergeLocalToBuilder();
- proto = builder.build();
- viaProto = true;
- }
- return proto;
+ public String toStringNoPrefix() {
+ return queryUnitId.toStringNoPrefix() + QueryId.SEPARATOR + QueryIdFactory.ATTEMPT_ID_FORMAT.format(id);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-common/src/main/java/org/apache/tajo/QueryUnitId.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/QueryUnitId.java b/tajo-common/src/main/java/org/apache/tajo/QueryUnitId.java
index 4826691..21addf9 100644
--- a/tajo-common/src/main/java/org/apache/tajo/QueryUnitId.java
+++ b/tajo-common/src/main/java/org/apache/tajo/QueryUnitId.java
@@ -18,145 +18,71 @@
package org.apache.tajo;
-import com.google.common.base.Objects;
-import org.apache.tajo.TajoIdProtos.QueryUnitIdProto;
-import org.apache.tajo.TajoIdProtos.QueryUnitIdProtoOrBuilder;
-import org.apache.tajo.common.ProtoObject;
-import org.apache.tajo.util.TajoIdUtils;
+public class QueryUnitId implements Comparable<QueryUnitId> {
+ public static final String QU_ID_PREFIX = "t";
-import java.text.NumberFormat;
+ private ExecutionBlockId executionBlockId;
+ private int id;
-public class QueryUnitId implements Comparable<QueryUnitId>,
- ProtoObject<QueryUnitIdProto> {
- private static final String PREFIX = "t";
-
- static final ThreadLocal<NumberFormat> queryUnitIdFormat =
- new ThreadLocal<NumberFormat>() {
- @Override
- public NumberFormat initialValue() {
- NumberFormat fmt = NumberFormat.getInstance();
- fmt.setGroupingUsed(false);
- fmt.setMinimumIntegerDigits(6);
- return fmt;
- }
- };
-
- private SubQueryId subQueryId = null;
- private int id = -1;
- private String finalId = null;
-
- private QueryUnitIdProto proto = QueryUnitIdProto.getDefaultInstance();
- private QueryUnitIdProto.Builder builder = null;
- private boolean viaProto = false;
-
- public QueryUnitId() {
- builder = QueryUnitIdProto.newBuilder();
- }
-
- public QueryUnitId(final SubQueryId subQueryId,
- final int id) {
- this.subQueryId = subQueryId;
+ public QueryUnitId(ExecutionBlockId executionBlockId, int id) {
+ this.executionBlockId = executionBlockId;
this.id = id;
}
-
- public QueryUnitId(QueryUnitIdProto proto) {
- this.proto = proto;
- viaProto = true;
+
+ public QueryUnitId(TajoIdProtos.QueryUnitIdProto proto) {
+ this(new ExecutionBlockId(proto.getExecutionBlockId()), proto.getId());
}
-
- public QueryUnitId(final String finalId) {
- this.finalId = finalId;
- int i = finalId.lastIndexOf(QueryId.SEPARATOR);
- this.subQueryId = TajoIdUtils.newSubQueryId(finalId.substring(0, i));
- this.id = Integer.valueOf(finalId.substring(i+1));
+
+ public ExecutionBlockId getExecutionBlockId() {
+ return executionBlockId;
}
-
+
public int getId() {
- QueryUnitIdProtoOrBuilder p = viaProto ? proto : builder;
- if (this.id != -1) {
- return this.id;
- }
- if (!p.hasId()) {
- return -1;
- }
- this.id = p.getId();
return id;
}
-
- public SubQueryId getSubQueryId() {
- QueryUnitIdProtoOrBuilder p = viaProto ? proto : builder;
- if (this.subQueryId != null) {
- return this.subQueryId;
- }
- if (!p.hasSubQueryId()) {
- return null;
- }
- this.subQueryId = TajoIdUtils.newSubQueryId(p.getSubQueryId());
- return this.subQueryId;
- }
-
- public QueryId getQueryId() {
- return this.getSubQueryId().getQueryId();
+
+ public TajoIdProtos.QueryUnitIdProto getProto() {
+ return TajoIdProtos.QueryUnitIdProto.newBuilder()
+ .setExecutionBlockId(executionBlockId.getProto())
+ .setId(id)
+ .build();
}
-
+
@Override
- public final String toString() {
- if (finalId == null) {
- StringBuilder sb = new StringBuilder(PREFIX);
- QueryId appId = getSubQueryId().getQueryId();
- sb.append(QueryId.SEPARATOR).append(
- appId.getApplicationId().getClusterTimestamp())
- .append(QueryId.SEPARATOR).append(
- QueryId.appIdFormat.get().format(appId.getApplicationId().getId()))
- .append(QueryId.SEPARATOR).append(
- QueryId.attemptIdFormat.get().format(appId.getAttemptId()))
- .append(QueryId.SEPARATOR).append(
- SubQueryId.subQueryIdFormat.get().format(getSubQueryId().getId()))
- .append(QueryId.SEPARATOR).append(queryUnitIdFormat.get().format(getId()));
- finalId = sb.toString();
+ public int compareTo(QueryUnitId queryUnitId) {
+ int result = executionBlockId.compareTo(queryUnitId.executionBlockId);
+ if (result == 0) {
+ return id - queryUnitId.id;
+ } else {
+ return result;
}
- return this.finalId;
}
-
+
@Override
- public final boolean equals(final Object o) {
- if (o instanceof QueryUnitId) {
- QueryUnitId other = (QueryUnitId) o;
- return getSubQueryId().equals(other.getSubQueryId()) &&
- getId() == other.getId();
- }
- return false;
+ public boolean equals(Object obj) {
+ if (obj == null) {
+ return false;
+ }
+ if (this == obj) {
+ return true;
+ }
+ if(!(obj instanceof QueryUnitId)) {
+ return false;
+ }
+ return compareTo((QueryUnitId)obj) == 0;
}
-
+
@Override
public int hashCode() {
- return Objects.hashCode(getSubQueryId(), getId());
+ return toString().hashCode();
}
@Override
- public final int compareTo(final QueryUnitId o) {
- return this.toString().compareTo(o.toString());
- }
-
- private void mergeLocalToBuilder() {
- if (builder == null) {
- builder = QueryUnitIdProto.newBuilder(proto);
- }
- if (this.subQueryId != null) {
- builder.setSubQueryId(subQueryId.getProto());
- }
- if (this.id != -1) {
- builder.setId(id);
- }
+ public String toString() {
+ return QU_ID_PREFIX + QueryId.SEPARATOR + toStringNoPrefix();
}
- @Override
- public QueryUnitIdProto getProto() {
- if (!viaProto) {
- mergeLocalToBuilder();
- proto = builder.build();
- viaProto = true;
- }
- return proto;
+ public String toStringNoPrefix() {
+ return executionBlockId.toStringNoPrefix() + QueryId.SEPARATOR + QueryIdFactory.QU_ID_FORMAT.format(id);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-common/src/main/java/org/apache/tajo/SubQueryId.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/SubQueryId.java b/tajo-common/src/main/java/org/apache/tajo/SubQueryId.java
deleted file mode 100644
index 2a11f38..0000000
--- a/tajo-common/src/main/java/org/apache/tajo/SubQueryId.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo;
-
-import org.apache.tajo.TajoIdProtos.SubQueryIdProto;
-import org.apache.tajo.TajoIdProtos.SubQueryIdProtoOrBuilder;
-
-import java.text.NumberFormat;
-
-public class SubQueryId implements Comparable<SubQueryId> {
- public static final String PREFIX = "sq";
-
- static final ThreadLocal<NumberFormat> subQueryIdFormat =
- new ThreadLocal<NumberFormat>() {
- @Override
- public NumberFormat initialValue() {
- NumberFormat fmt = NumberFormat.getInstance();
- fmt.setGroupingUsed(false);
- fmt.setMinimumIntegerDigits(2);
- return fmt;
- }
- };
-
- private SubQueryIdProto proto = SubQueryIdProto.getDefaultInstance();
- private SubQueryIdProto.Builder builder = null;
- private boolean viaProto = false;
-
- private QueryId queryId = null;
-
- public SubQueryId() {
- builder = SubQueryIdProto.newBuilder(proto);
- }
-
- public SubQueryId(SubQueryIdProto proto) {
- this.proto = proto;
- viaProto = true;
- }
-
- /**
- * @return the subquery number.
- */
- public synchronized int getId() {
- SubQueryIdProtoOrBuilder p = viaProto ? proto : builder;
- return (p.getId());
- }
-
- public synchronized void setId(int id) {
- maybeInitBuilder();
- builder.setId((id));
- }
-
- /**
- * @return the associated <code>QueryId</code>
- */
- public synchronized QueryId getQueryId() {
- SubQueryIdProtoOrBuilder p = viaProto ? proto : builder;
- if (this.queryId != null) {
- return this.queryId;
- }
- if (!p.hasQueryId()) {
- return null;
- }
- queryId = new QueryId(p.getQueryId());
- return queryId;
- }
-
- public synchronized void setQueryId(QueryId queryId) {
- maybeInitBuilder();
- if (queryId == null)
- builder.clearQueryId();
- this.queryId = queryId;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + getId();
- result = prime * result + getQueryId().hashCode();
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- SubQueryId other = (SubQueryId) obj;
- if (getId() != other.getId())
- return false;
- if (!getQueryId().equals(other.getQueryId()))
- return false;
- return true;
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder(PREFIX);
- QueryId queryId = getQueryId();
- builder.append(QueryId.SEPARATOR).append(queryId.getApplicationId().getClusterTimestamp());
- builder.append(QueryId.SEPARATOR).append(
- QueryId.appIdFormat.get().format(queryId.getApplicationId().getId()));
- builder.append(QueryId.SEPARATOR).append(QueryId.attemptIdFormat.get().format(queryId.getAttemptId()))
- .append(QueryId.SEPARATOR)
- .append(subQueryIdFormat.get().format(getId()));
- return builder.toString();
- }
-
- @Override
- public int compareTo(SubQueryId other) {
- int queryIdComp = this.getQueryId().compareTo(other.getQueryId());
- if (queryIdComp == 0) {
- return this.getId() - other.getId();
- } else {
- return queryIdComp;
- }
- }
-
- public synchronized SubQueryIdProto getProto() {
- mergeLocalToProto();
- proto = viaProto ? proto : builder.build();
- viaProto = true;
- return proto;
- }
-
- private synchronized void mergeLocalToBuilder() {
- if (this.queryId != null
- && !this.queryId.getProto().equals(builder.getQueryId())) {
- builder.setQueryId(queryId.getProto());
- }
- }
-
- private synchronized void mergeLocalToProto() {
- if (viaProto)
- maybeInitBuilder();
- mergeLocalToBuilder();
- proto = builder.build();
- viaProto = true;
- }
-
- private synchronized void maybeInitBuilder() {
- if (viaProto || builder == null) {
- builder = SubQueryIdProto.newBuilder(proto);
- }
- viaProto = false;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 21d0c63..2fe64e6 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -66,8 +66,7 @@ public class TajoConf extends YarnConfiguration {
// Service Addresses
TASKRUNNER_LISTENER_ADDRESS("tajo.master.taskrunnerlistener.addr", "0.0.0.0:0"), // used internally
CLIENT_SERVICE_ADDRESS("tajo.master.clientservice.addr", "127.0.0.1:9004"),
- QUERY_MASTER_MANAGER_SERVICE_ADDRESS("tajo.master.querymastermanager.addr", "127.0.0.1:9005"),
- QUERY_MASTER_CLIENT_SERVICE_ADDRESS("tajo.qmm.client.addr", "0.0.0.0:0"),
+ TAJO_MASTER_SERVICE_ADDRESS("tajo.master.manager.addr", "127.0.0.1:9005"),
//////////////////////////////////
// Catalog Configuration
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
index 7eaade8..05c0972 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
@@ -108,11 +108,11 @@ public class TUtil {
return list;
}
- public static QueryUnitAttemptId newQueryUnitAttemptId() {
+ public static QueryUnitAttemptId newQueryUnitAttemptId() {
return QueryIdFactory.newQueryUnitAttemptId(
QueryIdFactory.newQueryUnitId(
- QueryIdFactory.newSubQueryId(
- QueryIdFactory.newQueryId())), 0);
+ QueryIdFactory.newExecutionBlockId(
+ QueryIdFactory.newQueryId())), 0);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-common/src/main/java/org/apache/tajo/util/TajoIdUtils.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/TajoIdUtils.java b/tajo-common/src/main/java/org/apache/tajo/util/TajoIdUtils.java
index 9dfbfbc..3b4bd51 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/TajoIdUtils.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/TajoIdUtils.java
@@ -18,104 +18,22 @@
package org.apache.tajo.util;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.util.BuilderUtils;
-import org.apache.hadoop.yarn.util.Records;
+import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryId;
-import org.apache.tajo.SubQueryId;
-import org.apache.tajo.TajoIdProtos.SubQueryIdProto;
-import java.util.Iterator;
-
-import static org.apache.hadoop.yarn.util.StringHelper._split;
+import java.text.DecimalFormat;
public class TajoIdUtils {
- public static final String YARN_APPLICATION_PREFIX = "application";
- public static final String YARN_CONTAINER_PREFIX = "container";
- public static final String YARN_APPLICATION_ATTEMPT_PREFIX = "appattempt";
-
- /** It is mainly for DDL statements which don's have any query id. */
- public static final QueryId NullQueryId =
- TajoIdUtils.createQueryId(BuilderUtils.newApplicationId(0, 0), 0);
+ public static DecimalFormat MASTER_ID_FORMAT = new DecimalFormat("0000000000000");
- public static QueryId createQueryId(ApplicationId appId, int attemptId) {
- return newQueryId(appId, attemptId);
- }
-
- public static QueryId createQueryId(ApplicationAttemptId appAttemptId) {
- QueryId queryId = new QueryId();
- queryId.setApplicationId(appAttemptId.getApplicationId());
- queryId.setAttemptId(appAttemptId.getAttemptId());
- return queryId;
- }
-
- public static QueryId createQueryId(String queryId) {
- String[] split = queryId.split(QueryId.SEPARATOR);
- ApplicationId appId = BuilderUtils.newApplicationId(Long.valueOf(split[1]),
- Integer.parseInt(split[2]));
- int idInt = Integer.parseInt(split[3]);
- return newQueryId(appId, idInt);
- }
-
- public static SubQueryId createSubQueryId(QueryId queryId,
- int subQueryIdInt) {
- return newSubQueryId(queryId, subQueryIdInt);
- }
-
- public static QueryId newQueryId(ApplicationId appId, int id) {
- QueryId queryId = new QueryId();
- queryId.setApplicationId(appId);
- queryId.setAttemptId(id);
- return queryId;
- }
-
- public static SubQueryId newSubQueryId(QueryId jobId, int id) {
- SubQueryId taskId = new SubQueryId();
- taskId.setQueryId(jobId);
- taskId.setId(id);
- return taskId;
- }
-
- public static SubQueryId newSubQueryId(String subQueryId) {
- String [] split = subQueryId.split(QueryId.SEPARATOR);
- ApplicationId appId = BuilderUtils.newApplicationId(Long.valueOf(split[1]),
- Integer.valueOf(split[2]));
- QueryId queryId = TajoIdUtils.createQueryId(appId, Integer.valueOf(split[3]));
- return createSubQueryId(queryId, Integer.parseInt(split[4]));
- }
-
- public static SubQueryId newSubQueryId(SubQueryIdProto proto) {
- SubQueryId subId = new SubQueryId(proto);
- return subId;
- }
+ public static ExecutionBlockId createExecutionBlockId(String idStr) {
+ String[] tokens = idStr.split("_");
- public static ApplicationAttemptId toApplicationAttemptId(
- String applicationAttmeptIdStr) {
- //This methood from YARN.ConvertUtils
- Iterator<String> it = _split(applicationAttmeptIdStr).iterator();
- if (!it.next().equals(YARN_APPLICATION_ATTEMPT_PREFIX)) {
- throw new IllegalArgumentException("Invalid AppAttemptId prefix: "
- + applicationAttmeptIdStr);
- }
- try {
- return toApplicationAttemptId(it);
- } catch (NumberFormatException n) {
- throw new IllegalArgumentException("Invalid AppAttemptId: "
- + applicationAttmeptIdStr, n);
- }
+ return new ExecutionBlockId(new QueryId(tokens[1], Integer.parseInt(tokens[2])), Integer.parseInt(tokens[3]));
}
- private static ApplicationAttemptId toApplicationAttemptId(
- Iterator<String> it) throws NumberFormatException {
- //This methood from YARN.ConvertUtils
- ApplicationId appId = Records.newRecord(ApplicationId.class);
- appId.setClusterTimestamp(Long.parseLong(it.next()));
- appId.setId(Integer.parseInt(it.next()));
- ApplicationAttemptId appAttemptId = Records
- .newRecord(ApplicationAttemptId.class);
- appAttemptId.setApplicationId(appId);
- appAttemptId.setAttemptId(Integer.parseInt(it.next()));
- return appAttemptId;
+ public static QueryId parseQueryId(String idStr) {
+ String[] tokens = idStr.split("_");
+ return new QueryId(tokens[1], Integer.parseInt(tokens[2]));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-common/src/main/proto/TajoIdProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/proto/TajoIdProtos.proto b/tajo-common/src/main/proto/TajoIdProtos.proto
index 04c67f2..a87c825 100644
--- a/tajo-common/src/main/proto/TajoIdProtos.proto
+++ b/tajo-common/src/main/proto/TajoIdProtos.proto
@@ -21,15 +21,18 @@ option java_outer_classname = "TajoIdProtos";
option java_generic_services = false;
option java_generate_equals_and_hash = true;
-import "yarn_protos.proto";
+message QueryIdProto {
+ required string id = 1;
+ required int32 seq = 2;
+}
-message SubQueryIdProto {
- required ApplicationAttemptIdProto queryId = 1;
+message ExecutionBlockIdProto {
+ required QueryIdProto queryId = 1;
required int32 id = 2;
}
message QueryUnitIdProto {
- required SubQueryIdProto subQueryId = 1;
+ required ExecutionBlockIdProto executionBlockId = 1;
required int32 id = 2;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-common/src/test/java/org/apache/tajo/datum/TestBitDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/test/java/org/apache/tajo/datum/TestBitDatum.java b/tajo-common/src/test/java/org/apache/tajo/datum/TestBitDatum.java
index cbcaaf9..ba938b2 100644
--- a/tajo-common/src/test/java/org/apache/tajo/datum/TestBitDatum.java
+++ b/tajo-common/src/test/java/org/apache/tajo/datum/TestBitDatum.java
@@ -18,8 +18,8 @@
package org.apache.tajo.datum;
-import org.junit.Test;
import org.apache.tajo.common.TajoDataTypes.Type;
+import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/pom.xml b/tajo-core/tajo-core-backend/pom.xml
index ba5ad9f..6c3351f 100644
--- a/tajo-core/tajo-core-backend/pom.xml
+++ b/tajo-core/tajo-core-backend/pom.xml
@@ -127,8 +127,8 @@
<argument>src/main/proto/tajo_protos.proto</argument>
<argument>src/main/proto/ClientProtos.proto</argument>
<argument>src/main/proto/QueryMasterClientProtocol.proto</argument>
- <argument>src/main/proto/QueryMasterManagerProtocol.proto</argument>
- <argument>src/main/proto/QueryMasterProtocol.proto</argument>
+ <argument>src/main/proto/TajoMasterProtocol.proto</argument>
+ <argument>src/main/proto/TajoWorkerProtocol.proto</argument>
<argument>src/main/proto/TajoMasterClientProtocol.proto</argument>
</arguments>
</configuration>
@@ -329,7 +329,6 @@
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</dependency>
-
<dependency>
<groupId>jline</groupId>
<artifactId>jline</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/log4j.properties
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/log4j.properties b/tajo-core/tajo-core-backend/src/main/java/log4j.properties
index c1ac487..749124c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/log4j.properties
+++ b/tajo-core/tajo-core-backend/src/main/java/log4j.properties
@@ -23,3 +23,6 @@ log4j.threshhold=ALL
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+log4j.logger.org.apache.hadoop=WARN
+log4j.logger.org.apache.hadoop.conf=ERROR
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
index 2dd25de..55a97ca 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
@@ -25,6 +25,7 @@ import jline.console.history.PersistentHistory;
import org.apache.commons.cli.*;
import org.apache.commons.lang.StringUtils;
import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.TajoProtos.QueryState;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.TableDesc;
@@ -34,7 +35,6 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.util.FileUtil;
-import org.apache.tajo.util.TajoIdUtils;
import java.io.File;
import java.io.InputStream;
@@ -264,6 +264,7 @@ public class TajoCli {
}
public int executeStatements(String line) throws Exception {
+
String stripped;
for (String statement : line.split(";")) {
stripped = StringUtils.chomp(statement);
@@ -282,12 +283,12 @@ public class TajoCli {
invokeCommand(cmds);
} else { // submit a query to TajoMaster
- ClientProtos.SubmitQueryResponse response = client.executeQuery(stripped);
+ ClientProtos.GetQueryStatusResponse response = client.executeQuery(stripped);
if (response.getResultCode() == ClientProtos.ResultCode.OK) {
QueryId queryId = null;
try {
queryId = new QueryId(response.getQueryId());
- if (queryId.equals(TajoIdUtils.NullQueryId)) {
+ if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
sout.println("OK");
} else {
getQueryResult(queryId);
@@ -298,9 +299,9 @@ public class TajoCli {
}
}
} else {
- if (response.hasErrorMessage()) {
- sout.println(response.getErrorMessage());
- }
+ if (response.hasErrorMessage()) {
+ sout.println(response.getErrorMessage());
+ }
}
}
}
@@ -313,7 +314,7 @@ public class TajoCli {
private void getQueryResult(QueryId queryId) {
// if query is empty string
- if (queryId.equals(TajoIdUtils.NullQueryId)) {
+ if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
return;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
index cd8706e..7e7b787 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
@@ -22,7 +22,9 @@ import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.net.NetUtils;
import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.TajoProtos.QueryState;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.TableDesc;
@@ -30,7 +32,6 @@ import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.engine.query.ResultSetImpl;
-import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.ipc.ClientProtos.*;
import org.apache.tajo.ipc.QueryMasterClientProtocol;
import org.apache.tajo.ipc.QueryMasterClientProtocol.QueryMasterClientProtocolService;
@@ -38,8 +39,6 @@ import org.apache.tajo.ipc.TajoMasterClientProtocol;
import org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService;
import org.apache.tajo.rpc.ProtoBlockingRpcClient;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
-import org.apache.tajo.util.NetUtils;
-import org.apache.tajo.util.TajoIdUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -128,7 +127,7 @@ public class TajoClient {
* In order to get the result, you should use {@link #getQueryResult(org.apache.tajo.QueryId)}
* or {@link #getQueryResultAndWait(org.apache.tajo.QueryId)}.
*/
- public ClientProtos.SubmitQueryResponse executeQuery(String tql) throws ServiceException {
+ public GetQueryStatusResponse executeQuery(String tql) throws ServiceException {
QueryRequest.Builder builder = QueryRequest.newBuilder();
builder.setQuery(tql);
@@ -147,9 +146,9 @@ public class TajoClient {
throws ServiceException, IOException {
QueryRequest.Builder builder = QueryRequest.newBuilder();
builder.setQuery(sql);
- SubmitQueryResponse response = tajoMasterService.submitQuery(null, builder.build());
+ GetQueryStatusResponse response = tajoMasterService.submitQuery(null, builder.build());
QueryId queryId = new QueryId(response.getQueryId());
- if (queryId.equals(TajoIdUtils.NullQueryId)) {
+ if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
return this.createNullResultSet(queryId);
} else {
return this.getQueryResultAndWait(queryId);
@@ -171,6 +170,9 @@ public class TajoClient {
String queryMasterHost = res.getQueryMasterHost();
if(queryMasterHost != null && !queryMasterHost.isEmpty()) {
connectionToQueryMaster(queryId, queryMasterHost, res.getQueryMasterPort());
+
+ QueryMasterClientProtocolService.BlockingInterface queryMasterService = queryMasterConnectionMap.get(queryId);
+ res = queryMasterService.getQueryStatus(null, builder.build());
}
}
return new QueryStatus(res);
@@ -204,7 +206,7 @@ public class TajoClient {
public ResultSet getQueryResult(QueryId queryId)
throws ServiceException, IOException {
- if (queryId.equals(TajoIdUtils.NullQueryId)) {
+ if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
return createNullResultSet(queryId);
}
@@ -214,14 +216,14 @@ public class TajoClient {
public ResultSet getQueryResultAndWait(QueryId queryId)
throws ServiceException, IOException {
- if (queryId.equals(TajoIdUtils.NullQueryId)) {
+ if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
return createNullResultSet(queryId);
}
QueryStatus status = getQueryStatus(queryId);
while(status != null && isQueryRunnning(status.getState())) {
try {
- Thread.sleep(1000);
+ Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
@@ -237,7 +239,7 @@ public class TajoClient {
}
} else {
- LOG.warn("Query " + status.getQueryId() + ") failed: " + status.getState());
+ LOG.warn("Query (" + status.getQueryId() + ") failed: " + status.getState());
//TODO throw SQLException(?)
return createNullResultSet(queryId);
@@ -249,7 +251,7 @@ public class TajoClient {
}
public TableDesc getResultDesc(QueryId queryId) throws ServiceException {
- if (queryId.equals(TajoIdUtils.NullQueryId)) {
+ if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
index 7d430c5..cf10a4f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
@@ -19,7 +19,9 @@
package org.apache.tajo.engine.query;
import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.ipc.QueryMasterProtocol.*;
+import org.apache.tajo.ipc.TajoWorkerProtocol.Fetch;
+import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProto;
+import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProtoOrBuilder;
import org.apache.tajo.ipc.protocolrecords.QueryUnitRequest;
import org.apache.tajo.storage.Fragment;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java
index 808b910..8cbe956 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java
@@ -16,9 +16,6 @@
* limitations under the License.
*/
-/**
- *
- */
package org.apache.tajo.engine.query;
import com.google.common.collect.Lists;
@@ -297,7 +294,6 @@ public class ResultSetImpl implements ResultSet {
*/
@Override
public Array getArray(int arg0) throws SQLException {
- // TODO Auto-generated method stub
throw new UnsupportedException();
}
@@ -308,7 +304,6 @@ public class ResultSetImpl implements ResultSet {
*/
@Override
public Array getArray(String arg0) throws SQLException {
- // TODO Auto-generated method stub
throw new UnsupportedException();
}
@@ -319,7 +314,6 @@ public class ResultSetImpl implements ResultSet {
*/
@Override
public InputStream getAsciiStream(int arg0) throws SQLException {
- // TODO Auto-generated method stub
throw new UnsupportedException();
}
@@ -330,7 +324,6 @@ public class ResultSetImpl implements ResultSet {
*/
@Override
public InputStream getAsciiStream(String arg0) throws SQLException {
- // TODO Auto-generated method stub
throw new UnsupportedException();
}
@@ -363,7 +356,6 @@ public class ResultSetImpl implements ResultSet {
*/
@Override
public BigDecimal getBigDecimal(int arg0, int arg1) throws SQLException {
- // TODO Auto-generated method stub
throw new UnsupportedException();
}
@@ -374,7 +366,6 @@ public class ResultSetImpl implements ResultSet {
*/
@Override
public BigDecimal getBigDecimal(String arg0, int arg1) throws SQLException {
- // TODO Auto-generated method stub
throw new UnsupportedException();
}
@@ -385,7 +376,6 @@ public class ResultSetImpl implements ResultSet {
*/
@Override
public InputStream getBinaryStream(int arg0) throws SQLException {
- // TODO Auto-generated method stub
throw new UnsupportedException();
}
@@ -396,7 +386,6 @@ public class ResultSetImpl implements ResultSet {
*/
@Override
public InputStream getBinaryStream(String arg0) throws SQLException {
- // TODO Auto-generated method stub
throw new UnsupportedException();
}
@@ -499,7 +488,6 @@ public class ResultSetImpl implements ResultSet {
*/
@Override
public Reader getCharacterStream(int arg0) throws SQLException {
- // TODO Auto-generated method stub
throw new UnsupportedException();
}
@@ -510,7 +498,6 @@ public class ResultSetImpl implements ResultSet {
*/
@Override
public Reader getCharacterStream(String arg0) throws SQLException {
- // TODO Auto-generated method stub
throw new UnsupportedException();
}
@@ -561,7 +548,6 @@ public class ResultSetImpl implements ResultSet {
*/
@Override
public Date getDate(int arg0) throws SQLException {
- // TODO Auto-generated method stub
throw new UnsupportedException();
}
@@ -572,7 +558,6 @@ public class ResultSetImpl implements ResultSet {
*/
@Override
public Date getDate(String arg0) throws SQLException {
- // TODO Auto-generated method stub
throw new UnsupportedException();
}
@@ -583,7 +568,6 @@ public class ResultSetImpl implements ResultSet {
*/
@Override
public Date getDate(int arg0, Calendar arg1) throws SQLException {
- // TODO Auto-generated method stub
throw new UnsupportedException();
}
@@ -594,7 +578,6 @@ public class ResultSetImpl implements ResultSet {
*/
@Override
public Date getDate(String arg0, Calendar arg1) throws SQLException {
- // TODO Auto-generated method stub
throw new UnsupportedException();
}
@@ -741,7 +724,6 @@ public class ResultSetImpl implements ResultSet {
*/
@Override
public Reader getNCharacterStream(int arg0) throws SQLException {
- // TODO Auto-generated method stub
throw new UnsupportedException();
}
@@ -752,7 +734,6 @@ public class ResultSetImpl implements ResultSet {
*/
@Override
public Reader getNCharacterStream(String arg0) throws SQLException {
- // TODO Auto-generated method stub
throw new UnsupportedException();
}
@@ -783,7 +764,6 @@ public class ResultSetImpl implements ResultSet {
*/
@Override
public String getNString(int fieldId) throws SQLException {
- // TODO Auto-generated method stub
throw new UnsupportedException();
}
@@ -794,7 +774,6 @@ public class ResultSetImpl implements ResultSet {
*/
@Override
public String getNString(String arg0) throws SQLException {
- // TODO Auto-generated method stub
throw new UnsupportedException();
}
@@ -832,7 +811,6 @@ public class ResultSetImpl implements ResultSet {
@Override
public Object getObject(int arg0, Map<String, Class<?>> arg1)
throws SQLException {
- // TODO Auto-generated method stub
throw new UnsupportedException();
}
@@ -844,7 +822,6 @@ public class ResultSetImpl implements ResultSet {
@Override
public Object getObject(String arg0, Map<String, Class<?>> arg1)
throws SQLException {
- // TODO Auto-generated method stub
throw new UnsupportedException();
}
@@ -855,7 +832,6 @@ public class ResultSetImpl implements ResultSet {
*/
public <T> T getObject(String arg0, Class<T> arg1)
throws SQLException {
- // TODO Auto-generated method stub
throw new UnsupportedException();
}
@@ -866,10 +842,9 @@ public class ResultSetImpl implements ResultSet {
*/
public <T> T getObject(int arg0, Class<T> arg1)
throws SQLException {
- // TODO Auto-generated method stub
throw new UnsupportedException();
}
-
+
/*
* (non-Javadoc)
*
@@ -907,7 +882,6 @@ public class ResultSetImpl implements ResultSet {
*/
@Override
public RowId getRowId(int fieldId) throws SQLException {
- // TODO Auto-generated method stub
throw new UnsupportedException();
}
@@ -918,7 +892,6 @@ public class ResultSetImpl implements ResultSet {
*/
@Override
public RowId getRowId(String arg0) throws SQLException {
- // TODO Auto-generated method stub
throw new UnsupportedException();
}
@@ -1007,7 +980,6 @@ public class ResultSetImpl implements ResultSet {
*/
@Override
public Time getTime(int fieldId) throws SQLException {
- // TODO Auto-generated method stub
throw new UnsupportedException();
}
@@ -1018,7 +990,6 @@ public class ResultSetImpl implements ResultSet {
*/
@Override
public Time getTime(String name) throws SQLException {
- // TODO Auto-generated method stub
throw new UnsupportedException();
}
@@ -1029,7 +1000,6 @@ public class ResultSetImpl implements ResultSet {
*/
@Override
public Time getTime(int fieldId, Calendar arg1) throws SQLException {
- // TODO Auto-generated method stub
throw new UnsupportedException();
}
@@ -1040,7 +1010,6 @@ public class ResultSetImpl implements ResultSet {
*/
@Override
public Time getTime(String name, Calendar arg1) throws SQLException {
- // TODO Auto-generated method stub
throw new UnsupportedException();
}
@@ -1051,7 +1020,6 @@ public class ResultSetImpl implements ResultSet {
*/
@Override
public Timestamp getTimestamp(int fieldId) throws SQLException {
- // TODO Auto-generated method stub
throw new UnsupportedException();
}
@@ -1062,7 +1030,6 @@ public class ResultSetImpl implements ResultSet {
*/
@Override
public Timestamp getTimestamp(String arg0) throws SQLException {
- // TODO Auto-generated method stub
throw new UnsupportedException();
}
@@ -1073,7 +1040,6 @@ public class ResultSetImpl implements ResultSet {
*/
@Override
public Timestamp getTimestamp(int fieldId, Calendar arg1) throws SQLException {
- // TODO Auto-generated method stub
throw new UnsupportedException();
}
@@ -1084,7 +1050,6 @@ public class ResultSetImpl implements ResultSet {
*/
@Override
public Timestamp getTimestamp(String arg0, Calendar arg1) throws SQLException {
- // TODO Auto-generated method stub
throw new UnsupportedException();
}
@@ -1105,7 +1070,6 @@ public class ResultSetImpl implements ResultSet {
*/
@Override
public URL getURL(int fieldId) throws SQLException {
- // TODO Auto-generated method stub
throw new UnsupportedException();
}
@@ -1116,7 +1080,6 @@ public class ResultSetImpl implements ResultSet {
*/
@Override
public URL getURL(String arg0) throws SQLException {
- // TODO Auto-generated method stub
throw new UnsupportedException();
}
@@ -1147,7 +1110,6 @@ public class ResultSetImpl implements ResultSet {
*/
@Override
public SQLWarning getWarnings() throws SQLException {
- // TODO
throw new UnsupportedException();
}
@@ -1298,7 +1260,6 @@ public class ResultSetImpl implements ResultSet {
*/
@Override
public boolean relative(int arg0) throws SQLException {
- // TODO Auto-generated method stub
throw new UnsupportedException();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java
index bb4008f..c6fc632 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java
@@ -23,13 +23,13 @@ package org.apache.tajo.ipc.protocolrecords;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.common.ProtoObject;
-import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.storage.Fragment;
import java.net.URI;
import java.util.List;
-public interface QueryUnitRequest extends ProtoObject<QueryMasterProtocol.QueryUnitRequestProto> {
+public interface QueryUnitRequest extends ProtoObject<TajoWorkerProtocol.QueryUnitRequestProto> {
public QueryUnitAttemptId getId();
public List<Fragment> getFragments();
@@ -39,7 +39,7 @@ public interface QueryUnitRequest extends ProtoObject<QueryMasterProtocol.QueryU
public boolean isInterQuery();
public void setInterQuery();
public void addFetch(String name, URI uri);
- public List<QueryMasterProtocol.Fetch> getFetches();
+ public List<TajoWorkerProtocol.Fetch> getFetches();
public boolean shouldDie();
public void setShouldDie();
}