You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/01/09 15:34:17 UTC
[1/8] tajo git commit: TAJO-1258: Close() for classes derived from
FileAppender should be robust. (Jongyoung Park via jinho)
Repository: tajo
Updated Branches:
refs/heads/index_support e04c65fdd -> 767b9a4b7
TAJO-1258: Close() for classes derived from FileAppender should be robust. (Jongyoung Park via jinho)
Closes #340
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/809cba37
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/809cba37
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/809cba37
Branch: refs/heads/index_support
Commit: 809cba3758564acd4def17928f95de7b4c913c45
Parents: 1c29c1c
Author: jhkim <jh...@apache.org>
Authored: Fri Jan 9 11:00:27 2015 +0900
Committer: jhkim <jh...@apache.org>
Committed: Fri Jan 9 11:00:27 2015 +0900
----------------------------------------------------------------------
CHANGES | 3 +++
.../src/main/java/org/apache/tajo/storage/CSVFile.java | 4 +---
.../src/main/java/org/apache/tajo/storage/RowFile.java | 3 ++-
.../src/main/java/org/apache/tajo/storage/avro/AvroAppender.java | 3 ++-
.../java/org/apache/tajo/storage/parquet/ParquetAppender.java | 3 ++-
.../apache/tajo/storage/sequencefile/SequenceFileAppender.java | 4 +---
.../java/org/apache/tajo/storage/text/DelimitedTextFile.java | 4 +---
7 files changed, 12 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/809cba37/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 96b63ea..369dbda 100644
--- a/CHANGES
+++ b/CHANGES
@@ -27,6 +27,9 @@ Release 0.9.1 - unreleased
IMPROVEMENT
+ TAJO-1258: Close() for classes derived from FileAppender should be robust.
+ (Jongyoung Park via jinho)
+
TAJO-1288: Refactoring org.apache.tajo.master package. (hyunsik)
TAJO-1279: Cleanup TajoAsyncDispatcher and interrupt stop events.
http://git-wip-us.apache.org/repos/asf/tajo/blob/809cba37/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
index 28c263c..d4dde28 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
@@ -225,10 +225,8 @@ public class CSVFile {
deflateFilter.resetState();
deflateFilter = null;
}
-
- os.close();
} finally {
- IOUtils.cleanup(LOG, fos);
+ IOUtils.cleanup(LOG, os, fos, outputStream);
if (compressor != null) {
CodecPool.returnCompressor(compressor);
compressor = null;
http://git-wip-us.apache.org/repos/asf/tajo/blob/809cba37/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
index 5510cbf..1ff6c4f 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
@@ -468,7 +469,7 @@ public class RowFile {
}
sync();
out.flush();
- out.close();
+ IOUtils.cleanup(LOG, out);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/809cba37/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
index f617099..da426ea 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.TableMeta;
@@ -201,7 +202,7 @@ public class AvroAppender extends FileAppender {
*/
@Override
public void close() throws IOException {
- dataFileWriter.close();
+ IOUtils.cleanup(null, dataFileWriter);
}
/**
http://git-wip-us.apache.org/repos/asf/tajo/blob/809cba37/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
index 4557287..415c338 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
@@ -18,6 +18,7 @@
package org.apache.tajo.storage.parquet;
+import org.apache.hadoop.io.IOUtils;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.storage.StorageConstants;
import parquet.hadoop.ParquetOutputFormat;
@@ -128,7 +129,7 @@ public class ParquetAppender extends FileAppender {
*/
@Override
public void close() throws IOException {
- writer.close();
+ IOUtils.cleanup(null, writer);
}
public long getEstimatedOutputSize() throws IOException {
http://git-wip-us.apache.org/repos/asf/tajo/blob/809cba37/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
index 8b5d677..404352c 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
@@ -248,7 +248,6 @@ public class SequenceFileAppender extends FileAppender {
@Override
public void flush() throws IOException {
os.flush();
- writer.close();
}
@Override
@@ -258,8 +257,7 @@ public class SequenceFileAppender extends FileAppender {
stats.setNumBytes(getOffset());
}
- os.close();
- writer.close();
+ IOUtils.cleanup(LOG, os, writer);
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/809cba37/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
index 15db4c3..ac7c549 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
@@ -241,10 +241,8 @@ public class DelimitedTextFile {
deflateFilter.resetState();
deflateFilter = null;
}
-
- os.close();
} finally {
- IOUtils.cleanup(LOG, fos);
+ IOUtils.cleanup(LOG, fos, os, outputStream);
if (compressor != null) {
CodecPool.returnCompressor(compressor);
compressor = null;
[2/8] tajo git commit: TAJO-1282: Cleanup the relationship of
QueryInProgress and QueryJobManager.
Posted by ji...@apache.org.
TAJO-1282: Cleanup the relationship of QueryInProgress and QueryJobManager.
Closes #334
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/307c6c9f
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/307c6c9f
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/307c6c9f
Branch: refs/heads/index_support
Commit: 307c6c9f558ae96ffa2f7351ffa3dc5788ba4dbb
Parents: 809cba3
Author: Hyunsik Choi <hy...@apache.org>
Authored: Fri Jan 9 14:47:47 2015 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Fri Jan 9 14:47:47 2015 +0900
----------------------------------------------------------------------
CHANGES | 3 +
.../org/apache/tajo/master/QueryJobManager.java | 59 +++++++------
.../apache/tajo/master/TajoMasterService.java | 7 --
.../master/rm/TajoWorkerResourceManager.java | 2 +-
.../tajo/master/rm/WorkerResourceManager.java | 2 +-
.../master/scheduler/SimpleFifoScheduler.java | 3 +-
.../tajo/querymaster/QueryInProgress.java | 87 ++------------------
.../apache/tajo/querymaster/QueryJobEvent.java | 5 +-
.../apache/tajo/querymaster/QueryMaster.java | 2 +-
.../tajo/querymaster/QueryMasterTask.java | 47 +----------
.../src/main/proto/TajoMasterProtocol.proto | 1 -
.../apache/tajo/querymaster/TestKillQuery.java | 2 +-
12 files changed, 53 insertions(+), 167 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 369dbda..35670d7 100644
--- a/CHANGES
+++ b/CHANGES
@@ -27,6 +27,9 @@ Release 0.9.1 - unreleased
IMPROVEMENT
+ TAJO-1282: Cleanup the relationship of QueryInProgress and
+ QueryJobManager. (hyunsik)
+
TAJO-1258: Close() for classes derived from FileAppender should be robust.
(Jongyoung Park via jinho)
http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java
index c9b8711..6a8da27 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java
@@ -28,22 +28,25 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tajo.QueryId;
import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.TajoProtos;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.master.scheduler.SimpleFifoScheduler;
+import org.apache.tajo.plan.logical.LogicalRootNode;
import org.apache.tajo.querymaster.QueryInProgress;
import org.apache.tajo.querymaster.QueryJobEvent;
import org.apache.tajo.session.Session;
-import org.apache.tajo.plan.logical.LogicalRootNode;
-import org.apache.tajo.master.scheduler.SimpleFifoScheduler;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
+/**
+ * QueryJobManager manages all scheduled and running queries.
+ * It receives all Query related events and routes them to each QueryInProgress.
+ */
public class QueryJobManager extends CompositeService {
private static final Log LOG = LogFactory.getLog(QueryJobManager.class.getName());
@@ -69,7 +72,7 @@ public class QueryJobManager extends CompositeService {
}
@Override
- public void init(Configuration conf) {
+ public void serviceInit(Configuration conf) throws Exception {
try {
this.dispatcher = new AsyncDispatcher();
addService(this.dispatcher);
@@ -81,24 +84,24 @@ public class QueryJobManager extends CompositeService {
catchException(null, e);
}
- super.init(conf);
+ super.serviceInit(conf);
}
@Override
- public void stop() {
+ public void serviceStop() throws Exception {
synchronized(runningQueries) {
for(QueryInProgress eachQueryInProgress: runningQueries.values()) {
- eachQueryInProgress.stop();
+ eachQueryInProgress.stopProgress();
}
}
this.scheduler.stop();
- super.stop();
+ super.serviceStop();
}
@Override
- public void start() {
+ public void serviceStart() throws Exception {
this.scheduler.start();
- super.start();
+ super.serviceStart();
}
public EventHandler getEventHandler() {
@@ -164,39 +167,42 @@ public class QueryJobManager extends CompositeService {
runningQueries.put(queryInProgress.getQueryId(), queryInProgress);
}
- addService(queryInProgress);
- queryInProgress.init(getConfig());
- queryInProgress.start();
-
- if (!queryInProgress.startQueryMaster()) {
+ if (queryInProgress.startQueryMaster()) {
+ dispatcher.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START,
+ queryInProgress.getQueryInfo()));
+ } else {
stopQuery(queryId);
}
return queryInProgress.getQueryInfo();
}
- public TajoMaster.MasterContext getMasterContext() {
- return masterContext;
- }
-
class QueryJobManagerEventHandler implements EventHandler<QueryJobEvent> {
+
@Override
public void handle(QueryJobEvent event) {
QueryInProgress queryInProgress = getQueryInProgress(event.getQueryInfo().getQueryId());
- if(queryInProgress == null) {
+
+
+ if (queryInProgress == null) {
LOG.warn("No query info in running queries.[" + event.getQueryInfo().getQueryId() + "]");
return;
}
- if (event.getType() == QueryJobEvent.Type.QUERY_JOB_STOP) {
+
+ if (event.getType() == QueryJobEvent.Type.QUERY_MASTER_START) {
+ queryInProgress.submmitQueryToMaster();
+
+ } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_STOP) {
stopQuery(event.getQueryInfo().getQueryId());
- } else if (queryInProgress.isStarted()) {
- queryInProgress.getEventHandler().handle(event);
+
} else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) {
scheduler.removeQuery(queryInProgress.getQueryId());
- queryInProgress.getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED);
-
+ queryInProgress.kill();
stopQuery(queryInProgress.getQueryId());
+
+ } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_HEARTBEAT) {
+ queryInProgress.heartbeat(event.getQueryInfo());
}
}
}
@@ -219,7 +225,7 @@ public class QueryJobManager extends CompositeService {
LOG.info("Stop QueryInProgress:" + queryId);
QueryInProgress queryInProgress = getQueryInProgress(queryId);
if(queryInProgress != null) {
- queryInProgress.stop();
+ queryInProgress.stopProgress();
synchronized(submittedQueries) {
submittedQueries.remove(queryId);
}
@@ -245,7 +251,6 @@ public class QueryJobManager extends CompositeService {
avgExecutionTime.set(executionTime);
}
executedQuerySize.incrementAndGet();
- removeService(queryInProgress);
} else {
LOG.warn("No QueryInProgress while query stopping: " + queryId);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
index a7df206..02bdfa1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
@@ -136,13 +136,6 @@ public class TajoMasterService extends AbstractService {
}
@Override
- public void stopQueryMaster(RpcController controller, TajoIdProtos.QueryIdProto request,
- RpcCallback<BoolProto> done) {
- context.getQueryJobManager().stopQuery(new QueryId(request));
- done.run(BOOL_TRUE);
- }
-
- @Override
public void getAllWorkerResource(RpcController controller, PrimitiveProtos.NullProto request,
RpcCallback<TajoMasterProtocol.WorkerResourcesRequest> done) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index c4200d5..9f2a3d5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -526,7 +526,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
}
@Override
- public void stopQueryMaster(QueryId queryId) {
+ public void releaseQueryMaster(QueryId queryId) {
if(!rmContext.getQueryMasterContainer().containsKey(queryId)) {
LOG.warn("No QueryMaster resource info for " + queryId);
return;
http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
index b237cc5..79ec0ac 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
@@ -80,7 +80,7 @@ public interface WorkerResourceManager extends Service {
*
* @param queryId QueryId to be stopped
*/
- public void stopQueryMaster(QueryId queryId);
+ public void releaseQueryMaster(QueryId queryId);
/**
*
http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
index bd8ca28..a091ed5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
@@ -58,7 +58,8 @@ public class SimpleFifoScheduler implements Scheduler {
LOG.info("Size of Fifo queue is " + qSize);
}
- QuerySchedulingInfo querySchedulingInfo = new QuerySchedulingInfo(queryInProgress.getQueryId(), 1, queryInProgress.getStartTime());
+ QuerySchedulingInfo querySchedulingInfo = new QuerySchedulingInfo(queryInProgress.getQueryId(), 1,
+ queryInProgress.getQueryInfo().getStartTime());
boolean result = pool.add(querySchedulingInfo);
if (getRunningQueries().size() == 0) wakeupProcessor();
return result;
http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java
index bda2ec1..f83f244 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java
@@ -20,11 +20,7 @@ package org.apache.tajo.querymaster;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tajo.QueryId;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.engine.query.QueryContext;
@@ -35,12 +31,12 @@ import org.apache.tajo.ipc.TajoWorkerProtocol.QueryExecutionRequestProto;
import org.apache.tajo.master.QueryInfo;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.master.rm.WorkerResourceManager;
-import org.apache.tajo.session.Session;
import org.apache.tajo.plan.logical.LogicalRootNode;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.session.Session;
import org.apache.tajo.util.NetUtils;
import java.net.InetSocketAddress;
@@ -48,15 +44,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerAllocatedResource;
-public class QueryInProgress extends CompositeService {
+public class QueryInProgress {
private static final Log LOG = LogFactory.getLog(QueryInProgress.class.getName());
private QueryId queryId;
private Session session;
- private AsyncDispatcher dispatcher;
-
private LogicalRootNode plan;
private AtomicBoolean querySubmitted = new AtomicBoolean(false);
@@ -76,7 +70,7 @@ public class QueryInProgress extends CompositeService {
Session session,
QueryContext queryContext,
QueryId queryId, String sql, String jsonExpr, LogicalRootNode plan) {
- super(QueryInProgress.class.getName());
+
this.masterContext = masterContext;
this.session = session;
this.queryId = queryId;
@@ -86,23 +80,14 @@ public class QueryInProgress extends CompositeService {
queryInfo.setStartTime(System.currentTimeMillis());
}
- @Override
- public void init(Configuration conf) {
- dispatcher = new AsyncDispatcher();
- this.addService(dispatcher);
-
- dispatcher.register(QueryJobEvent.Type.class, new QueryInProgressEventHandler());
- super.init(conf);
- }
-
public synchronized void kill() {
+ getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED);
if(queryMasterRpcClient != null){
queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get());
}
}
- @Override
- public void stop() {
+ public void stopProgress() {
if(stopped.getAndSet(true)) {
return;
}
@@ -110,52 +95,15 @@ public class QueryInProgress extends CompositeService {
LOG.info("=========================================================");
LOG.info("Stop query:" + queryId);
- masterContext.getResourceManager().stopQueryMaster(queryId);
-
- long startTime = System.currentTimeMillis();
- while(true) {
- try {
- if(masterContext.getResourceManager().isQueryMasterStopped(queryId)) {
- LOG.info(queryId + " QueryMaster stopped");
- break;
- }
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- break;
- }
-
- try {
- synchronized (this){
- wait(100);
- }
- } catch (InterruptedException e) {
- break;
- }
- if(System.currentTimeMillis() - startTime > 60 * 1000) {
- LOG.warn("Failed to stop QueryMaster:" + queryId);
- break;
- }
- }
+ masterContext.getResourceManager().releaseQueryMaster(queryId);
if(queryMasterRpc != null) {
RpcConnectionPool.getPool(masterContext.getConf()).closeConnection(queryMasterRpc);
}
masterContext.getHistoryWriter().appendHistory(queryInfo);
- super.stop();
- }
-
- @Override
- public void start() {
- super.start();
}
- public EventHandler getEventHandler() {
- return dispatcher.getEventHandler();
- }
-
-
-
public boolean startQueryMaster() {
try {
LOG.info("Initializing QueryInProgress for QueryID=" + queryId);
@@ -173,8 +121,6 @@ public class QueryInProgress extends CompositeService {
queryInfo.setQueryMasterclientPort(resource.getConnectionInfo().getClientPort());
queryInfo.setQueryMasterInfoPort(resource.getConnectionInfo().getHttpInfoPort());
- getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START, queryInfo));
-
return true;
} catch (Exception e) {
catchException(e);
@@ -182,23 +128,6 @@ public class QueryInProgress extends CompositeService {
}
}
- class QueryInProgressEventHandler implements EventHandler<QueryJobEvent> {
- @Override
- public void handle(QueryJobEvent queryJobEvent) {
- if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_HEARTBEAT) {
- heartbeat(queryJobEvent.getQueryInfo());
- } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_MASTER_START) {
- QueryInProgress queryInProgress = masterContext.getQueryJobManager().getQueryInProgress(queryId);
- queryInProgress.getEventHandler().handle(
- new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_START, queryInProgress.getQueryInfo()));
- } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_START) {
- submmitQueryToMaster();
- } else if (queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) {
- kill();
- }
- }
- }
-
private void connectQueryMaster() throws Exception {
InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort());
LOG.info("Connect to QueryMaster:" + addr);
@@ -207,7 +136,7 @@ public class QueryInProgress extends CompositeService {
queryMasterRpcClient = queryMasterRpc.getStub();
}
- private synchronized void submmitQueryToMaster() {
+ public synchronized void submmitQueryToMaster() {
if(querySubmitted.get()) {
return;
}
@@ -256,7 +185,7 @@ public class QueryInProgress extends CompositeService {
return !stopped.get() && this.querySubmitted.get();
}
- private void heartbeat(QueryInfo queryInfo) {
+ public void heartbeat(QueryInfo queryInfo) {
LOG.info("Received QueryMaster heartbeat:" + queryInfo);
// to avoid partial update by different heartbeats
http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java
index 1a1f2ff..27eb2b6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java
@@ -35,12 +35,9 @@ public class QueryJobEvent extends AbstractEvent<QueryJobEvent.Type> {
}
public enum Type {
- QUERY_JOB_START,
+ QUERY_MASTER_START,
QUERY_JOB_HEARTBEAT,
- QUERY_JOB_FINISH,
QUERY_JOB_STOP,
- QUERY_MASTER_START,
- QUERY_MASTER_STOP,
QUERY_JOB_KILL
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
index 76df397..02760a3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
@@ -473,7 +473,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
public void handle(QueryStartEvent event) {
LOG.info("Start QueryStartEventHandler:" + event.getQueryId());
QueryMasterTask queryMasterTask = new QueryMasterTask(queryMasterContext,
- event.getQueryId(), event.getSession(), event.getQueryContext(), event.getJsonExpr(), event.getLogicalPlanJson());
+ event.getQueryId(), event.getSession(), event.getQueryContext(), event.getJsonExpr());
synchronized(queryMasterTasks) {
queryMasterTasks.put(event.getQueryId(), queryMasterTask);
http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
index bab5903..fd52488 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
@@ -41,13 +41,10 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.exception.UnimplementedException;
-import org.apache.tajo.ha.HAServiceUtil;
-import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.TajoContainerProxy;
import org.apache.tajo.master.event.*;
import org.apache.tajo.master.rm.TajoWorkerResourceManager;
-import org.apache.tajo.session.Session;
import org.apache.tajo.plan.LogicalOptimizer;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.LogicalPlanner;
@@ -58,8 +55,7 @@ import org.apache.tajo.plan.logical.ScanNode;
import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.plan.verifier.VerifyException;
-import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.session.Session;
import org.apache.tajo.storage.StorageManager;
import org.apache.tajo.storage.StorageProperty;
import org.apache.tajo.storage.StorageUtil;
@@ -96,12 +92,8 @@ public class QueryMasterTask extends CompositeService {
private Query query;
- private MasterPlan masterPlan;
-
private String jsonExpr;
- private String logicalPlanJson;
-
private AsyncDispatcher dispatcher;
private final long querySubmitTime;
@@ -124,8 +116,7 @@ public class QueryMasterTask extends CompositeService {
new ArrayList<TajoWorkerProtocol.TaskFatalErrorReport>();
public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext,
- QueryId queryId, Session session, QueryContext queryContext, String jsonExpr,
- String logicalPlanJson) {
+ QueryId queryId, Session session, QueryContext queryContext, String jsonExpr) {
super(QueryMasterTask.class.getName());
this.queryMasterContext = queryMasterContext;
@@ -133,7 +124,6 @@ public class QueryMasterTask extends CompositeService {
this.session = session;
this.queryContext = queryContext;
this.jsonExpr = jsonExpr;
- this.logicalPlanJson = logicalPlanJson;
this.querySubmitTime = System.currentTimeMillis();
}
@@ -198,42 +188,11 @@ public class QueryMasterTask extends CompositeService {
LOG.fatal(t.getMessage(), t);
}
- RpcConnectionPool connPool = RpcConnectionPool.getPool(queryMasterContext.getConf());
- NettyClientBase tmClient = null;
- try {
- // In TajoMaster HA mode, if backup master be active status,
- // worker may fail to connect existing active master. Thus,
- // if worker can't connect the master, worker should try to connect another master and
- // update master address in worker context.
- if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
- try {
- tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
- } catch (Exception e) {
- queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(
- HAServiceUtil.getResourceTrackerAddress(systemConf));
- queryMasterContext.getWorkerContext().setTajoMasterAddress(
- HAServiceUtil.getMasterUmbilicalAddress(systemConf));
- tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
- }
- } else {
- tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
- }
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- } finally {
- connPool.releaseConnection(tmClient);
- }
-
- super.stop();
-
- //TODO change report to tajo master
if (queryMetrics != null) {
queryMetrics.report(new MetricsConsoleReporter());
}
+ super.stop();
LOG.info("Stopped QueryMasterTask:" + queryId);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/proto/TajoMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoMasterProtocol.proto b/tajo-core/src/main/proto/TajoMasterProtocol.proto
index cc83e47..bc73596 100644
--- a/tajo-core/src/main/proto/TajoMasterProtocol.proto
+++ b/tajo-core/src/main/proto/TajoMasterProtocol.proto
@@ -143,6 +143,5 @@ service TajoMasterProtocolService {
rpc heartbeat(TajoHeartbeat) returns (TajoHeartbeatResponse);
rpc allocateWorkerResources(WorkerResourceAllocationRequest) returns (WorkerResourceAllocationResponse);
rpc releaseWorkerResource(WorkerResourceReleaseRequest) returns (BoolProto);
- rpc stopQueryMaster(QueryIdProto) returns (BoolProto);
rpc getAllWorkerResource(NullProto) returns (WorkerResourcesRequest);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
index a125196..bd899cd 100644
--- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
@@ -90,7 +90,7 @@ public class TestKillQuery {
QueryMaster qm = cluster.getTajoWorkers().get(0).getWorkerContext().getQueryMaster();
QueryMasterTask queryMasterTask = new QueryMasterTask(qm.getContext(),
- queryId, session, defaultContext, expr.toJson(), plan.getRootBlock().getRoot().toJson());
+ queryId, session, defaultContext, expr.toJson());
queryMasterTask.init(conf);
queryMasterTask.getQueryTaskContext().getDispatcher().start();
[7/8] tajo git commit: TAJO-1291: Rename TajoMasterProtocol to
QueryCoordinatorProtocol.
Posted by ji...@apache.org.
TAJO-1291: Rename TajoMasterProtocol to QueryCoordinatorProtocol.
Closes #342
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/807868bd
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/807868bd
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/807868bd
Branch: refs/heads/index_support
Commit: 807868bd4d1ca1c8bd3aee33d31cca3d43dd2273
Parents: 50a8a66
Author: Hyunsik Choi <hy...@apache.org>
Authored: Fri Jan 9 20:44:21 2015 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Fri Jan 9 20:44:21 2015 +0900
----------------------------------------------------------------------
CHANGES | 3 +
tajo-core/pom.xml | 2 +-
.../tajo/master/QueryCoordinatorService.java | 160 ++++++++++
.../org/apache/tajo/master/QueryInProgress.java | 228 +++++++++++++
.../org/apache/tajo/master/QueryJobManager.java | 316 -------------------
.../org/apache/tajo/master/QueryManager.java | 315 ++++++++++++++++++
.../apache/tajo/master/TajoContainerProxy.java | 12 +-
.../java/org/apache/tajo/master/TajoMaster.java | 16 +-
.../tajo/master/TajoMasterClientService.java | 9 +-
.../apache/tajo/master/TajoMasterService.java | 161 ----------
.../apache/tajo/master/exec/QueryExecutor.java | 4 +-
.../tajo/master/rm/TajoResourceTracker.java | 12 +-
.../master/rm/TajoWorkerResourceManager.java | 14 +-
.../tajo/master/rm/WorkerResourceManager.java | 17 +-
.../apache/tajo/master/scheduler/Scheduler.java | 2 +-
.../master/scheduler/SimpleFifoScheduler.java | 8 +-
.../tajo/querymaster/QueryInProgress.java | 230 --------------
.../apache/tajo/querymaster/QueryMaster.java | 95 ++----
.../java/org/apache/tajo/querymaster/Stage.java | 17 -
.../main/java/org/apache/tajo/util/JSPUtil.java | 2 +-
.../tajo/worker/TajoResourceAllocator.java | 37 ++-
.../java/org/apache/tajo/worker/TajoWorker.java | 20 +-
.../tajo/worker/WorkerHeartbeatService.java | 27 +-
.../ConnectivityCheckerRuleForTajoWorker.java | 14 +-
.../main/proto/QueryCoordinatorProtocol.proto | 147 +++++++++
.../main/proto/ResourceTrackerProtocol.proto | 2 +-
.../src/main/proto/TajoMasterProtocol.proto | 147 ---------
.../src/main/resources/webapps/admin/index.jsp | 2 +-
.../src/main/resources/webapps/admin/query.jsp | 2 +-
.../org/apache/tajo/TajoTestingCluster.java | 2 +-
.../tajo/master/rm/TestTajoResourceManager.java | 3 +-
31 files changed, 979 insertions(+), 1047 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 4e38f78..89488da 100644
--- a/CHANGES
+++ b/CHANGES
@@ -27,6 +27,9 @@ Release 0.9.1 - unreleased
IMPROVEMENT
+ TAJO-1291: Rename TajoMasterProtocol to QueryCoordinatorProtocol.
+ (hyunsik)
+
TAJO-1286: Remove netty dependency from tajo-jdbc. (jihun)
TAJO-1282: Cleanup the relationship of QueryInProgress and
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/pom.xml b/tajo-core/pom.xml
index a7205dd..05ccf07 100644
--- a/tajo-core/pom.xml
+++ b/tajo-core/pom.xml
@@ -166,7 +166,7 @@
<argument>src/main/proto/ContainerProtocol.proto</argument>
<argument>src/main/proto/ResourceTrackerProtocol.proto</argument>
<argument>src/main/proto/QueryMasterProtocol.proto</argument>
- <argument>src/main/proto/TajoMasterProtocol.proto</argument>
+ <argument>src/main/proto/QueryCoordinatorProtocol.proto</argument>
<argument>src/main/proto/TajoWorkerProtocol.proto</argument>
<argument>src/main/proto/InternalTypes.proto</argument>
</arguments>
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java
new file mode 100644
index 0000000..1cb3842
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.ContainerProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.master.rm.Worker;
+import org.apache.tajo.master.rm.WorkerResource;
+import org.apache.tajo.rpc.AsyncRpcServer;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
+import org.apache.tajo.util.NetUtils;
+
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.List;
+
+public class QueryCoordinatorService extends AbstractService {
+ private final static Log LOG = LogFactory.getLog(QueryCoordinatorService.class);
+
+ private final TajoMaster.MasterContext context;
+ private final TajoConf conf;
+ private final ProtocolServiceHandler masterHandler;
+ private AsyncRpcServer server;
+ private InetSocketAddress bindAddress;
+
+ private final BoolProto BOOL_TRUE = BoolProto.newBuilder().setValue(true).build();
+ private final BoolProto BOOL_FALSE = BoolProto.newBuilder().setValue(false).build();
+
+ public QueryCoordinatorService(TajoMaster.MasterContext context) {
+ super(QueryCoordinatorService.class.getName());
+ this.context = context;
+ this.conf = context.getConf();
+ this.masterHandler = new ProtocolServiceHandler();
+ }
+
+ @Override
+ public void start() {
+ String confMasterServiceAddr = conf.getVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
+ InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterServiceAddr);
+ int workerNum = conf.getIntVar(TajoConf.ConfVars.MASTER_RPC_SERVER_WORKER_THREAD_NUM);
+ try {
+ server = new AsyncRpcServer(QueryCoordinatorProtocol.class, masterHandler, initIsa, workerNum);
+ } catch (Exception e) {
+ LOG.error(e);
+ }
+ server.start();
+ bindAddress = NetUtils.getConnectAddress(server.getListenAddress());
+ this.conf.setVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS,
+ NetUtils.normalizeInetSocketAddress(bindAddress));
+ LOG.info("Instantiated TajoMasterService at " + this.bindAddress);
+ super.start();
+ }
+
+ @Override
+ public void stop() {
+ if(server != null) {
+ server.shutdown();
+ server = null;
+ }
+ super.stop();
+ }
+
+ public InetSocketAddress getBindAddress() {
+ return bindAddress;
+ }
+
+ /**
+ * Actual protocol service handler
+ */
+ private class ProtocolServiceHandler implements QueryCoordinatorProtocolService.Interface {
+
+ @Override
+ public void heartbeat(
+ RpcController controller,
+ TajoHeartbeat request, RpcCallback<QueryCoordinatorProtocol.TajoHeartbeatResponse> done) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Received QueryHeartbeat:" + new WorkerConnectionInfo(request.getConnectionInfo()));
+ }
+
+ QueryCoordinatorProtocol.TajoHeartbeatResponse.ResponseCommand command = null;
+
+ QueryManager queryManager = context.getQueryJobManager();
+ command = queryManager.queryHeartbeat(request);
+
+ QueryCoordinatorProtocol.TajoHeartbeatResponse.Builder builder = QueryCoordinatorProtocol.TajoHeartbeatResponse.newBuilder();
+ builder.setHeartbeatResult(BOOL_TRUE);
+ if(command != null) {
+ builder.setResponseCommand(command);
+ }
+
+ builder.setClusterResourceSummary(context.getResourceManager().getClusterResourceSummary());
+ done.run(builder.build());
+ }
+
+ @Override
+ public void allocateWorkerResources(
+ RpcController controller,
+ QueryCoordinatorProtocol.WorkerResourceAllocationRequest request,
+ RpcCallback<QueryCoordinatorProtocol.WorkerResourceAllocationResponse> done) {
+ context.getResourceManager().allocateWorkerResources(request, done);
+ }
+
+ @Override
+ public void releaseWorkerResource(RpcController controller, WorkerResourceReleaseRequest request,
+ RpcCallback<PrimitiveProtos.BoolProto> done) {
+ List<ContainerProtocol.TajoContainerIdProto> containerIds = request.getContainerIdsList();
+
+ for(ContainerProtocol.TajoContainerIdProto eachContainer: containerIds) {
+ context.getResourceManager().releaseWorkerResource(eachContainer);
+ }
+ done.run(BOOL_TRUE);
+ }
+
+ @Override
+ public void getAllWorkerResource(RpcController controller, PrimitiveProtos.NullProto request,
+ RpcCallback<WorkerResourcesRequest> done) {
+
+ WorkerResourcesRequest.Builder builder = WorkerResourcesRequest.newBuilder();
+ Collection<Worker> workers = context.getResourceManager().getWorkers().values();
+
+ for(Worker worker: workers) {
+ WorkerResource resource = worker.getResource();
+
+ WorkerResourceProto.Builder workerResource = WorkerResourceProto.newBuilder();
+
+ workerResource.setConnectionInfo(worker.getConnectionInfo().getProto());
+ workerResource.setMemoryMB(resource.getMemoryMB());
+ workerResource.setDiskSlots(resource.getDiskSlots());
+
+ builder.addWorkerResources(workerResource);
+ }
+ done.run(builder.build());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
new file mode 100644
index 0000000..73d8cb2
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.WorkerAllocatedResource;
+import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol.QueryExecutionRequestProto;
+import org.apache.tajo.master.rm.WorkerResourceManager;
+import org.apache.tajo.plan.logical.LogicalRootNode;
+import org.apache.tajo.querymaster.QueryJobEvent;
+import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.session.Session;
+import org.apache.tajo.util.NetUtils;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class QueryInProgress {
+ private static final Log LOG = LogFactory.getLog(QueryInProgress.class.getName());
+
+ private QueryId queryId;
+
+ private Session session;
+
+ private LogicalRootNode plan;
+
+ private AtomicBoolean querySubmitted = new AtomicBoolean(false);
+
+ private AtomicBoolean stopped = new AtomicBoolean(false);
+
+ private QueryInfo queryInfo;
+
+ private final TajoMaster.MasterContext masterContext;
+
+ private NettyClientBase queryMasterRpc;
+
+ private QueryMasterProtocolService queryMasterRpcClient;
+
+ public QueryInProgress(
+ TajoMaster.MasterContext masterContext,
+ Session session,
+ QueryContext queryContext,
+ QueryId queryId, String sql, String jsonExpr, LogicalRootNode plan) {
+
+ this.masterContext = masterContext;
+ this.session = session;
+ this.queryId = queryId;
+ this.plan = plan;
+
+ queryInfo = new QueryInfo(queryId, queryContext, sql, jsonExpr);
+ queryInfo.setStartTime(System.currentTimeMillis());
+ }
+
+ public synchronized void kill() {
+ getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED);
+ if(queryMasterRpcClient != null){
+ queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get());
+ }
+ }
+
+ public void stopProgress() {
+ if(stopped.getAndSet(true)) {
+ return;
+ }
+
+ LOG.info("=========================================================");
+ LOG.info("Stop query:" + queryId);
+
+ masterContext.getResourceManager().releaseQueryMaster(queryId);
+
+ if(queryMasterRpc != null) {
+ RpcConnectionPool.getPool(masterContext.getConf()).closeConnection(queryMasterRpc);
+ }
+
+ masterContext.getHistoryWriter().appendHistory(queryInfo);
+ }
+
+ public boolean startQueryMaster() {
+ try {
+ LOG.info("Initializing QueryInProgress for QueryID=" + queryId);
+ WorkerResourceManager resourceManager = masterContext.getResourceManager();
+ WorkerAllocatedResource resource = resourceManager.allocateQueryMaster(this);
+
+ // if no resource to allocate a query master
+ if(resource == null) {
+ LOG.info("No Available Resources for QueryMaster");
+ return false;
+ }
+
+ queryInfo.setQueryMaster(resource.getConnectionInfo().getHost());
+ queryInfo.setQueryMasterPort(resource.getConnectionInfo().getQueryMasterPort());
+ queryInfo.setQueryMasterclientPort(resource.getConnectionInfo().getClientPort());
+ queryInfo.setQueryMasterInfoPort(resource.getConnectionInfo().getHttpInfoPort());
+
+ return true;
+ } catch (Exception e) {
+ catchException(e);
+ return false;
+ }
+ }
+
+ private void connectQueryMaster() throws Exception {
+ InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort());
+ LOG.info("Connect to QueryMaster:" + addr);
+ queryMasterRpc =
+ RpcConnectionPool.getPool(masterContext.getConf()).getConnection(addr, QueryMasterProtocol.class, true);
+ queryMasterRpcClient = queryMasterRpc.getStub();
+ }
+
+ public synchronized void submmitQueryToMaster() {
+ if(querySubmitted.get()) {
+ return;
+ }
+
+ try {
+ if(queryMasterRpcClient == null) {
+ connectQueryMaster();
+ }
+ if(queryMasterRpcClient == null) {
+ LOG.info("No QueryMaster conneciton info.");
+ //TODO wait
+ return;
+ }
+ LOG.info("Call executeQuery to :" +
+ queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort() + "," + queryId);
+
+ QueryExecutionRequestProto.Builder builder = TajoWorkerProtocol.QueryExecutionRequestProto.newBuilder();
+ builder.setQueryId(queryId.getProto())
+ .setQueryContext(queryInfo.getQueryContext().getProto())
+ .setSession(session.getProto())
+ .setExprInJson(PrimitiveProtos.StringProto.newBuilder().setValue(queryInfo.getJsonExpr()))
+ .setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build());
+
+ queryMasterRpcClient.executeQuery(null, builder.build(), NullCallback.get());
+ querySubmitted.set(true);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ public void catchException(Exception e) {
+ LOG.error(e.getMessage(), e);
+ queryInfo.setQueryState(TajoProtos.QueryState.QUERY_FAILED);
+ queryInfo.setLastMessage(StringUtils.stringifyException(e));
+ }
+
+ public QueryId getQueryId() {
+ return queryId;
+ }
+
+ public QueryInfo getQueryInfo() {
+ return this.queryInfo;
+ }
+
+ public boolean isStarted() {
+ return !stopped.get() && this.querySubmitted.get();
+ }
+
+ public void heartbeat(QueryInfo queryInfo) {
+ LOG.info("Received QueryMaster heartbeat:" + queryInfo);
+
+ // to avoid partial update by different heartbeats
+ synchronized (this.queryInfo) {
+
+ // A terminal state lets the client retrieve the query result,
+ // so we must set the query result before changing the query state.
+ if (isFinishState(queryInfo.getQueryState())) {
+ if (queryInfo.hasResultdesc()) {
+ this.queryInfo.setResultDesc(queryInfo.getResultDesc());
+ }
+ }
+
+ this.queryInfo.setQueryState(queryInfo.getQueryState());
+ this.queryInfo.setProgress(queryInfo.getProgress());
+ this.queryInfo.setFinishTime(queryInfo.getFinishTime());
+
+ // Update diagnosis message
+ if (queryInfo.getLastMessage() != null && !queryInfo.getLastMessage().isEmpty()) {
+ this.queryInfo.setLastMessage(queryInfo.getLastMessage());
+ LOG.info(queryId + queryInfo.getLastMessage());
+ }
+
+ // if any error occurs, print out the error message
+ if (this.queryInfo.getQueryState() == TajoProtos.QueryState.QUERY_FAILED) {
+ LOG.warn(queryId + " failed, " + queryInfo.getLastMessage());
+ }
+
+
+ if (isFinishState(this.queryInfo.getQueryState())) {
+ masterContext.getQueryJobManager().getEventHandler().handle(
+ new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_STOP, this.queryInfo));
+ }
+ }
+ }
+
+ private boolean isFinishState(TajoProtos.QueryState state) {
+ return state == TajoProtos.QueryState.QUERY_FAILED ||
+ state == TajoProtos.QueryState.QUERY_KILLED ||
+ state == TajoProtos.QueryState.QUERY_SUCCEEDED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java
deleted file mode 100644
index 6a8da27..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.master.scheduler.SimpleFifoScheduler;
-import org.apache.tajo.plan.logical.LogicalRootNode;
-import org.apache.tajo.querymaster.QueryInProgress;
-import org.apache.tajo.querymaster.QueryJobEvent;
-import org.apache.tajo.session.Session;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * QueryJobManager manages all scheduled and running queries.
- * It receives all Query related events and routes them to each QueryInProgress.
- */
-public class QueryJobManager extends CompositeService {
- private static final Log LOG = LogFactory.getLog(QueryJobManager.class.getName());
-
- // TajoMaster Context
- private final TajoMaster.MasterContext masterContext;
-
- private AsyncDispatcher dispatcher;
-
- private SimpleFifoScheduler scheduler;
-
- private final Map<QueryId, QueryInProgress> submittedQueries = Maps.newConcurrentMap();
-
- private final Map<QueryId, QueryInProgress> runningQueries = Maps.newConcurrentMap();
-
- private AtomicLong minExecutionTime = new AtomicLong(Long.MAX_VALUE);
- private AtomicLong maxExecutionTime = new AtomicLong();
- private AtomicLong avgExecutionTime = new AtomicLong();
- private AtomicLong executedQuerySize = new AtomicLong();
-
- public QueryJobManager(final TajoMaster.MasterContext masterContext) {
- super(QueryJobManager.class.getName());
- this.masterContext = masterContext;
- }
-
- @Override
- public void serviceInit(Configuration conf) throws Exception {
- try {
- this.dispatcher = new AsyncDispatcher();
- addService(this.dispatcher);
-
- this.dispatcher.register(QueryJobEvent.Type.class, new QueryJobManagerEventHandler());
-
- this.scheduler = new SimpleFifoScheduler(this);
- } catch (Exception e) {
- catchException(null, e);
- }
-
- super.serviceInit(conf);
- }
-
- @Override
- public void serviceStop() throws Exception {
- synchronized(runningQueries) {
- for(QueryInProgress eachQueryInProgress: runningQueries.values()) {
- eachQueryInProgress.stopProgress();
- }
- }
- this.scheduler.stop();
- super.serviceStop();
- }
-
- @Override
- public void serviceStart() throws Exception {
- this.scheduler.start();
- super.serviceStart();
- }
-
- public EventHandler getEventHandler() {
- return dispatcher.getEventHandler();
- }
-
- public Collection<QueryInProgress> getSubmittedQueries() {
- synchronized (submittedQueries){
- return Collections.unmodifiableCollection(submittedQueries.values());
- }
- }
-
- public Collection<QueryInProgress> getRunningQueries() {
- synchronized (runningQueries){
- return Collections.unmodifiableCollection(runningQueries.values());
- }
- }
-
- public synchronized Collection<QueryInfo> getFinishedQueries() {
- try {
- return this.masterContext.getHistoryReader().getQueries(null);
- } catch (Throwable e) {
- LOG.error(e);
- return Lists.newArrayList();
- }
- }
-
-
- public synchronized QueryInfo getFinishedQuery(QueryId queryId) {
- try {
- return this.masterContext.getHistoryReader().getQueryInfo(queryId.toString());
- } catch (Throwable e) {
- LOG.error(e);
- return null;
- }
- }
-
- public QueryInfo createNewQueryJob(Session session, QueryContext queryContext, String sql,
- String jsonExpr, LogicalRootNode plan)
- throws Exception {
- QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId());
- QueryInProgress queryInProgress = new QueryInProgress(masterContext, session, queryContext, queryId, sql,
- jsonExpr, plan);
-
- synchronized (submittedQueries) {
- queryInProgress.getQueryInfo().setQueryMaster("");
- submittedQueries.put(queryInProgress.getQueryId(), queryInProgress);
- }
-
- scheduler.addQuery(queryInProgress);
- return queryInProgress.getQueryInfo();
- }
-
- public QueryInfo startQueryJob(QueryId queryId) throws Exception {
-
- QueryInProgress queryInProgress;
-
- synchronized (submittedQueries) {
- queryInProgress = submittedQueries.remove(queryId);
- }
-
- synchronized (runningQueries) {
- runningQueries.put(queryInProgress.getQueryId(), queryInProgress);
- }
-
- if (queryInProgress.startQueryMaster()) {
- dispatcher.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START,
- queryInProgress.getQueryInfo()));
- } else {
- stopQuery(queryId);
- }
-
- return queryInProgress.getQueryInfo();
- }
-
- class QueryJobManagerEventHandler implements EventHandler<QueryJobEvent> {
-
- @Override
- public void handle(QueryJobEvent event) {
- QueryInProgress queryInProgress = getQueryInProgress(event.getQueryInfo().getQueryId());
-
-
- if (queryInProgress == null) {
- LOG.warn("No query info in running queries.[" + event.getQueryInfo().getQueryId() + "]");
- return;
- }
-
-
- if (event.getType() == QueryJobEvent.Type.QUERY_MASTER_START) {
- queryInProgress.submmitQueryToMaster();
-
- } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_STOP) {
- stopQuery(event.getQueryInfo().getQueryId());
-
- } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) {
- scheduler.removeQuery(queryInProgress.getQueryId());
- queryInProgress.kill();
- stopQuery(queryInProgress.getQueryId());
-
- } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_HEARTBEAT) {
- queryInProgress.heartbeat(event.getQueryInfo());
- }
- }
- }
-
- public QueryInProgress getQueryInProgress(QueryId queryId) {
- QueryInProgress queryInProgress;
- synchronized (submittedQueries) {
- queryInProgress = submittedQueries.get(queryId);
- }
-
- if (queryInProgress == null) {
- synchronized (runningQueries) {
- queryInProgress = runningQueries.get(queryId);
- }
- }
- return queryInProgress;
- }
-
- public void stopQuery(QueryId queryId) {
- LOG.info("Stop QueryInProgress:" + queryId);
- QueryInProgress queryInProgress = getQueryInProgress(queryId);
- if(queryInProgress != null) {
- queryInProgress.stopProgress();
- synchronized(submittedQueries) {
- submittedQueries.remove(queryId);
- }
-
- synchronized(runningQueries) {
- runningQueries.remove(queryId);
- }
-
- QueryInfo queryInfo = queryInProgress.getQueryInfo();
- long executionTime = queryInfo.getFinishTime() - queryInfo.getStartTime();
- if (executionTime < minExecutionTime.get()) {
- minExecutionTime.set(executionTime);
- }
-
- if (executionTime > maxExecutionTime.get()) {
- maxExecutionTime.set(executionTime);
- }
-
- long totalExecutionTime = executedQuerySize.get() * avgExecutionTime.get();
- if (totalExecutionTime > 0) {
- avgExecutionTime.set((totalExecutionTime + executionTime) / (executedQuerySize.get() + 1));
- } else {
- avgExecutionTime.set(executionTime);
- }
- executedQuerySize.incrementAndGet();
- } else {
- LOG.warn("No QueryInProgress while query stopping: " + queryId);
- }
- }
-
- public long getMinExecutionTime() {
- if (getExecutedQuerySize() == 0) return 0;
- return minExecutionTime.get();
- }
-
- public long getMaxExecutionTime() {
- return maxExecutionTime.get();
- }
-
- public long getAvgExecutionTime() {
- return avgExecutionTime.get();
- }
-
- public long getExecutedQuerySize() {
- return executedQuerySize.get();
- }
-
- private void catchException(QueryId queryId, Exception e) {
- LOG.error(e.getMessage(), e);
- QueryInProgress queryInProgress = runningQueries.get(queryId);
- queryInProgress.catchException(e);
- }
-
- public synchronized TajoMasterProtocol.TajoHeartbeatResponse.ResponseCommand queryHeartbeat(
- TajoMasterProtocol.TajoHeartbeat queryHeartbeat) {
- QueryInProgress queryInProgress = getQueryInProgress(new QueryId(queryHeartbeat.getQueryId()));
- if(queryInProgress == null) {
- return null;
- }
-
- QueryInfo queryInfo = makeQueryInfoFromHeartbeat(queryHeartbeat);
- getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_HEARTBEAT, queryInfo));
-
- return null;
- }
-
- private QueryInfo makeQueryInfoFromHeartbeat(TajoMasterProtocol.TajoHeartbeat queryHeartbeat) {
- QueryInfo queryInfo = new QueryInfo(new QueryId(queryHeartbeat.getQueryId()));
- WorkerConnectionInfo connectionInfo = new WorkerConnectionInfo(queryHeartbeat.getConnectionInfo());
-
- queryInfo.setQueryMaster(connectionInfo.getHost());
- queryInfo.setQueryMasterPort(connectionInfo.getQueryMasterPort());
- queryInfo.setQueryMasterclientPort(connectionInfo.getClientPort());
- queryInfo.setLastMessage(queryHeartbeat.getStatusMessage());
- queryInfo.setQueryState(queryHeartbeat.getState());
- queryInfo.setProgress(queryHeartbeat.getQueryProgress());
-
- if (queryHeartbeat.hasQueryFinishTime()) {
- queryInfo.setFinishTime(queryHeartbeat.getQueryFinishTime());
- }
-
- if (queryHeartbeat.hasResultDesc()) {
- queryInfo.setResultDesc(new TableDesc(queryHeartbeat.getResultDesc()));
- }
-
- return queryInfo;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
new file mode 100644
index 0000000..296be04
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.master.scheduler.SimpleFifoScheduler;
+import org.apache.tajo.plan.logical.LogicalRootNode;
+import org.apache.tajo.querymaster.QueryJobEvent;
+import org.apache.tajo.session.Session;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * QueryManager manages all scheduled and running queries.
+ * It receives all Query related events and routes them to each QueryInProgress.
+ */
+public class QueryManager extends CompositeService {
+ private static final Log LOG = LogFactory.getLog(QueryManager.class.getName());
+
+ // TajoMaster Context
+ private final TajoMaster.MasterContext masterContext;
+
+ private AsyncDispatcher dispatcher;
+
+ private SimpleFifoScheduler scheduler;
+
+ private final Map<QueryId, QueryInProgress> submittedQueries = Maps.newConcurrentMap();
+
+ private final Map<QueryId, QueryInProgress> runningQueries = Maps.newConcurrentMap();
+
+ private AtomicLong minExecutionTime = new AtomicLong(Long.MAX_VALUE);
+ private AtomicLong maxExecutionTime = new AtomicLong();
+ private AtomicLong avgExecutionTime = new AtomicLong();
+ private AtomicLong executedQuerySize = new AtomicLong();
+
+ public QueryManager(final TajoMaster.MasterContext masterContext) {
+ super(QueryManager.class.getName());
+ this.masterContext = masterContext;
+ }
+
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
+ try {
+ this.dispatcher = new AsyncDispatcher();
+ addService(this.dispatcher);
+
+ this.dispatcher.register(QueryJobEvent.Type.class, new QueryJobManagerEventHandler());
+
+ this.scheduler = new SimpleFifoScheduler(this);
+ } catch (Exception e) {
+ catchException(null, e);
+ }
+
+ super.serviceInit(conf);
+ }
+
+ @Override
+ public void serviceStop() throws Exception {
+ synchronized(runningQueries) {
+ for(QueryInProgress eachQueryInProgress: runningQueries.values()) {
+ eachQueryInProgress.stopProgress();
+ }
+ }
+ this.scheduler.stop();
+ super.serviceStop();
+ }
+
+ @Override
+ public void serviceStart() throws Exception {
+ this.scheduler.start();
+ super.serviceStart();
+ }
+
+ public EventHandler getEventHandler() {
+ return dispatcher.getEventHandler();
+ }
+
+ public Collection<QueryInProgress> getSubmittedQueries() {
+ synchronized (submittedQueries){
+ return Collections.unmodifiableCollection(submittedQueries.values());
+ }
+ }
+
+ public Collection<QueryInProgress> getRunningQueries() {
+ synchronized (runningQueries){
+ return Collections.unmodifiableCollection(runningQueries.values());
+ }
+ }
+
+ public synchronized Collection<QueryInfo> getFinishedQueries() {
+ try {
+ return this.masterContext.getHistoryReader().getQueries(null);
+ } catch (Throwable e) {
+ LOG.error(e);
+ return Lists.newArrayList();
+ }
+ }
+
+
+ public synchronized QueryInfo getFinishedQuery(QueryId queryId) {
+ try {
+ return this.masterContext.getHistoryReader().getQueryInfo(queryId.toString());
+ } catch (Throwable e) {
+ LOG.error(e);
+ return null;
+ }
+ }
+
+ public QueryInfo scheduleQuery(Session session, QueryContext queryContext, String sql,
+ String jsonExpr, LogicalRootNode plan)
+ throws Exception {
+ QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId());
+ QueryInProgress queryInProgress = new QueryInProgress(masterContext, session, queryContext, queryId, sql,
+ jsonExpr, plan);
+
+ synchronized (submittedQueries) {
+ queryInProgress.getQueryInfo().setQueryMaster("");
+ submittedQueries.put(queryInProgress.getQueryId(), queryInProgress);
+ }
+
+ scheduler.addQuery(queryInProgress);
+ return queryInProgress.getQueryInfo();
+ }
+
+ public QueryInfo startQueryJob(QueryId queryId) throws Exception {
+
+ QueryInProgress queryInProgress;
+
+ synchronized (submittedQueries) {
+ queryInProgress = submittedQueries.remove(queryId);
+ }
+
+ synchronized (runningQueries) {
+ runningQueries.put(queryInProgress.getQueryId(), queryInProgress);
+ }
+
+ if (queryInProgress.startQueryMaster()) {
+ dispatcher.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START,
+ queryInProgress.getQueryInfo()));
+ } else {
+ stopQuery(queryId);
+ }
+
+ return queryInProgress.getQueryInfo();
+ }
+
+ class QueryJobManagerEventHandler implements EventHandler<QueryJobEvent> {
+
+ @Override
+ public void handle(QueryJobEvent event) {
+ QueryInProgress queryInProgress = getQueryInProgress(event.getQueryInfo().getQueryId());
+
+
+ if (queryInProgress == null) {
+ LOG.warn("No query info in running queries.[" + event.getQueryInfo().getQueryId() + "]");
+ return;
+ }
+
+
+ if (event.getType() == QueryJobEvent.Type.QUERY_MASTER_START) {
+ queryInProgress.submmitQueryToMaster();
+
+ } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_STOP) {
+ stopQuery(event.getQueryInfo().getQueryId());
+
+ } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) {
+ scheduler.removeQuery(queryInProgress.getQueryId());
+ queryInProgress.kill();
+ stopQuery(queryInProgress.getQueryId());
+
+ } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_HEARTBEAT) {
+ queryInProgress.heartbeat(event.getQueryInfo());
+ }
+ }
+ }
+
+ public QueryInProgress getQueryInProgress(QueryId queryId) {
+ QueryInProgress queryInProgress;
+ synchronized (submittedQueries) {
+ queryInProgress = submittedQueries.get(queryId);
+ }
+
+ if (queryInProgress == null) {
+ synchronized (runningQueries) {
+ queryInProgress = runningQueries.get(queryId);
+ }
+ }
+ return queryInProgress;
+ }
+
+ public void stopQuery(QueryId queryId) {
+ LOG.info("Stop QueryInProgress:" + queryId);
+ QueryInProgress queryInProgress = getQueryInProgress(queryId);
+ if(queryInProgress != null) {
+ queryInProgress.stopProgress();
+ synchronized(submittedQueries) {
+ submittedQueries.remove(queryId);
+ }
+
+ synchronized(runningQueries) {
+ runningQueries.remove(queryId);
+ }
+
+ QueryInfo queryInfo = queryInProgress.getQueryInfo();
+ long executionTime = queryInfo.getFinishTime() - queryInfo.getStartTime();
+ if (executionTime < minExecutionTime.get()) {
+ minExecutionTime.set(executionTime);
+ }
+
+ if (executionTime > maxExecutionTime.get()) {
+ maxExecutionTime.set(executionTime);
+ }
+
+ long totalExecutionTime = executedQuerySize.get() * avgExecutionTime.get();
+ if (totalExecutionTime > 0) {
+ avgExecutionTime.set((totalExecutionTime + executionTime) / (executedQuerySize.get() + 1));
+ } else {
+ avgExecutionTime.set(executionTime);
+ }
+ executedQuerySize.incrementAndGet();
+ } else {
+ LOG.warn("No QueryInProgress while query stopping: " + queryId);
+ }
+ }
+
+ public long getMinExecutionTime() {
+ if (getExecutedQuerySize() == 0) return 0;
+ return minExecutionTime.get();
+ }
+
+ public long getMaxExecutionTime() {
+ return maxExecutionTime.get();
+ }
+
+ public long getAvgExecutionTime() {
+ return avgExecutionTime.get();
+ }
+
+ public long getExecutedQuerySize() {
+ return executedQuerySize.get();
+ }
+
+ private void catchException(QueryId queryId, Exception e) {
+ LOG.error(e.getMessage(), e);
+ QueryInProgress queryInProgress = runningQueries.get(queryId);
+ queryInProgress.catchException(e);
+ }
+
+ public synchronized QueryCoordinatorProtocol.TajoHeartbeatResponse.ResponseCommand queryHeartbeat(
+ QueryCoordinatorProtocol.TajoHeartbeat queryHeartbeat) {
+ QueryInProgress queryInProgress = getQueryInProgress(new QueryId(queryHeartbeat.getQueryId()));
+ if(queryInProgress == null) {
+ return null;
+ }
+
+ QueryInfo queryInfo = makeQueryInfoFromHeartbeat(queryHeartbeat);
+ getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_HEARTBEAT, queryInfo));
+
+ return null;
+ }
+
+ private QueryInfo makeQueryInfoFromHeartbeat(QueryCoordinatorProtocol.TajoHeartbeat queryHeartbeat) {
+ QueryInfo queryInfo = new QueryInfo(new QueryId(queryHeartbeat.getQueryId()));
+ WorkerConnectionInfo connectionInfo = new WorkerConnectionInfo(queryHeartbeat.getConnectionInfo());
+
+ queryInfo.setQueryMaster(connectionInfo.getHost());
+ queryInfo.setQueryMasterPort(connectionInfo.getQueryMasterPort());
+ queryInfo.setQueryMasterclientPort(connectionInfo.getClientPort());
+ queryInfo.setLastMessage(queryHeartbeat.getStatusMessage());
+ queryInfo.setQueryState(queryHeartbeat.getState());
+ queryInfo.setProgress(queryHeartbeat.getQueryProgress());
+
+ if (queryHeartbeat.hasQueryFinishTime()) {
+ queryInfo.setFinishTime(queryHeartbeat.getQueryFinishTime());
+ }
+
+ if (queryHeartbeat.hasResultDesc()) {
+ queryInfo.setResultDesc(new TableDesc(queryHeartbeat.getResultDesc()));
+ }
+
+ return queryInfo;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index 7209080..2ffd7ca 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -26,7 +26,7 @@ import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.querymaster.QueryMasterTask;
import org.apache.tajo.master.container.TajoContainer;
@@ -177,23 +177,23 @@ public class TajoContainerProxy extends ContainerProxy {
if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
try {
tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
+ QueryCoordinatorProtocol.class, true);
} catch (Exception e) {
context.getQueryMasterContext().getWorkerContext().setWorkerResourceTrackerAddr(
HAServiceUtil.getResourceTrackerAddress(conf));
context.getQueryMasterContext().getWorkerContext().setTajoMasterAddress(
HAServiceUtil.getMasterUmbilicalAddress(conf));
tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
+ QueryCoordinatorProtocol.class, true);
}
} else {
tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
+ QueryCoordinatorProtocol.class, true);
}
- TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+ QueryCoordinatorProtocol.QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
masterClientService.releaseWorkerResource(null,
- TajoMasterProtocol.WorkerResourceReleaseRequest.newBuilder()
+ QueryCoordinatorProtocol.WorkerResourceReleaseRequest.newBuilder()
.setExecutionBlockId(executionBlockId.getProto())
.addAllContainerIds(containerIdProtos)
.build(),
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index c054599..786025a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -114,14 +114,14 @@ public class TajoMaster extends CompositeService {
private GlobalEngine globalEngine;
private AsyncDispatcher dispatcher;
private TajoMasterClientService tajoMasterClientService;
- private TajoMasterService tajoMasterService;
+ private QueryCoordinatorService tajoMasterService;
private SessionManager sessionManager;
private WorkerResourceManager resourceManager;
//Web Server
private StaticHttpServer webServer;
- private QueryJobManager queryJobManager;
+ private QueryManager queryManager;
private ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
@@ -183,13 +183,13 @@ public class TajoMaster extends CompositeService {
globalEngine = new GlobalEngine(context);
addIfService(globalEngine);
- queryJobManager = new QueryJobManager(context);
- addIfService(queryJobManager);
+ queryManager = new QueryManager(context);
+ addIfService(queryManager);
tajoMasterClientService = new TajoMasterClientService(context);
addIfService(tajoMasterClientService);
- tajoMasterService = new TajoMasterService(context);
+ tajoMasterService = new QueryCoordinatorService(context);
addIfService(tajoMasterService);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
@@ -441,8 +441,8 @@ public class TajoMaster extends CompositeService {
return clock;
}
- public QueryJobManager getQueryJobManager() {
- return queryJobManager;
+ public QueryManager getQueryJobManager() {
+ return queryManager;
}
public WorkerResourceManager getResourceManager() {
@@ -469,7 +469,7 @@ public class TajoMaster extends CompositeService {
return storeManager;
}
- public TajoMasterService getTajoMasterService() {
+ public QueryCoordinatorService getTajoMasterService() {
return tajoMasterService;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 93326be..fcc016a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -45,7 +45,6 @@ import org.apache.tajo.ipc.TajoMasterClientProtocol;
import org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService;
import org.apache.tajo.master.TajoMaster.MasterContext;
import org.apache.tajo.master.exec.NonForwardQueryResultScanner;
-import org.apache.tajo.querymaster.QueryInProgress;
import org.apache.tajo.querymaster.QueryJobEvent;
import org.apache.tajo.master.rm.Worker;
import org.apache.tajo.master.rm.WorkerResource;
@@ -567,8 +566,8 @@ public class TajoMasterClientService extends AbstractService {
context.getSessionManager().touch(request.getSessionId().getId());
QueryId queryId = new QueryId(request.getQueryId());
- QueryJobManager queryJobManager = context.getQueryJobManager();
- QueryInProgress queryInProgress = queryJobManager.getQueryInProgress(queryId);
+ QueryManager queryManager = context.getQueryJobManager();
+ QueryInProgress queryInProgress = queryManager.getQueryInProgress(queryId);
QueryInfo queryInfo = null;
if (queryInProgress == null) {
@@ -598,8 +597,8 @@ public class TajoMasterClientService extends AbstractService {
try {
context.getSessionManager().touch(request.getSessionId().getId());
QueryId queryId = new QueryId(request.getQueryId());
- QueryJobManager queryJobManager = context.getQueryJobManager();
- queryJobManager.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_KILL,
+ QueryManager queryManager = context.getQueryJobManager();
+ queryManager.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_KILL,
new QueryInfo(queryId)));
return BOOL_TRUE;
} catch (Throwable t) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
deleted file mode 100644
index 02bdfa1..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.TajoIdProtos;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.master.rm.Worker;
-import org.apache.tajo.master.rm.WorkerResource;
-import org.apache.tajo.rpc.AsyncRpcServer;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
-import org.apache.tajo.util.NetUtils;
-
-import java.net.InetSocketAddress;
-import java.util.Collection;
-import java.util.List;
-
-public class TajoMasterService extends AbstractService {
- private final static Log LOG = LogFactory.getLog(TajoMasterService.class);
-
- private final TajoMaster.MasterContext context;
- private final TajoConf conf;
- private final TajoMasterServiceHandler masterHandler;
- private AsyncRpcServer server;
- private InetSocketAddress bindAddress;
-
- private final BoolProto BOOL_TRUE = BoolProto.newBuilder().setValue(true).build();
- private final BoolProto BOOL_FALSE = BoolProto.newBuilder().setValue(false).build();
-
- public TajoMasterService(TajoMaster.MasterContext context) {
- super(TajoMasterService.class.getName());
- this.context = context;
- this.conf = context.getConf();
- this.masterHandler = new TajoMasterServiceHandler();
- }
-
- @Override
- public void start() {
- String confMasterServiceAddr = conf.getVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
- InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterServiceAddr);
- int workerNum = conf.getIntVar(TajoConf.ConfVars.MASTER_RPC_SERVER_WORKER_THREAD_NUM);
- try {
- server = new AsyncRpcServer(TajoMasterProtocol.class, masterHandler, initIsa, workerNum);
- } catch (Exception e) {
- LOG.error(e);
- }
- server.start();
- bindAddress = NetUtils.getConnectAddress(server.getListenAddress());
- this.conf.setVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS,
- NetUtils.normalizeInetSocketAddress(bindAddress));
- LOG.info("Instantiated TajoMasterService at " + this.bindAddress);
- super.start();
- }
-
- @Override
- public void stop() {
- if(server != null) {
- server.shutdown();
- server = null;
- }
- super.stop();
- }
-
- public InetSocketAddress getBindAddress() {
- return bindAddress;
- }
-
- public class TajoMasterServiceHandler
- implements TajoMasterProtocol.TajoMasterProtocolService.Interface {
- @Override
- public void heartbeat(
- RpcController controller,
- TajoMasterProtocol.TajoHeartbeat request, RpcCallback<TajoMasterProtocol.TajoHeartbeatResponse> done) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Received QueryHeartbeat:" + new WorkerConnectionInfo(request.getConnectionInfo()));
- }
-
- TajoMasterProtocol.TajoHeartbeatResponse.ResponseCommand command = null;
-
- QueryJobManager queryJobManager = context.getQueryJobManager();
- command = queryJobManager.queryHeartbeat(request);
-
- TajoMasterProtocol.TajoHeartbeatResponse.Builder builder = TajoMasterProtocol.TajoHeartbeatResponse.newBuilder();
- builder.setHeartbeatResult(BOOL_TRUE);
- if(command != null) {
- builder.setResponseCommand(command);
- }
-
- builder.setClusterResourceSummary(context.getResourceManager().getClusterResourceSummary());
- done.run(builder.build());
- }
-
- @Override
- public void allocateWorkerResources(
- RpcController controller,
- TajoMasterProtocol.WorkerResourceAllocationRequest request,
- RpcCallback<TajoMasterProtocol.WorkerResourceAllocationResponse> done) {
- context.getResourceManager().allocateWorkerResources(request, done);
- }
-
- @Override
- public void releaseWorkerResource(RpcController controller,
- TajoMasterProtocol.WorkerResourceReleaseRequest request,
- RpcCallback<PrimitiveProtos.BoolProto> done) {
- List<ContainerProtocol.TajoContainerIdProto> containerIds = request.getContainerIdsList();
-
- for(ContainerProtocol.TajoContainerIdProto eachContainer: containerIds) {
- context.getResourceManager().releaseWorkerResource(eachContainer);
- }
- done.run(BOOL_TRUE);
- }
-
- @Override
- public void getAllWorkerResource(RpcController controller, PrimitiveProtos.NullProto request,
- RpcCallback<TajoMasterProtocol.WorkerResourcesRequest> done) {
-
- TajoMasterProtocol.WorkerResourcesRequest.Builder builder =
- TajoMasterProtocol.WorkerResourcesRequest.newBuilder();
- Collection<Worker> workers = context.getResourceManager().getWorkers().values();
-
- for(Worker worker: workers) {
- WorkerResource resource = worker.getResource();
-
- TajoMasterProtocol.WorkerResourceProto.Builder workerResource =
- TajoMasterProtocol.WorkerResourceProto.newBuilder();
-
- workerResource.setConnectionInfo(worker.getConnectionInfo().getProto());
- workerResource.setMemoryMB(resource.getMemoryMB());
- workerResource.setDiskSlots(resource.getDiskSlots());
-
- builder.addWorkerResources(workerResource);
- }
- done.run(builder.build());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
index 2fbebc1..0860d63 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
@@ -388,10 +388,10 @@ public class QueryExecutor {
context.getSystemMetrics().counter("Query", "numDMLQuery").inc();
hookManager.doHooks(queryContext, plan);
- QueryJobManager queryJobManager = this.context.getQueryJobManager();
+ QueryManager queryManager = this.context.getQueryJobManager();
QueryInfo queryInfo;
- queryInfo = queryJobManager.createNewQueryJob(session, queryContext, sql, jsonExpr, rootNode);
+ queryInfo = queryManager.scheduleQuery(session, queryContext, sql, jsonExpr, rootNode);
if(queryInfo == null) {
responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
index 831ce43..519aa9d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
@@ -26,7 +26,8 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.TajoHeartbeatResponse;
import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.rpc.AsyncRpcServer;
@@ -36,8 +37,6 @@ import org.apache.tajo.util.ProtoUtil;
import java.io.IOError;
import java.net.InetSocketAddress;
-import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeatResponse;
-import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeatResponse.Builder;
import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.NodeHeartbeat;
import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService;
@@ -110,7 +109,8 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
}
/** The response builder */
- private static final Builder builder = TajoHeartbeatResponse.newBuilder().setHeartbeatResult(ProtoUtil.TRUE);
+ private static final TajoHeartbeatResponse.Builder builder =
+ TajoHeartbeatResponse.newBuilder().setHeartbeatResult(ProtoUtil.TRUE);
private static WorkerStatusEvent createStatusEvent(int workerId, NodeHeartbeat heartbeat) {
return new WorkerStatusEvent(
@@ -204,7 +204,7 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
return new Worker(rmContext, workerResource, new WorkerConnectionInfo(request.getConnectionInfo()));
}
- public TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary() {
+ public ClusterResourceSummary getClusterResourceSummary() {
int totalDiskSlots = 0;
int totalCpuCoreSlots = 0;
int totalMemoryMB = 0;
@@ -230,7 +230,7 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
}
}
- return TajoMasterProtocol.ClusterResourceSummary.newBuilder()
+ return ClusterResourceSummary.newBuilder()
.setNumWorkers(rmContext.getWorkers().size())
.setTotalCpuCoreSlots(totalCpuCoreSlots)
.setTotalDiskSlots(totalDiskSlots)
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index 9f2a3d5..e5cf66c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -34,9 +34,9 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tajo.QueryId;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
+import org.apache.tajo.master.QueryInProgress;
import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.querymaster.QueryInProgress;
import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.util.ApplicationIdUtils;
@@ -49,8 +49,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import static org.apache.tajo.ipc.TajoMasterProtocol.*;
-
/**
* It manages all resources of tajo workers.
@@ -162,7 +160,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
}
@Override
- public TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary() {
+ public ClusterResourceSummary getClusterResourceSummary() {
return resourceTracker.getClusterResourceSummary();
}
@@ -204,7 +202,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
builder.setMinMemoryMBPerContainer(queryMasterDefaultMemoryMB);
builder.setMaxDiskSlotPerContainer(queryMasterDefaultDiskSlot);
builder.setMinDiskSlotPerContainer(queryMasterDefaultDiskSlot);
- builder.setResourceRequestPriority(TajoMasterProtocol.ResourceRequestPriority.MEMORY);
+ builder.setResourceRequestPriority(ResourceRequestPriority.MEMORY);
builder.setNumContainers(1);
return builder.build();
}
@@ -358,10 +356,10 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
int allocatedResources = 0;
- TajoMasterProtocol.ResourceRequestPriority resourceRequestPriority
+ ResourceRequestPriority resourceRequestPriority
= resourceRequest.request.getResourceRequestPriority();
- if(resourceRequestPriority == TajoMasterProtocol.ResourceRequestPriority.MEMORY) {
+ if(resourceRequestPriority == ResourceRequestPriority.MEMORY) {
synchronized(rmContext) {
List<Integer> randomWorkers = new ArrayList<Integer>(rmContext.getWorkers().keySet());
Collections.shuffle(randomWorkers);
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
index 79ec0ac..662b699 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
@@ -22,16 +22,15 @@ import com.google.protobuf.RpcCallback;
import org.apache.hadoop.service.Service;
import org.apache.tajo.QueryId;
import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.querymaster.QueryInProgress;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.WorkerResourceAllocationRequest;
+import org.apache.tajo.master.QueryInProgress;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
-import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerAllocatedResource;
-import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerResourceAllocationResponse;
-
/**
* An interface of WorkerResourceManager which allows TajoMaster to request allocation for containers
* and release the allocated containers.
@@ -45,7 +44,7 @@ public interface WorkerResourceManager extends Service {
* @return A allocated container resource
*/
@Deprecated
- public WorkerAllocatedResource allocateQueryMaster(QueryInProgress queryInProgress);
+ public QueryCoordinatorProtocol.WorkerAllocatedResource allocateQueryMaster(QueryInProgress queryInProgress);
/**
* Request one or more resource containers. You can set the number of containers and resource capabilities, such as
@@ -55,8 +54,8 @@ public interface WorkerResourceManager extends Service {
* @param request Request description
* @param rpcCallBack Callback function
*/
- public void allocateWorkerResources(TajoMasterProtocol.WorkerResourceAllocationRequest request,
- RpcCallback<WorkerResourceAllocationResponse> rpcCallBack);
+ public void allocateWorkerResources(WorkerResourceAllocationRequest request,
+ RpcCallback<QueryCoordinatorProtocol.WorkerResourceAllocationResponse> rpcCallBack);
/**
* Release a container
@@ -100,7 +99,7 @@ public interface WorkerResourceManager extends Service {
*
* @return The overall summary of cluster resources
*/
- public TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary();
+ public ClusterResourceSummary getClusterResourceSummary();
/**
*
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java
index 02203a9..19493d7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java
@@ -19,7 +19,7 @@
package org.apache.tajo.master.scheduler;
import org.apache.tajo.QueryId;
-import org.apache.tajo.querymaster.QueryInProgress;
+import org.apache.tajo.master.QueryInProgress;
import java.util.List;
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
index a091ed5..6cb98eb 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
@@ -21,8 +21,8 @@ package org.apache.tajo.master.scheduler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.QueryId;
-import org.apache.tajo.querymaster.QueryInProgress;
-import org.apache.tajo.master.QueryJobManager;
+import org.apache.tajo.master.QueryInProgress;
+import org.apache.tajo.master.QueryManager;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -32,10 +32,10 @@ public class SimpleFifoScheduler implements Scheduler {
private LinkedList<QuerySchedulingInfo> pool = new LinkedList<QuerySchedulingInfo>();
private final Thread queryProcessor;
private AtomicBoolean stopped = new AtomicBoolean();
- private QueryJobManager manager;
+ private QueryManager manager;
private Comparator<QuerySchedulingInfo> COMPARATOR = new SchedulingAlgorithms.FifoComparator();
- public SimpleFifoScheduler(QueryJobManager manager) {
+ public SimpleFifoScheduler(QueryManager manager) {
this.manager = manager;
this.queryProcessor = new Thread(new QueryProcessor());
this.queryProcessor.setName("Query Processor");
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java
deleted file mode 100644
index f83f244..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.querymaster;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.TajoProtos;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.ipc.QueryMasterProtocol;
-import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.ipc.TajoWorkerProtocol.QueryExecutionRequestProto;
-import org.apache.tajo.master.QueryInfo;
-import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.master.rm.WorkerResourceManager;
-import org.apache.tajo.plan.logical.LogicalRootNode;
-import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.rpc.RpcConnectionPool;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
-import org.apache.tajo.session.Session;
-import org.apache.tajo.util.NetUtils;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerAllocatedResource;
-
-public class QueryInProgress {
- private static final Log LOG = LogFactory.getLog(QueryInProgress.class.getName());
-
- private QueryId queryId;
-
- private Session session;
-
- private LogicalRootNode plan;
-
- private AtomicBoolean querySubmitted = new AtomicBoolean(false);
-
- private AtomicBoolean stopped = new AtomicBoolean(false);
-
- private QueryInfo queryInfo;
-
- private final TajoMaster.MasterContext masterContext;
-
- private NettyClientBase queryMasterRpc;
-
- private QueryMasterProtocolService queryMasterRpcClient;
-
- public QueryInProgress(
- TajoMaster.MasterContext masterContext,
- Session session,
- QueryContext queryContext,
- QueryId queryId, String sql, String jsonExpr, LogicalRootNode plan) {
-
- this.masterContext = masterContext;
- this.session = session;
- this.queryId = queryId;
- this.plan = plan;
-
- queryInfo = new QueryInfo(queryId, queryContext, sql, jsonExpr);
- queryInfo.setStartTime(System.currentTimeMillis());
- }
-
- public synchronized void kill() {
- getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED);
- if(queryMasterRpcClient != null){
- queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get());
- }
- }
-
- public void stopProgress() {
- if(stopped.getAndSet(true)) {
- return;
- }
-
- LOG.info("=========================================================");
- LOG.info("Stop query:" + queryId);
-
- masterContext.getResourceManager().releaseQueryMaster(queryId);
-
- if(queryMasterRpc != null) {
- RpcConnectionPool.getPool(masterContext.getConf()).closeConnection(queryMasterRpc);
- }
-
- masterContext.getHistoryWriter().appendHistory(queryInfo);
- }
-
- public boolean startQueryMaster() {
- try {
- LOG.info("Initializing QueryInProgress for QueryID=" + queryId);
- WorkerResourceManager resourceManager = masterContext.getResourceManager();
- WorkerAllocatedResource resource = resourceManager.allocateQueryMaster(this);
-
- // if no resource to allocate a query master
- if(resource == null) {
- LOG.info("No Available Resources for QueryMaster");
- return false;
- }
-
- queryInfo.setQueryMaster(resource.getConnectionInfo().getHost());
- queryInfo.setQueryMasterPort(resource.getConnectionInfo().getQueryMasterPort());
- queryInfo.setQueryMasterclientPort(resource.getConnectionInfo().getClientPort());
- queryInfo.setQueryMasterInfoPort(resource.getConnectionInfo().getHttpInfoPort());
-
- return true;
- } catch (Exception e) {
- catchException(e);
- return false;
- }
- }
-
- private void connectQueryMaster() throws Exception {
- InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort());
- LOG.info("Connect to QueryMaster:" + addr);
- queryMasterRpc =
- RpcConnectionPool.getPool(masterContext.getConf()).getConnection(addr, QueryMasterProtocol.class, true);
- queryMasterRpcClient = queryMasterRpc.getStub();
- }
-
- public synchronized void submmitQueryToMaster() {
- if(querySubmitted.get()) {
- return;
- }
-
- try {
- if(queryMasterRpcClient == null) {
- connectQueryMaster();
- }
- if(queryMasterRpcClient == null) {
- LOG.info("No QueryMaster conneciton info.");
- //TODO wait
- return;
- }
- LOG.info("Call executeQuery to :" +
- queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort() + "," + queryId);
-
- QueryExecutionRequestProto.Builder builder = TajoWorkerProtocol.QueryExecutionRequestProto.newBuilder();
- builder.setQueryId(queryId.getProto())
- .setQueryContext(queryInfo.getQueryContext().getProto())
- .setSession(session.getProto())
- .setExprInJson(PrimitiveProtos.StringProto.newBuilder().setValue(queryInfo.getJsonExpr()))
- .setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build());
-
- queryMasterRpcClient.executeQuery(null, builder.build(), NullCallback.get());
- querySubmitted.set(true);
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
- }
-
- public void catchException(Exception e) {
- LOG.error(e.getMessage(), e);
- queryInfo.setQueryState(TajoProtos.QueryState.QUERY_FAILED);
- queryInfo.setLastMessage(StringUtils.stringifyException(e));
- }
-
- public QueryId getQueryId() {
- return queryId;
- }
-
- public QueryInfo getQueryInfo() {
- return this.queryInfo;
- }
-
- public boolean isStarted() {
- return !stopped.get() && this.querySubmitted.get();
- }
-
- public void heartbeat(QueryInfo queryInfo) {
- LOG.info("Received QueryMaster heartbeat:" + queryInfo);
-
- // to avoid partial update by different heartbeats
- synchronized (this.queryInfo) {
-
- // terminal state will let client to retrieve a query result
- // So, we must set the query result before changing query state
- if (isFinishState(queryInfo.getQueryState())) {
- if (queryInfo.hasResultdesc()) {
- this.queryInfo.setResultDesc(queryInfo.getResultDesc());
- }
- }
-
- this.queryInfo.setQueryState(queryInfo.getQueryState());
- this.queryInfo.setProgress(queryInfo.getProgress());
- this.queryInfo.setFinishTime(queryInfo.getFinishTime());
-
- // Update diagnosis message
- if (queryInfo.getLastMessage() != null && !queryInfo.getLastMessage().isEmpty()) {
- this.queryInfo.setLastMessage(queryInfo.getLastMessage());
- LOG.info(queryId + queryInfo.getLastMessage());
- }
-
- // if any error occurs, print outs the error message
- if (this.queryInfo.getQueryState() == TajoProtos.QueryState.QUERY_FAILED) {
- LOG.warn(queryId + " failed, " + queryInfo.getLastMessage());
- }
-
-
- if (isFinishState(this.queryInfo.getQueryState())) {
- masterContext.getQueryJobManager().getEventHandler().handle(
- new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_STOP, this.queryInfo));
- }
- }
- }
-
- private boolean isFinishState(TajoProtos.QueryState state) {
- return state == TajoProtos.QueryState.QUERY_FAILED ||
- state == TajoProtos.QueryState.QUERY_KILLED ||
- state == TajoProtos.QueryState.QUERY_SUCCEEDED;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
index 02760a3..53390a1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
@@ -34,7 +34,8 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.global.GlobalPlanner;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ha.HAServiceUtil;
-import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.event.QueryStartEvent;
import org.apache.tajo.master.event.QueryStopEvent;
@@ -56,10 +57,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeat;
-import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeatResponse;
-
-// TODO - when exception, send error status to QueryJobManager
public class QueryMaster extends CompositeService implements EventHandler {
private static final Log LOG = LogFactory.getLog(QueryMaster.class.getName());
@@ -182,12 +179,12 @@ public class QueryMaster extends CompositeService implements EventHandler {
}
LOG.info("cleanup executionBlocks: " + cleanupMessage);
NettyClientBase rpc = null;
- List<TajoMasterProtocol.WorkerResourceProto> workers = getAllWorker();
+ List<WorkerResourceProto> workers = getAllWorker();
TajoWorkerProtocol.ExecutionBlockListProto.Builder builder = TajoWorkerProtocol.ExecutionBlockListProto.newBuilder();
builder.addAllExecutionBlockId(Lists.newArrayList(executionBlockIds));
TajoWorkerProtocol.ExecutionBlockListProto executionBlockListProto = builder.build();
- for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
+ for (WorkerResourceProto worker : workers) {
try {
TajoProtos.WorkerConnectionInfoProto connectionInfo = worker.getConnectionInfo();
rpc = connPool.getConnection(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()),
@@ -206,9 +203,9 @@ public class QueryMaster extends CompositeService implements EventHandler {
private void cleanup(QueryId queryId) {
LOG.info("cleanup query resources : " + queryId);
NettyClientBase rpc = null;
- List<TajoMasterProtocol.WorkerResourceProto> workers = getAllWorker();
+ List<WorkerResourceProto> workers = getAllWorker();
- for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
+ for (WorkerResourceProto worker : workers) {
try {
TajoProtos.WorkerConnectionInfoProto connectionInfo = worker.getConnectionInfo();
rpc = connPool.getConnection(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()),
@@ -224,7 +221,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
}
}
- public List<TajoMasterProtocol.WorkerResourceProto> getAllWorker() {
+ public List<WorkerResourceProto> getAllWorker() {
NettyClientBase rpc = null;
try {
@@ -235,78 +232,34 @@ public class QueryMaster extends CompositeService implements EventHandler {
if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
try {
rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
+ QueryCoordinatorProtocol.class, true);
} catch (Exception e) {
queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(
HAServiceUtil.getResourceTrackerAddress(systemConf));
queryMasterContext.getWorkerContext().setTajoMasterAddress(
HAServiceUtil.getMasterUmbilicalAddress(systemConf));
rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
+ QueryCoordinatorProtocol.class, true);
}
} else {
rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
+ QueryCoordinatorProtocol.class, true);
}
- TajoMasterProtocol.TajoMasterProtocolService masterService = rpc.getStub();
+ QueryCoordinatorProtocolService masterService = rpc.getStub();
- CallFuture<TajoMasterProtocol.WorkerResourcesRequest> callBack =
- new CallFuture<TajoMasterProtocol.WorkerResourcesRequest>();
+ CallFuture<WorkerResourcesRequest> callBack = new CallFuture<WorkerResourcesRequest>();
masterService.getAllWorkerResource(callBack.getController(),
PrimitiveProtos.NullProto.getDefaultInstance(), callBack);
- TajoMasterProtocol.WorkerResourcesRequest workerResourcesRequest = callBack.get(2, TimeUnit.SECONDS);
+ WorkerResourcesRequest workerResourcesRequest = callBack.get(2, TimeUnit.SECONDS);
return workerResourcesRequest.getWorkerResourcesList();
} catch (Exception e) {
LOG.error(e.getMessage(), e);
} finally {
connPool.releaseConnection(rpc);
}
- return new ArrayList<TajoMasterProtocol.WorkerResourceProto>();
- }
-
- public void reportQueryStatusToQueryMaster(QueryId queryId, TajoProtos.QueryState state) {
- LOG.info("Send QueryMaster Ready to QueryJobManager:" + queryId);
- NettyClientBase tmClient = null;
- try {
- // In TajoMaster HA mode, if backup master be active status,
- // worker may fail to connect existing active master. Thus,
- // if worker can't connect the master, worker should try to connect another master and
- // update master address in worker context.
- if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
- try {
- tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
- } catch (Exception e) {
- queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(
- HAServiceUtil.getResourceTrackerAddress(systemConf));
- queryMasterContext.getWorkerContext().setTajoMasterAddress(
- HAServiceUtil.getMasterUmbilicalAddress(systemConf));
- tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
- }
- } else {
- tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
- }
-
- TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
-
- TajoHeartbeat.Builder queryHeartbeatBuilder = TajoHeartbeat.newBuilder()
- .setConnectionInfo(workerContext.getConnectionInfo().getProto())
- .setState(state)
- .setQueryId(queryId.getProto());
-
- CallFuture<TajoHeartbeatResponse> callBack =
- new CallFuture<TajoHeartbeatResponse>();
-
- masterClientService.heartbeat(callBack.getController(), queryHeartbeatBuilder.build(), callBack);
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- } finally {
- connPool.releaseConnection(tmClient);
- }
+ return new ArrayList<WorkerResourceProto>();
}
@Override
@@ -407,19 +360,19 @@ public class QueryMaster extends CompositeService implements EventHandler {
if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
try {
tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
+ QueryCoordinatorProtocol.class, true);
} catch (Exception e) {
queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf));
queryMasterContext.getWorkerContext().setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf));
tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
+ QueryCoordinatorProtocol.class, true);
}
} else {
tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
+ QueryCoordinatorProtocol.class, true);
}
- TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+ QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
masterClientService.heartbeat(future.getController(), queryHeartbeat, future);
} catch (Exception e) {
//this function will be closed in new thread.
@@ -524,24 +477,24 @@ public class QueryMaster extends CompositeService implements EventHandler {
if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
try {
tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
+ QueryCoordinatorProtocol.class, true);
} catch (Exception e) {
queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(
HAServiceUtil.getResourceTrackerAddress(systemConf));
queryMasterContext.getWorkerContext().setTajoMasterAddress(
HAServiceUtil.getMasterUmbilicalAddress(systemConf));
tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
+ QueryCoordinatorProtocol.class, true);
}
} else {
tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
+ QueryCoordinatorProtocol.class, true);
}
- TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+ QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
- CallFuture<TajoHeartbeatResponse> callBack =
- new CallFuture<TajoHeartbeatResponse>();
+ CallFuture<QueryCoordinatorProtocol.TajoHeartbeatResponse> callBack =
+ new CallFuture<QueryCoordinatorProtocol.TajoHeartbeatResponse>();
TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(eachTask);
masterClientService.heartbeat(callBack.getController(), queryHeartbeat, callBack);
[3/8] tajo git commit: TAJO-1287: Repeated using of the same order by
key in multiple window clauses should be supported. (Keuntae Park)
Posted by ji...@apache.org.
TAJO-1287: Repeated using of the same order by key in multiple window clauses should be supported. (Keuntae Park)
Closes #337
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/03801a31
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/03801a31
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/03801a31
Branch: refs/heads/index_support
Commit: 03801a3129301745e16f0a7b40102a56e9d13389
Parents: 307c6c9
Author: Keuntae Park <si...@apache.org>
Authored: Fri Jan 9 15:22:11 2015 +0900
Committer: Keuntae Park <si...@apache.org>
Committed: Fri Jan 9 15:22:11 2015 +0900
----------------------------------------------------------------------
CHANGES | 3 ++
.../engine/planner/physical/WindowAggExec.java | 11 +++++++-
.../tajo/engine/query/TestWindowQuery.java | 29 ++++++++++++++++++++
3 files changed, 42 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/03801a31/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 35670d7..66bb299 100644
--- a/CHANGES
+++ b/CHANGES
@@ -154,6 +154,9 @@ Release 0.9.1 - unreleased
BUG FIXES
+ TAJO-1287: Repeated using of the same order by key in multiple
+ window clauses should be supported. (Keuntae Park)
+
TAJO-1265: min(), max() does not handle null properly. (Keuntae Park)
TAJO-1270: Fix typos. (DaeMyung Kang via hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/03801a31/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
index e36dcd8..a36bd4f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
@@ -143,7 +143,16 @@ public class WindowAggExec extends UnaryPhysicalExec {
for (SortSpec sortSpec : functions[i].getSortSpecs()) {
if (!rewrittenSchema.contains(sortSpec.getSortKey())) {
- additionalSortKeyColumns.add(sortSpec.getSortKey());
+ // check if additionalSortKeyColumns already has that sort key
+ boolean newKey = true;
+ for (Column c : additionalSortKeyColumns) {
+ if (c.equals(sortSpec.getSortKey())) {
+ newKey = false;
+ }
+ }
+ if (newKey) {
+ additionalSortKeyColumns.add(sortSpec.getSortKey());
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/03801a31/tajo-core/src/test/java/org/apache/tajo/engine/query/TestWindowQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestWindowQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestWindowQuery.java
index 668ba70..14ab58f 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestWindowQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestWindowQuery.java
@@ -321,4 +321,33 @@ public class TestWindowQuery extends QueryTestCaseBase {
executeString("DROP TABLE lastvaluetime PURGE");
}
}
+
+ @Test
+ public final void testMultipleWindow() throws Exception {
+ KeyValueSet tableOptions = new KeyValueSet();
+ tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+ tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
+
+ Schema schema = new Schema();
+ schema.addColumn("id", TajoDataTypes.Type.INT4);
+ schema.addColumn("time", TajoDataTypes.Type.TIME);
+ schema.addColumn("name", TajoDataTypes.Type.TEXT);
+ String[] data = new String[]{ "1|12:11:12|abc", "2|10:11:13|def", "2|05:42:41|ghi" };
+ TajoTestingCluster.createTable("multiwindow", schema, tableOptions, data, 1);
+
+ try {
+ ResultSet res = executeString(
+ "select id, last_value(time) over ( partition by id order by time ) as time_last, last_value(name) over ( partition by id order by time ) as name_last from multiwindow");
+ String ascExpected = "id,time_last,name_last\n" +
+ "-------------------------------\n" +
+ "1,12:11:12,abc\n" +
+ "2,10:11:13,def\n" +
+ "2,10:11:13,def\n";
+
+ assertEquals(ascExpected, resultSetToString(res));
+ res.close();
+ } finally {
+ executeString("DROP TABLE multiwindow PURGE");
+ }
+ }
}
\ No newline at end of file
[6/8] tajo git commit: TAJO-1291: Rename TajoMasterProtocol to
QueryCoordinatorProtocol.
Posted by ji...@apache.org.
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index 1ea7051..13394f8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -45,7 +45,6 @@ import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.engine.planner.global.ExecutionBlock;
import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
@@ -838,22 +837,6 @@ public class Stage implements EventHandler<StageEvent> {
}
/**
- * Getting the total memory of cluster
- *
- * @param stage
- * @return mega bytes
- */
- private static int getClusterTotalMemory(Stage stage) {
- List<TajoMasterProtocol.WorkerResourceProto> workers =
- stage.context.getQueryMasterContext().getQueryMaster().getAllWorker();
-
- int totalMem = 0;
- for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
- totalMem += worker.getMemoryMB();
- }
- return totalMem;
- }
- /**
* Getting the desire number of partitions according to the volume of input data.
* This method is only used to determine the partition key number of hash join or aggregation.
*
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
index d711258..82fb37f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
@@ -25,7 +25,7 @@ import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.master.TajoMaster.MasterContext;
import org.apache.tajo.ha.HAService;
-import org.apache.tajo.querymaster.QueryInProgress;
+import org.apache.tajo.master.QueryInProgress;
import org.apache.tajo.querymaster.QueryMasterTask;
import org.apache.tajo.querymaster.Task;
import org.apache.tajo.querymaster.Stage;
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 8241478..04b65d2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -28,8 +28,10 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ha.HAServiceUtil;
import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.*;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
@@ -38,16 +40,18 @@ import org.apache.tajo.master.container.TajoContainerId;
import org.apache.tajo.master.event.ContainerAllocationEvent;
import org.apache.tajo.master.event.ContainerAllocatorEventType;
import org.apache.tajo.master.event.StageContainerAllocationEvent;
+import org.apache.tajo.master.rm.TajoWorkerContainer;
+import org.apache.tajo.master.rm.TajoWorkerContainerId;
+import org.apache.tajo.master.rm.Worker;
+import org.apache.tajo.master.rm.WorkerResource;
import org.apache.tajo.querymaster.QueryMasterTask;
import org.apache.tajo.querymaster.Stage;
import org.apache.tajo.querymaster.StageState;
-import org.apache.tajo.master.rm.*;
import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.util.ApplicationIdUtils;
-import org.apache.tajo.ha.HAServiceUtil;
import java.net.InetSocketAddress;
import java.util.*;
@@ -91,7 +95,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
int memoryMBPerTask) {
//TODO consider disk slot
- TajoMasterProtocol.ClusterResourceSummary clusterResource = workerContext.getClusterResource();
+ ClusterResourceSummary clusterResource = workerContext.getClusterResource();
int clusterSlots = clusterResource == null ? 0 : clusterResource.getTotalMemoryMB() / memoryMBPerTask;
clusterSlots = Math.max(1, clusterSlots - 1); // reserve query master slot
LOG.info("CalculateNumberRequestContainer - Number of Tasks=" + numTasks +
@@ -249,20 +253,19 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
@Override
public void run() {
LOG.info("Start TajoWorkerAllocationThread");
- CallFuture<TajoMasterProtocol.WorkerResourceAllocationResponse> callBack =
- new CallFuture<TajoMasterProtocol.WorkerResourceAllocationResponse>();
+ CallFuture<WorkerResourceAllocationResponse> callBack =
+ new CallFuture<WorkerResourceAllocationResponse>();
//TODO consider task's resource usage pattern
int requiredMemoryMB = tajoConf.getIntVar(TajoConf.ConfVars.TASK_DEFAULT_MEMORY);
float requiredDiskSlots = tajoConf.getFloatVar(TajoConf.ConfVars.TASK_DEFAULT_DISK);
- TajoMasterProtocol.WorkerResourceAllocationRequest request =
- TajoMasterProtocol.WorkerResourceAllocationRequest.newBuilder()
+ WorkerResourceAllocationRequest request = WorkerResourceAllocationRequest.newBuilder()
.setMinMemoryMBPerContainer(requiredMemoryMB)
.setMaxMemoryMBPerContainer(requiredMemoryMB)
.setNumContainers(event.getRequiredNum())
- .setResourceRequestPriority(!event.isLeafQuery() ? TajoMasterProtocol.ResourceRequestPriority.MEMORY
- : TajoMasterProtocol.ResourceRequestPriority.DISK)
+ .setResourceRequestPriority(!event.isLeafQuery() ?
+ ResourceRequestPriority.MEMORY : ResourceRequestPriority.DISK)
.setMinDiskSlotPerContainer(requiredDiskSlots)
.setMaxDiskSlotPerContainer(requiredDiskSlots)
.setQueryId(event.getExecutionBlockId().getQueryId().getProto())
@@ -280,7 +283,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
try {
tmClient = connPool.getConnection(
queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
+ QueryCoordinatorProtocol.class, true);
} catch (Exception e) {
queryTaskContext.getQueryMasterContext().getWorkerContext().
setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(tajoConf));
@@ -288,15 +291,15 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(tajoConf));
tmClient = connPool.getConnection(
queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
+ QueryCoordinatorProtocol.class, true);
}
} else {
tmClient = connPool.getConnection(
queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
+ QueryCoordinatorProtocol.class, true);
}
- TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+ QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
masterClientService.allocateWorkerResources(null, request, callBack);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
@@ -304,7 +307,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
connPool.releaseConnection(tmClient);
}
- TajoMasterProtocol.WorkerResourceAllocationResponse response = null;
+ WorkerResourceAllocationResponse response = null;
while(!stopped.get()) {
try {
response = callBack.get(3, TimeUnit.SECONDS);
@@ -321,11 +324,11 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
int numAllocatedContainers = 0;
if(response != null) {
- List<TajoMasterProtocol.WorkerAllocatedResource> allocatedResources = response.getWorkerAllocatedResourceList();
+ List<WorkerAllocatedResource> allocatedResources = response.getWorkerAllocatedResourceList();
ExecutionBlockId executionBlockId = event.getExecutionBlockId();
List<TajoContainer> containers = new ArrayList<TajoContainer>();
- for(TajoMasterProtocol.WorkerAllocatedResource eachAllocatedResource: allocatedResources) {
+ for(WorkerAllocatedResource eachAllocatedResource: allocatedResources) {
TajoWorkerContainer container = new TajoWorkerContainer();
NodeId nodeId = NodeId.newInstance(eachAllocatedResource.getConnectionInfo().getHost(),
eachAllocatedResource.getConnectionInfo().getPeerRpcPort());
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 09a87e0..4003014 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -20,7 +20,6 @@ package org.apache.tajo.worker;
import com.codahale.metrics.Gauge;
import com.google.common.annotations.VisibleForTesting;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -36,13 +35,13 @@ import org.apache.tajo.catalog.CatalogClient;
import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ha.HAServiceUtil;
-import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.ha.TajoMasterInfo;
-import org.apache.tajo.querymaster.QueryMaster;
-import org.apache.tajo.querymaster.QueryMasterManagerService;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.master.rm.TajoWorkerResourceManager;
import org.apache.tajo.pullserver.TajoPullServerService;
+import org.apache.tajo.querymaster.QueryMaster;
+import org.apache.tajo.querymaster.QueryMasterManagerService;
import org.apache.tajo.rpc.RpcChannelFactory;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.rule.EvaluationContext;
@@ -50,7 +49,10 @@ import org.apache.tajo.rule.EvaluationFailedException;
import org.apache.tajo.rule.SelfDiagnosisRuleEngine;
import org.apache.tajo.rule.SelfDiagnosisRuleSession;
import org.apache.tajo.storage.HashShuffleAppenderManager;
-import org.apache.tajo.util.*;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.JvmPauseMonitor;
+import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.StringUtils;
import org.apache.tajo.util.history.HistoryReader;
import org.apache.tajo.util.history.HistoryWriter;
import org.apache.tajo.util.metrics.TajoSystemMetrics;
@@ -115,7 +117,7 @@ public class TajoWorker extends CompositeService {
private AtomicInteger numClusterNodes = new AtomicInteger();
- private TajoMasterProtocol.ClusterResourceSummary clusterResource;
+ private ClusterResourceSummary clusterResource;
private WorkerConnectionInfo connectionInfo;
@@ -516,13 +518,13 @@ public class TajoWorker extends CompositeService {
return TajoWorker.this.numClusterNodes.get();
}
- public void setClusterResource(TajoMasterProtocol.ClusterResourceSummary clusterResource) {
+ public void setClusterResource(ClusterResourceSummary clusterResource) {
synchronized (numClusterNodes) {
TajoWorker.this.clusterResource = clusterResource;
}
}
- public TajoMasterProtocol.ClusterResourceSummary getClusterResource() {
+ public ClusterResourceSummary getClusterResource() {
synchronized (numClusterNodes) {
return TajoWorker.this.clusterResource;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
index c809921..b92c4cd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
@@ -26,7 +26,10 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ha.HAServiceUtil;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.ServerStatusProto;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.TajoHeartbeatResponse;
import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.rpc.NettyClientBase;
@@ -35,7 +38,6 @@ import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.storage.DiskDeviceInfo;
import org.apache.tajo.storage.DiskMountInfo;
import org.apache.tajo.storage.DiskUtil;
-import org.apache.tajo.ha.HAServiceUtil;
import java.io.File;
import java.util.List;
@@ -98,8 +100,8 @@ public class WorkerHeartbeatService extends AbstractService {
class WorkerHeartbeatThread extends Thread {
private volatile AtomicBoolean stopped = new AtomicBoolean(false);
- TajoMasterProtocol.ServerStatusProto.System systemInfo;
- List<TajoMasterProtocol.ServerStatusProto.Disk> diskInfos = Lists.newArrayList();
+ ServerStatusProto.System systemInfo;
+ List<ServerStatusProto.Disk> diskInfos = Lists.newArrayList();
float workerDiskSlots;
int workerMemoryMB;
List<DiskDeviceInfo> diskDeviceInfos;
@@ -137,7 +139,7 @@ public class WorkerHeartbeatService extends AbstractService {
}
}
- systemInfo = TajoMasterProtocol.ServerStatusProto.System.newBuilder()
+ systemInfo = ServerStatusProto.System.newBuilder()
.setAvailableProcessors(workerCpuCoreNum)
.setFreeMemoryMB(0)
.setMaxMemoryMB(0)
@@ -153,14 +155,14 @@ public class WorkerHeartbeatService extends AbstractService {
if(sendDiskInfoCount == 0 && diskDeviceInfos != null) {
getDiskUsageInfos();
}
- TajoMasterProtocol.ServerStatusProto.JvmHeap jvmHeap =
- TajoMasterProtocol.ServerStatusProto.JvmHeap.newBuilder()
+ ServerStatusProto.JvmHeap jvmHeap =
+ ServerStatusProto.JvmHeap.newBuilder()
.setMaxHeap(Runtime.getRuntime().maxMemory())
.setFreeHeap(Runtime.getRuntime().freeMemory())
.setTotalHeap(Runtime.getRuntime().totalMemory())
.build();
- TajoMasterProtocol.ServerStatusProto serverStatus = TajoMasterProtocol.ServerStatusProto.newBuilder()
+ ServerStatusProto serverStatus = ServerStatusProto.newBuilder()
.addAllDisk(diskInfos)
.setRunningTaskNum(
context.getTaskRunnerManager() == null ? 1 : context.getTaskRunnerManager().getNumTasks())
@@ -179,8 +181,7 @@ public class WorkerHeartbeatService extends AbstractService {
NettyClientBase rmClient = null;
try {
- CallFuture<TajoMasterProtocol.TajoHeartbeatResponse> callBack =
- new CallFuture<TajoMasterProtocol.TajoHeartbeatResponse>();
+ CallFuture<TajoHeartbeatResponse> callBack = new CallFuture<TajoHeartbeatResponse>();
// In TajoMaster HA mode, if backup master be active status,
// worker may fail to connect existing active master. Thus,
@@ -201,9 +202,9 @@ public class WorkerHeartbeatService extends AbstractService {
TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService resourceTracker = rmClient.getStub();
resourceTracker.heartbeat(callBack.getController(), heartbeatProto, callBack);
- TajoMasterProtocol.TajoHeartbeatResponse response = callBack.get(2, TimeUnit.SECONDS);
+ TajoHeartbeatResponse response = callBack.get(2, TimeUnit.SECONDS);
if(response != null) {
- TajoMasterProtocol.ClusterResourceSummary clusterResourceSummary = response.getClusterResourceSummary();
+ ClusterResourceSummary clusterResourceSummary = response.getClusterResourceSummary();
if(clusterResourceSummary.getNumWorkers() > 0) {
context.setNumClusterNodes(clusterResourceSummary.getNumWorkers());
}
@@ -249,7 +250,7 @@ public class WorkerHeartbeatService extends AbstractService {
if(mountInfos != null) {
for(DiskMountInfo eachMount: mountInfos) {
File eachFile = new File(eachMount.getMountPath());
- diskInfos.add(TajoMasterProtocol.ServerStatusProto.Disk.newBuilder()
+ diskInfos.add(ServerStatusProto.Disk.newBuilder()
.setAbsolutePath(eachFile.getAbsolutePath())
.setTotalSpace(eachFile.getTotalSpace())
.setFreeSpace(eachFile.getFreeSpace())
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
index 6eb710a..68890e3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
@@ -18,23 +18,19 @@
package org.apache.tajo.worker.rule;
-import java.net.InetSocketAddress;
-
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.ha.HAServiceUtil;
-import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.RpcConnectionPool;
-import org.apache.tajo.rule.EvaluationContext;
-import org.apache.tajo.rule.EvaluationResult;
-import org.apache.tajo.rule.SelfDiagnosisRuleDefinition;
-import org.apache.tajo.rule.SelfDiagnosisRuleVisibility;
+import org.apache.tajo.rule.*;
import org.apache.tajo.rule.EvaluationResult.EvaluationResultCode;
-import org.apache.tajo.rule.SelfDiagnosisRule;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.worker.TajoWorker;
+import java.net.InetSocketAddress;
+
/**
* With this rule, Tajo worker will check the connectivity to tajo master server.
*/
@@ -54,7 +50,7 @@ public class ConnectivityCheckerRuleForTajoWorker implements SelfDiagnosisRule {
} else {
masterAddress = NetUtils.createSocketAddr(tajoConf.getVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS));
}
- masterClient = pool.getConnection(masterAddress, TajoMasterProtocol.class, true);
+ masterClient = pool.getConnection(masterAddress, QueryCoordinatorProtocol.class, true);
masterClient.getStub();
} finally {
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto b/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto
new file mode 100644
index 0000000..41a382f
--- /dev/null
+++ b/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//TajoWorker -> TajoMaster protocol
+
+option java_package = "org.apache.tajo.ipc";
+option java_outer_classname = "QueryCoordinatorProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "yarn_protos.proto";
+import "tajo_protos.proto";
+import "TajoIdProtos.proto";
+import "CatalogProtos.proto";
+import "PrimitiveProtos.proto";
+import "ContainerProtocol.proto";
+
+package hadoop.yarn;
+
+message ServerStatusProto {
+ message System {
+ required int32 availableProcessors = 1;
+ required int32 freeMemoryMB = 2;
+ required int32 maxMemoryMB = 3;
+ required int32 totalMemoryMB = 4;
+ }
+ message Disk {
+ required string absolutePath = 1;
+ required int64 totalSpace = 2;
+ required int64 freeSpace = 3;
+ required int64 usableSpace = 4;
+ }
+
+ message JvmHeap {
+ required int64 maxHeap = 1;
+ required int64 totalHeap = 2;
+ required int64 freeHeap = 3;
+ }
+
+ required System system = 1;
+ required float diskSlots = 2;
+ required int32 memoryResourceMB = 3;
+ repeated Disk disk = 4;
+ required int32 runningTaskNum = 5;
+ required JvmHeap jvmHeap = 6;
+ required BoolProto queryMasterMode = 7;
+ required BoolProto taskRunnerMode = 8;
+}
+
+message TajoHeartbeat {
+ required WorkerConnectionInfoProto connectionInfo = 1;
+ optional QueryIdProto queryId = 2;
+ optional QueryState state = 3;
+ optional TableDescProto resultDesc = 4;
+ optional string statusMessage = 5;
+ optional float queryProgress = 6;
+ optional int64 queryFinishTime = 7;
+}
+
+message TajoHeartbeatResponse {
+ message ResponseCommand {
+ required string command = 1;
+ repeated string params = 2;
+ }
+ required BoolProto heartbeatResult = 1;
+ required ClusterResourceSummary clusterResourceSummary = 2;
+ optional ResponseCommand responseCommand = 3;
+}
+
+message ClusterResourceSummary {
+ required int32 numWorkers = 1;
+ required int32 totalDiskSlots = 2;
+ required int32 totalCpuCoreSlots = 3;
+ required int32 totalMemoryMB = 4;
+
+ required int32 totalAvailableDiskSlots = 5;
+ required int32 totalAvailableCpuCoreSlots = 6;
+ required int32 totalAvailableMemoryMB = 7;
+}
+
+enum ResourceRequestPriority {
+ MEMORY = 1;
+ DISK = 2;
+}
+
+message WorkerResourceAllocationRequest {
+ required QueryIdProto queryId = 1;
+ required ResourceRequestPriority resourceRequestPriority = 2;
+
+ required int32 numContainers = 3;
+
+ required int32 maxMemoryMBPerContainer = 4;
+ required int32 minMemoryMBPerContainer = 5;
+
+ required float maxDiskSlotPerContainer = 6;
+ required float minDiskSlotPerContainer = 7;
+}
+
+message WorkerResourceProto {
+ required WorkerConnectionInfoProto connectionInfo = 1;
+ required int32 memoryMB = 2 ;
+ required float diskSlots = 3;
+}
+
+message WorkerResourcesRequest {
+ repeated WorkerResourceProto workerResources = 1;
+}
+
+message WorkerResourceReleaseRequest {
+ required ExecutionBlockIdProto executionBlockId = 1;
+ repeated TajoContainerIdProto containerIds = 2;
+}
+
+message WorkerAllocatedResource {
+ required TajoContainerIdProto containerId = 1;
+ required WorkerConnectionInfoProto connectionInfo = 2;
+
+ required int32 allocatedMemoryMB = 3;
+ required float allocatedDiskSlots = 4;
+}
+
+message WorkerResourceAllocationResponse {
+ required QueryIdProto queryId = 1;
+ repeated WorkerAllocatedResource workerAllocatedResource = 2;
+}
+
+service QueryCoordinatorProtocolService {
+ rpc heartbeat(TajoHeartbeat) returns (TajoHeartbeatResponse);
+ rpc allocateWorkerResources(WorkerResourceAllocationRequest) returns (WorkerResourceAllocationResponse);
+ rpc releaseWorkerResource(WorkerResourceReleaseRequest) returns (BoolProto);
+ rpc getAllWorkerResource(NullProto) returns (WorkerResourcesRequest);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
index b2db46a..40aeab7 100644
--- a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
+++ b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
@@ -22,7 +22,7 @@ option java_outer_classname = "TajoResourceTrackerProtocol";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
-import "TajoMasterProtocol.proto";
+import "QueryCoordinatorProtocol.proto";
import "ContainerProtocol.proto";
import "tajo_protos.proto";
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/proto/TajoMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoMasterProtocol.proto b/tajo-core/src/main/proto/TajoMasterProtocol.proto
deleted file mode 100644
index bc73596..0000000
--- a/tajo-core/src/main/proto/TajoMasterProtocol.proto
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-//TajoWorker -> TajoMaster protocol
-
-option java_package = "org.apache.tajo.ipc";
-option java_outer_classname = "TajoMasterProtocol";
-option java_generic_services = true;
-option java_generate_equals_and_hash = true;
-
-import "yarn_protos.proto";
-import "tajo_protos.proto";
-import "TajoIdProtos.proto";
-import "CatalogProtos.proto";
-import "PrimitiveProtos.proto";
-import "ContainerProtocol.proto";
-
-package hadoop.yarn;
-
-message ServerStatusProto {
- message System {
- required int32 availableProcessors = 1;
- required int32 freeMemoryMB = 2;
- required int32 maxMemoryMB = 3;
- required int32 totalMemoryMB = 4;
- }
- message Disk {
- required string absolutePath = 1;
- required int64 totalSpace = 2;
- required int64 freeSpace = 3;
- required int64 usableSpace = 4;
- }
-
- message JvmHeap {
- required int64 maxHeap = 1;
- required int64 totalHeap = 2;
- required int64 freeHeap = 3;
- }
-
- required System system = 1;
- required float diskSlots = 2;
- required int32 memoryResourceMB = 3;
- repeated Disk disk = 4;
- required int32 runningTaskNum = 5;
- required JvmHeap jvmHeap = 6;
- required BoolProto queryMasterMode = 7;
- required BoolProto taskRunnerMode = 8;
-}
-
-message TajoHeartbeat {
- required WorkerConnectionInfoProto connectionInfo = 1;
- optional QueryIdProto queryId = 2;
- optional QueryState state = 3;
- optional TableDescProto resultDesc = 4;
- optional string statusMessage = 5;
- optional float queryProgress = 6;
- optional int64 queryFinishTime = 7;
-}
-
-message TajoHeartbeatResponse {
- message ResponseCommand {
- required string command = 1;
- repeated string params = 2;
- }
- required BoolProto heartbeatResult = 1;
- required ClusterResourceSummary clusterResourceSummary = 2;
- optional ResponseCommand responseCommand = 3;
-}
-
-message ClusterResourceSummary {
- required int32 numWorkers = 1;
- required int32 totalDiskSlots = 2;
- required int32 totalCpuCoreSlots = 3;
- required int32 totalMemoryMB = 4;
-
- required int32 totalAvailableDiskSlots = 5;
- required int32 totalAvailableCpuCoreSlots = 6;
- required int32 totalAvailableMemoryMB = 7;
-}
-
-enum ResourceRequestPriority {
- MEMORY = 1;
- DISK = 2;
-}
-
-message WorkerResourceAllocationRequest {
- required QueryIdProto queryId = 1;
- required ResourceRequestPriority resourceRequestPriority = 2;
-
- required int32 numContainers = 3;
-
- required int32 maxMemoryMBPerContainer = 4;
- required int32 minMemoryMBPerContainer = 5;
-
- required float maxDiskSlotPerContainer = 6;
- required float minDiskSlotPerContainer = 7;
-}
-
-message WorkerResourceProto {
- required WorkerConnectionInfoProto connectionInfo = 1;
- required int32 memoryMB = 2 ;
- required float diskSlots = 3;
-}
-
-message WorkerResourcesRequest {
- repeated WorkerResourceProto workerResources = 1;
-}
-
-message WorkerResourceReleaseRequest {
- required ExecutionBlockIdProto executionBlockId = 1;
- repeated TajoContainerIdProto containerIds = 2;
-}
-
-message WorkerAllocatedResource {
- required TajoContainerIdProto containerId = 1;
- required WorkerConnectionInfoProto connectionInfo = 2;
-
- required int32 allocatedMemoryMB = 3;
- required float allocatedDiskSlots = 4;
-}
-
-message WorkerResourceAllocationResponse {
- required QueryIdProto queryId = 1;
- repeated WorkerAllocatedResource workerAllocatedResource = 2;
-}
-
-service TajoMasterProtocolService {
- rpc heartbeat(TajoHeartbeat) returns (TajoHeartbeatResponse);
- rpc allocateWorkerResources(WorkerResourceAllocationRequest) returns (WorkerResourceAllocationResponse);
- rpc releaseWorkerResource(WorkerResourceReleaseRequest) returns (BoolProto);
- rpc getAllWorkerResource(NullProto) returns (WorkerResourcesRequest);
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/resources/webapps/admin/index.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/index.jsp b/tajo-core/src/main/resources/webapps/admin/index.jsp
index 00186d7..0defb3c 100644
--- a/tajo-core/src/main/resources/webapps/admin/index.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/index.jsp
@@ -25,7 +25,7 @@
<%@ page import="org.apache.tajo.master.TajoMaster" %>
<%@ page import="org.apache.tajo.ha.HAService" %>
<%@ page import="org.apache.tajo.ha.TajoMasterInfo" %>
-<%@ page import="org.apache.tajo.querymaster.QueryInProgress" %>
+<%@ page import="org.apache.tajo.master.QueryInProgress" %>
<%@ page import="org.apache.tajo.master.rm.Worker" %>
<%@ page import="org.apache.tajo.master.rm.WorkerState" %>
<%@ page import="org.apache.tajo.util.NetUtils" %>
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/resources/webapps/admin/query.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/query.jsp b/tajo-core/src/main/resources/webapps/admin/query.jsp
index 4d8e5e6..85f7176 100644
--- a/tajo-core/src/main/resources/webapps/admin/query.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/query.jsp
@@ -20,7 +20,7 @@
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
<%@ page import="org.apache.tajo.master.TajoMaster" %>
-<%@ page import="org.apache.tajo.querymaster.QueryInProgress" %>
+<%@ page import="org.apache.tajo.master.QueryInProgress" %>
<%@ page import="org.apache.tajo.master.rm.Worker" %>
<%@ page import="org.apache.tajo.util.JSPUtil" %>
<%@ page import="org.apache.tajo.util.StringUtils" %>
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 0786912..e548b81 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -43,8 +43,8 @@ import org.apache.tajo.client.TajoClientUtil;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanTestRuleProvider;
+import org.apache.tajo.master.QueryInProgress;
import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.querymaster.*;
import org.apache.tajo.querymaster.Query;
import org.apache.tajo.querymaster.Stage;
import org.apache.tajo.querymaster.StageState;
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
index b8fbd67..a013d0b 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
@@ -19,12 +19,11 @@
package org.apache.tajo.master.rm;
import com.google.protobuf.RpcCallback;
-import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.tajo.QueryId;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.ipc.TajoMasterProtocol.*;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
[5/8] tajo git commit: TAJO-1251: Query is hanging occasionally by shuffle report. (jinho)
Posted by ji...@apache.org.
TAJO-1251: Query is hanging occasionally by shuffle report. (jinho)
Closes #339
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/50a8a663
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/50a8a663
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/50a8a663
Branch: refs/heads/index_support
Commit: 50a8a663c2c95f14ca59f3b01ffd79b2578f7f09
Parents: 533601e
Author: jhkim <jh...@apache.org>
Authored: Fri Jan 9 20:07:36 2015 +0900
Committer: jhkim <jh...@apache.org>
Committed: Fri Jan 9 20:07:36 2015 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../java/org/apache/tajo/conf/TajoConf.java | 3 +-
.../tajo/master/event/StageEventType.java | 3 +-
.../master/event/StageShuffleReportEvent.java | 38 ++++
.../java/org/apache/tajo/querymaster/Query.java | 2 +-
.../querymaster/QueryMasterManagerService.java | 2 +-
.../java/org/apache/tajo/querymaster/Stage.java | 206 +++++++++++++------
.../org/apache/tajo/querymaster/StageState.java | 1 +
tajo-dist/pom.xml | 28 +++
9 files changed, 220 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/50a8a663/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 0df49b1..4e38f78 100644
--- a/CHANGES
+++ b/CHANGES
@@ -156,6 +156,8 @@ Release 0.9.1 - unreleased
BUG FIXES
+ TAJO-1251: Query is hanging occasionally by shuffle report. (jinho)
+
TAJO-1287: Repeated using of the same order by key in multiple
window clauses should be supported. (Keuntae Park)
http://git-wip-us.apache.org/repos/asf/tajo/blob/50a8a663/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index ab11ddd..74a9271 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -188,7 +188,8 @@ public class TajoConf extends Configuration {
/** how many launching TaskRunners in parallel */
YARN_RM_QUERY_MASTER_MEMORY_MB("tajo.querymaster.memory-mb", 512, Validators.min("64")),
YARN_RM_QUERY_MASTER_DISKS("tajo.yarn-rm.querymaster.disks", 1),
- YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM("tajo.yarn-rm.parallel-task-runner-launcher-num", 16),
+ YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM("tajo.yarn-rm.parallel-task-runner-launcher-num",
+ Runtime.getRuntime().availableProcessors() * 2),
YARN_RM_WORKER_NUMBER_PER_NODE("tajo.yarn-rm.max-worker-num-per-node", 8),
// Query Configuration
http://git-wip-us.apache.org/repos/asf/tajo/blob/50a8a663/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java
index fa808d4..763d426 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java
@@ -34,7 +34,8 @@ public enum StageEventType {
SQ_TASK_COMPLETED,
SQ_FAILED,
- // Producer: Completed
+ // Producer: Stage
+ SQ_SHUFFLE_REPORT,
SQ_STAGE_COMPLETED,
// Producer: Any component
http://git-wip-us.apache.org/repos/asf/tajo/blob/50a8a663/tajo-core/src/main/java/org/apache/tajo/master/event/StageShuffleReportEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/StageShuffleReportEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageShuffleReportEvent.java
new file mode 100644
index 0000000..924fb59
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageShuffleReportEvent.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+
+/**
+ * Event Class: From {@link org.apache.tajo.querymaster.QueryMasterManagerService} to Stage
+ */
+public class StageShuffleReportEvent extends StageEvent {
+ private TajoWorkerProtocol.ExecutionBlockReport report;
+
+ public StageShuffleReportEvent(ExecutionBlockId executionBlockId, TajoWorkerProtocol.ExecutionBlockReport report) {
+ super(executionBlockId, StageEventType.SQ_SHUFFLE_REPORT);
+ this.report = report;
+ }
+
+ public TajoWorkerProtocol.ExecutionBlockReport getReport() {
+ return report;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/50a8a663/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
index 2932694..060e620 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
@@ -660,7 +660,6 @@ public class Query implements EventHandler<QueryEvent> {
if (castEvent.getState() == StageState.SUCCEEDED && // latest stage succeeded
query.getSynchronizedState() == QueryState.QUERY_RUNNING && // current state is not in KILL_WAIT, FAILED, or ERROR.
hasNext(query)) { // there remains at least one stage.
- query.getStage(castEvent.getExecutionBlockId()).waitingIntermediateReport();
executeNextBlock(query);
} else { // if a query is completed due to finished, kill, failure, or error
query.eventHandler.handle(new QueryCompletedEvent(castEvent.getExecutionBlockId(), castEvent.getState()));
@@ -692,6 +691,7 @@ public class Query implements EventHandler<QueryEvent> {
public void transition(Query query, QueryEvent event) {
synchronized (query.stages) {
for (Stage stage : query.stages.values()) {
+ stage.stopFinalization();
query.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_KILL));
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/50a8a663/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
index 4a91326..85cc553 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
@@ -220,7 +220,7 @@ public class QueryMasterManagerService extends CompositeService
QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(new QueryId(request.getEbId().getQueryId()));
if (queryMasterTask != null) {
ExecutionBlockId ebId = new ExecutionBlockId(request.getEbId());
- queryMasterTask.getQuery().getStage(ebId).receiveExecutionBlockReport(request);
+ queryMasterTask.getEventHandler().handle(new StageShuffleReportEvent(ebId, request));
}
done.run(TajoWorker.TRUE_PROTO);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/50a8a663/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index 34c58d4..1ea7051 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -50,27 +50,30 @@ import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto;
-import org.apache.tajo.master.*;
+import org.apache.tajo.master.LaunchTaskRunnersEvent;
+import org.apache.tajo.master.TaskRunnerGroupEvent;
import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
+import org.apache.tajo.master.TaskState;
+import org.apache.tajo.master.container.TajoContainer;
+import org.apache.tajo.master.container.TajoContainerId;
import org.apache.tajo.master.event.*;
import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext;
+import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.querymaster.Task.IntermediateEntry;
-import org.apache.tajo.master.container.TajoContainer;
-import org.apache.tajo.master.container.TajoContainerId;
import org.apache.tajo.storage.FileStorageManager;
-import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.plan.logical.*;
import org.apache.tajo.storage.StorageManager;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.KeyValueSet;
-import org.apache.tajo.util.history.TaskHistory;
import org.apache.tajo.util.history.StageHistory;
+import org.apache.tajo.util.history.TaskHistory;
import org.apache.tajo.worker.FetchImpl;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@@ -102,6 +105,8 @@ public class Stage implements EventHandler<StageEvent> {
private long startTime;
private long finishTime;
+ private volatile long lastContactTime;
+ private Thread timeoutChecker;
volatile Map<TaskId, Task> tasks = new ConcurrentHashMap<TaskId, Task>();
volatile Map<TajoContainerId, TajoContainer> containers = new ConcurrentHashMap<TajoContainerId,
@@ -114,12 +119,13 @@ public class Stage implements EventHandler<StageEvent> {
private static final AllocatedContainersCancelTransition CONTAINERS_CANCEL_TRANSITION =
new AllocatedContainersCancelTransition();
private static final StageCompleteTransition STAGE_COMPLETED_TRANSITION = new StageCompleteTransition();
+ private static final StageFinalizeTransition STAGE_FINALIZE_TRANSITION = new StageFinalizeTransition();
private StateMachine<StageState, StageEventType, StageEvent> stateMachine;
protected static final StateMachineFactory<Stage, StageState,
StageEventType, StageEvent> stateMachineFactory =
- new StateMachineFactory <Stage, StageState,
- StageEventType, StageEvent> (StageState.NEW)
+ new StateMachineFactory<Stage, StageState,
+ StageEventType, StageEvent>(StageState.NEW)
// Transitions from NEW state
.addTransition(StageState.NEW,
@@ -155,6 +161,9 @@ public class Stage implements EventHandler<StageEvent> {
.addTransition(StageState.RUNNING, StageState.RUNNING,
StageEventType.SQ_TASK_COMPLETED,
TASK_COMPLETED_TRANSITION)
+ .addTransition(StageState.RUNNING, StageState.FINALIZING,
+ StageEventType.SQ_SHUFFLE_REPORT,
+ STAGE_FINALIZE_TRANSITION)
.addTransition(StageState.RUNNING,
EnumSet.of(StageState.SUCCEEDED, StageState.FAILED),
StageEventType.SQ_STAGE_COMPLETED,
@@ -198,6 +207,24 @@ public class Stage implements EventHandler<StageEvent> {
StageEventType.SQ_INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
+ // Transitions from FINALIZING state
+ .addTransition(StageState.FINALIZING, StageState.FINALIZING,
+ StageEventType.SQ_SHUFFLE_REPORT,
+ STAGE_FINALIZE_TRANSITION)
+ .addTransition(StageState.FINALIZING,
+ EnumSet.of(StageState.SUCCEEDED, StageState.FAILED),
+ StageEventType.SQ_STAGE_COMPLETED,
+ STAGE_COMPLETED_TRANSITION)
+ .addTransition(StageState.FINALIZING, StageState.FINALIZING,
+ StageEventType.SQ_DIAGNOSTIC_UPDATE,
+ DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(StageState.FINALIZING, StageState.ERROR,
+ StageEventType.SQ_INTERNAL_ERROR,
+ INTERNAL_ERROR_TRANSITION)
+ // Ignore-able Transition
+ .addTransition(StageState.FINALIZING, StageState.KILLED,
+ StageEventType.SQ_KILL)
+
// Transitions from SUCCEEDED state
.addTransition(StageState.SUCCEEDED, StageState.SUCCEEDED,
StageEventType.SQ_CONTAINER_ALLOCATED,
@@ -273,14 +300,14 @@ public class Stage implements EventHandler<StageEvent> {
private final Lock writeLock;
private int totalScheduledObjectsCount;
- private int succeededObjectCount = 0;
private int completedTaskCount = 0;
- private int succeededTaskCount = 0;
+ private int succeededObjectCount = 0;
private int killedObjectCount = 0;
private int failedObjectCount = 0;
private TaskSchedulerContext schedulerContext;
- private List<IntermediateEntry> hashShuffleIntermediateEntries = new ArrayList<IntermediateEntry>();
- private AtomicInteger completeReportReceived = new AtomicInteger(0);
+ private List<IntermediateEntry> hashShuffleIntermediateEntries = Lists.newArrayList();
+ private AtomicInteger completedShuffleTasks = new AtomicInteger(0);
+ private AtomicBoolean stopShuffleReceiver = new AtomicBoolean();
private StageHistory finalStageHistory;
public Stage(QueryMasterTask.QueryMasterTaskContext context, MasterPlan masterPlan, ExecutionBlock block) {
@@ -465,10 +492,16 @@ public class Stage implements EventHandler<StageEvent> {
}
/**
- * It finalizes this stage. It is only invoked when the stage is succeeded.
+ * It finalizes this stage. It is only invoked when the stage is finalizing.
*/
- public void complete() {
+ public void finalizeStage() {
cleanup();
+ }
+
+ /**
+ * It completes this stage. It is only invoked when the stage has succeeded.
+ */
+ public void complete() {
finalizeStats();
setFinishTime();
eventHandler.handle(new StageCompletedEvent(getId(), StageState.SUCCEEDED));
@@ -652,7 +685,7 @@ public class Stage implements EventHandler<StageEvent> {
}
private void releaseContainers() {
- // If there are still live TaskRunners, try to kill the containers.
+ // If there are still live TaskRunners, try to kill the containers and send the shuffle report request.
eventHandler.handle(new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP, getId(), containers.values()));
}
@@ -684,6 +717,7 @@ public class Stage implements EventHandler<StageEvent> {
@Override
public void handle(StageEvent event) {
+ lastContactTime = System.currentTimeMillis();
if (LOG.isDebugEnabled()) {
LOG.debug("Processing " + event.getStageId() + " of type " + event.getType() + ", preState="
+ getSynchronizedState());
@@ -751,6 +785,7 @@ public class Stage implements EventHandler<StageEvent> {
LOG.info(stage.totalScheduledObjectsCount + " objects are scheduled");
if (stage.getTaskScheduler().remainingScheduledObjectNum() == 0) { // if there is no tasks
+ stage.finalizeStage();
stage.complete();
} else {
if(stage.getSynchronizedState() == StageState.INITED) {
@@ -1192,16 +1227,19 @@ public class Stage implements EventHandler<StageEvent> {
stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_KILL));
}
- LOG.info(String.format("[%s] Task Completion Event (Total: %d, Success: %d, Killed: %d, Failed: %d)",
- stage.getId(),
- stage.getTotalScheduledObjectsCount(),
- stage.succeededObjectCount,
- stage.killedObjectCount,
- stage.failedObjectCount));
-
- if (stage.totalScheduledObjectsCount ==
- stage.succeededObjectCount + stage.killedObjectCount + stage.failedObjectCount) {
- stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED));
+ if (stage.totalScheduledObjectsCount == stage.completedTaskCount) {
+ if (stage.succeededObjectCount == stage.completedTaskCount) {
+ stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_SHUFFLE_REPORT));
+ } else {
+ stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED));
+ }
+ } else {
+ LOG.info(String.format("[%s] Task Completion Event (Total: %d, Success: %d, Killed: %d, Failed: %d)",
+ stage.getId(),
+ stage.totalScheduledObjectsCount,
+ stage.succeededObjectCount,
+ stage.killedObjectCount,
+ stage.failedObjectCount));
}
}
}
@@ -1244,48 +1282,94 @@ public class Stage implements EventHandler<StageEvent> {
return hashShuffleIntermediateEntries;
}
- protected void waitingIntermediateReport() {
- LOG.info(getId() + ", waiting IntermediateReport: expectedTaskNum=" + completeReportReceived.get());
- synchronized(completeReportReceived) {
- long startTime = System.currentTimeMillis();
- while (true) {
- if (completeReportReceived.get() >= tasks.size()) {
- LOG.info(getId() + ", completed waiting IntermediateReport");
- return;
- } else {
- try {
- completeReportReceived.wait(10 * 1000);
- } catch (InterruptedException e) {
+ protected void stopFinalization() {
+ stopShuffleReceiver.set(true);
+ }
+
+ private static class StageFinalizeTransition implements SingleArcTransition<Stage, StageEvent> {
+
+ @Override
+ public void transition(final Stage stage, StageEvent event) {
+ // If a shuffle report has failed, the remaining reports are ignored.
+ if (stage.stopShuffleReceiver.get()) {
+ return;
+ }
+
+ stage.lastContactTime = System.currentTimeMillis();
+ try {
+ if (event instanceof StageShuffleReportEvent) {
+
+ StageShuffleReportEvent finalizeEvent = (StageShuffleReportEvent) event;
+ TajoWorkerProtocol.ExecutionBlockReport report = finalizeEvent.getReport();
+
+ if (!report.getReportSuccess()) {
+ stage.stopFinalization();
+ LOG.error(stage.getId() + ", Shuffle report are failed. Caused by:" + report.getReportErrorMessage());
+ stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_FAILED));
}
- long elapsedTime = System.currentTimeMillis() - startTime;
- if (elapsedTime >= 120 * 1000) {
- LOG.error(getId() + ", Timeout while receiving intermediate reports: " + elapsedTime + " ms");
- abort(StageState.FAILED);
- return;
+
+ stage.completedShuffleTasks.addAndGet(finalizeEvent.getReport().getSucceededTasks());
+ if (report.getIntermediateEntriesCount() > 0) {
+ for (IntermediateEntryProto eachInterm : report.getIntermediateEntriesList()) {
+ stage.hashShuffleIntermediateEntries.add(new IntermediateEntry(eachInterm));
+ }
}
- }
- }
- }
- }
- public void receiveExecutionBlockReport(TajoWorkerProtocol.ExecutionBlockReport report) {
- LOG.info(getId() + ", receiveExecutionBlockReport:" + report.getSucceededTasks());
- if (!report.getReportSuccess()) {
- LOG.error(getId() + ", ExecutionBlock final report fail cause:" + report.getReportErrorMessage());
- abort(StageState.FAILED);
- return;
- }
- if (report.getIntermediateEntriesCount() > 0) {
- synchronized (hashShuffleIntermediateEntries) {
- for (IntermediateEntryProto eachInterm: report.getIntermediateEntriesList()) {
- hashShuffleIntermediateEntries.add(new IntermediateEntry(eachInterm));
+ if (stage.completedShuffleTasks.get() >= stage.succeededObjectCount) {
+ LOG.info(stage.getId() + ", Finalized shuffle reports: " + stage.completedShuffleTasks.get());
+ stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED));
+ if (stage.timeoutChecker != null) {
+ stage.stopFinalization();
+ synchronized (stage.timeoutChecker){
+ stage.timeoutChecker.notifyAll();
+ }
+ }
+ } else {
+ LOG.info(stage.getId() + ", Received shuffle report: " +
+ stage.completedShuffleTasks.get() + "/" + stage.succeededObjectCount);
+ }
+
+ } else {
+ LOG.info(String.format("Stage finalize - %s (total=%d, success=%d, killed=%d)",
+ stage.getId().toString(),
+ stage.totalScheduledObjectsCount,
+ stage.succeededObjectCount,
+ stage.killedObjectCount));
+ stage.finalizeStage();
+ LOG.info(stage.getId() + ", waiting for shuffle reports. expected Tasks:" + stage.succeededObjectCount);
+
+ /* FIXME implement timeout handler of stage and task */
+ if (stage.timeoutChecker != null) {
+ stage.timeoutChecker = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ while (stage.getSynchronizedState() == StageState.FINALIZING && !Thread.interrupted()) {
+ long elapsedTime = System.currentTimeMillis() - stage.lastContactTime;
+ if (elapsedTime > 120 * 1000) {
+ stage.stopFinalization();
+ LOG.error(stage.getId() + ": Timed out while receiving intermediate reports: " + elapsedTime
+ + " ms, report:" + stage.completedShuffleTasks.get() + "/" + stage.succeededObjectCount);
+ stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_FAILED));
+ }
+ synchronized (this) {
+ try {
+ this.wait(1 * 1000);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ }
+ });
+ stage.timeoutChecker.start();
+ }
}
+ } catch (Throwable t) {
+ LOG.error(t.getMessage(), t);
+ stage.stopFinalization();
+ stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(), t.getMessage()));
+ stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_INTERNAL_ERROR));
}
}
- synchronized(completeReportReceived) {
- completeReportReceived.addAndGet(report.getSucceededTasks());
- completeReportReceived.notifyAll();
- }
}
private static class StageCompleteTransition implements MultipleArcTransition<Stage, StageEvent, StageState> {
http://git-wip-us.apache.org/repos/asf/tajo/blob/50a8a663/tajo-core/src/main/java/org/apache/tajo/querymaster/StageState.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/StageState.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/StageState.java
index 2fd62be..2d68332 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/StageState.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/StageState.java
@@ -22,6 +22,7 @@ public enum StageState {
NEW,
INITED,
RUNNING,
+ FINALIZING,
SUCCEEDED,
FAILED,
KILL_WAIT,
http://git-wip-us.apache.org/repos/asf/tajo/blob/50a8a663/tajo-dist/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-dist/pom.xml b/tajo-dist/pom.xml
index d469ba9..3df2681 100644
--- a/tajo-dist/pom.xml
+++ b/tajo-dist/pom.xml
@@ -207,6 +207,34 @@
<activation>
<activeByDefault>false</activeByDefault>
</activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-el</groupId>
+ <artifactId>commons-el</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-2.1-jetty</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
<build>
<plugins>
<plugin>
[8/8] tajo git commit: Merge branch 'master' of
https://github.com/apache/tajo into index_support
Posted by ji...@apache.org.
Merge branch 'master' of https://github.com/apache/tajo into index_support
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/767b9a4b
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/767b9a4b
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/767b9a4b
Branch: refs/heads/index_support
Commit: 767b9a4b7894f0545fca18569865299130ce190a
Parents: e04c65f 807868b
Author: SonJihoon <ji...@SonJihoonui-MacBook-Pro.local>
Authored: Fri Jan 9 23:29:49 2015 +0900
Committer: SonJihoon <ji...@SonJihoonui-MacBook-Pro.local>
Committed: Fri Jan 9 23:29:49 2015 +0900
----------------------------------------------------------------------
CHANGES | 16 +
.../java/org/apache/tajo/conf/TajoConf.java | 3 +-
tajo-core/pom.xml | 2 +-
.../engine/planner/physical/WindowAggExec.java | 11 +-
.../tajo/master/QueryCoordinatorService.java | 160 ++++++++++
.../org/apache/tajo/master/QueryInProgress.java | 228 ++++++++++++++
.../org/apache/tajo/master/QueryJobManager.java | 311 ------------------
.../org/apache/tajo/master/QueryManager.java | 315 +++++++++++++++++++
.../apache/tajo/master/TajoContainerProxy.java | 12 +-
.../java/org/apache/tajo/master/TajoMaster.java | 16 +-
.../tajo/master/TajoMasterClientService.java | 9 +-
.../apache/tajo/master/TajoMasterService.java | 168 ----------
.../tajo/master/event/StageEventType.java | 3 +-
.../master/event/StageShuffleReportEvent.java | 38 +++
.../apache/tajo/master/exec/QueryExecutor.java | 4 +-
.../tajo/master/rm/TajoResourceTracker.java | 12 +-
.../master/rm/TajoWorkerResourceManager.java | 16 +-
.../tajo/master/rm/WorkerResourceManager.java | 19 +-
.../apache/tajo/master/scheduler/Scheduler.java | 2 +-
.../master/scheduler/SimpleFifoScheduler.java | 11 +-
.../java/org/apache/tajo/querymaster/Query.java | 2 +-
.../tajo/querymaster/QueryInProgress.java | 301 ------------------
.../apache/tajo/querymaster/QueryJobEvent.java | 5 +-
.../apache/tajo/querymaster/QueryMaster.java | 97 ++----
.../querymaster/QueryMasterManagerService.java | 2 +-
.../tajo/querymaster/QueryMasterTask.java | 47 +--
.../java/org/apache/tajo/querymaster/Stage.java | 223 ++++++++-----
.../org/apache/tajo/querymaster/StageState.java | 1 +
.../main/java/org/apache/tajo/util/JSPUtil.java | 2 +-
.../tajo/worker/TajoResourceAllocator.java | 37 ++-
.../java/org/apache/tajo/worker/TajoWorker.java | 20 +-
.../tajo/worker/WorkerHeartbeatService.java | 27 +-
.../ConnectivityCheckerRuleForTajoWorker.java | 14 +-
.../main/proto/QueryCoordinatorProtocol.proto | 147 +++++++++
.../main/proto/ResourceTrackerProtocol.proto | 2 +-
.../src/main/proto/TajoMasterProtocol.proto | 148 ---------
.../src/main/resources/webapps/admin/index.jsp | 2 +-
.../src/main/resources/webapps/admin/query.jsp | 2 +-
.../org/apache/tajo/TajoTestingCluster.java | 2 +-
.../tajo/engine/query/TestWindowQuery.java | 29 ++
.../tajo/master/rm/TestTajoResourceManager.java | 3 +-
.../apache/tajo/querymaster/TestKillQuery.java | 2 +-
tajo-dist/pom.xml | 28 ++
tajo-jdbc/pom.xml | 6 +-
.../org/apache/tajo/jdbc/JdbcConnection.java | 3 +-
.../tajo/jdbc/util/QueryStringDecoder.java | 139 ++++++++
.../tajo/jdbc/util/TestQueryStringDecoder.java | 94 ++++++
.../java/org/apache/tajo/storage/CSVFile.java | 4 +-
.../java/org/apache/tajo/storage/RowFile.java | 3 +-
.../apache/tajo/storage/avro/AvroAppender.java | 3 +-
.../tajo/storage/parquet/ParquetAppender.java | 3 +-
.../sequencefile/SequenceFileAppender.java | 4 +-
.../tajo/storage/text/DelimitedTextFile.java | 4 +-
53 files changed, 1508 insertions(+), 1254 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/767b9a4b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/767b9a4b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/767b9a4b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/767b9a4b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/767b9a4b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
index 742665a,fd52488..7a5e7b4
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
@@@ -58,11 -49,13 +55,10 @@@ import org.apache.tajo.plan.LogicalOpti
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.LogicalPlanner;
import org.apache.tajo.plan.logical.LogicalNode;
-import org.apache.tajo.plan.logical.LogicalRootNode;
import org.apache.tajo.plan.logical.NodeType;
import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
-import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.plan.verifier.VerifyException;
- import org.apache.tajo.rpc.NettyClientBase;
- import org.apache.tajo.rpc.RpcConnectionPool;
+ import org.apache.tajo.session.Session;
import org.apache.tajo.storage.StorageManager;
import org.apache.tajo.storage.StorageProperty;
import org.apache.tajo.storage.StorageUtil;
http://git-wip-us.apache.org/repos/asf/tajo/blob/767b9a4b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------
[4/8] tajo git commit: TAJO-1286: Remove netty dependency from
tajo-jdbc
Posted by ji...@apache.org.
TAJO-1286: Remove netty dependency from tajo-jdbc
Closes #341
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/533601ea
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/533601ea
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/533601ea
Branch: refs/heads/index_support
Commit: 533601eac6a21251485576ba693635d1650b63a4
Parents: 03801a3
Author: Jihun Kang <ji...@apache.org>
Authored: Fri Jan 9 15:51:25 2015 +0900
Committer: Jihun Kang <ji...@apache.org>
Committed: Fri Jan 9 15:51:25 2015 +0900
----------------------------------------------------------------------
CHANGES | 2 +
tajo-jdbc/pom.xml | 6 +-
.../org/apache/tajo/jdbc/JdbcConnection.java | 3 +-
.../tajo/jdbc/util/QueryStringDecoder.java | 139 +++++++++++++++++++
.../tajo/jdbc/util/TestQueryStringDecoder.java | 94 +++++++++++++
5 files changed, 242 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/533601ea/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 66bb299..0df49b1 100644
--- a/CHANGES
+++ b/CHANGES
@@ -27,6 +27,8 @@ Release 0.9.1 - unreleased
IMPROVEMENT
+ TAJO-1286: Remove netty dependency from tajo-jdbc. (jihun)
+
TAJO-1282: Cleanup the relationship of QueryInProgress and
QueryJobManager. (hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/533601ea/tajo-jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-jdbc/pom.xml b/tajo-jdbc/pom.xml
index 15439c6..9847416 100644
--- a/tajo-jdbc/pom.xml
+++ b/tajo-jdbc/pom.xml
@@ -116,7 +116,11 @@
<groupId>org.apache.tajo</groupId>
<artifactId>tajo-client</artifactId>
</dependency>
-
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
http://git-wip-us.apache.org/repos/asf/tajo/blob/533601ea/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java
index a76443d..f00dc25 100644
--- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java
+++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java
@@ -19,6 +19,7 @@
package org.apache.tajo.jdbc;
import com.google.protobuf.ServiceException;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.TajoConstants;
@@ -27,7 +28,7 @@ import org.apache.tajo.client.QueryClient;
import org.apache.tajo.client.TajoClient;
import org.apache.tajo.client.TajoClientImpl;
import org.apache.tajo.conf.TajoConf;
-import org.jboss.netty.handler.codec.http.QueryStringDecoder;
+import org.apache.tajo.jdbc.util.QueryStringDecoder;
import java.io.IOException;
import java.net.URI;
http://git-wip-us.apache.org/repos/asf/tajo/blob/533601ea/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/util/QueryStringDecoder.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/util/QueryStringDecoder.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/util/QueryStringDecoder.java
new file mode 100644
index 0000000..9ec9340
--- /dev/null
+++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/util/QueryStringDecoder.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.jdbc.util;
+
+import java.io.UnsupportedEncodingException;
+import java.net.MalformedURLException;
+import java.net.URLDecoder;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class QueryStringDecoder {
+
+ private final Charset charset;
+ private final String rawUri;
+ private String queries;
+ private Map<String, List<String>> params;
+
+ public QueryStringDecoder(String rawUri) {
+ this(rawUri, Charset.defaultCharset());
+ }
+
+ public QueryStringDecoder(String rawUri, Charset charset) {
+ this.rawUri = rawUri;
+ this.charset = charset;
+ }
+
+ private void splitUri() {
+ if (rawUri != null && !rawUri.isEmpty()) {
+ int pathPos = rawUri.indexOf('?');
+ if (pathPos < 0) {
+ queries = "";
+ } else {
+ if ((pathPos + 1) > rawUri.length()) {
+ queries = "";
+ } else {
+ queries = rawUri.substring(pathPos + 1);
+ }
+ }
+ }
+ }
+
+ protected void decodeParams() throws MalformedURLException, UnsupportedEncodingException {
+ params = new HashMap<String, List<String>>();
+ String queries = getQueries();
+
+ if (queries != null && !queries.isEmpty()) {
+ char c = 0;
+ int startPos = 0;
+ String name = null, value = null;
+
+ for (int index = 0; index < queries.length(); index++) {
+ c = queries.charAt(index);
+ if (c == '=') {
+ name = queries.substring(startPos, index);
+ if (name.isEmpty()) {
+ throw new MalformedURLException(rawUri + " is not a valid URL.");
+ }
+ name = decodeString(name);
+ startPos = index+1;
+ } else if (c == '&') {
+ if (name == null || name.isEmpty()) {
+ throw new MalformedURLException(rawUri + " is not a valid URL.");
+ }
+ value = queries.substring(startPos, index);
+ if (value.isEmpty()) {
+ throw new MalformedURLException(rawUri + " is not a valid URL.");
+ }
+ value = decodeString(value);
+ addParameter(name, value);
+ startPos = index+1;
+ }
+ }
+
+ if (startPos > 0 && name != null && !name.isEmpty()) {
+ value = queries.substring(startPos);
+ value = decodeString(value);
+ addParameter(name, value);
+ }
+ }
+ }
+
+ protected String decodeString(String string) throws UnsupportedEncodingException {
+ String decoded = "";
+
+ if (string != null && !string.isEmpty()) {
+ decoded = URLDecoder.decode(string, charset.name());
+ }
+
+ return decoded;
+ }
+
+ protected void addParameter(String name, String value) {
+ List<String> valueList = params.get(name);
+
+ if (valueList == null) {
+ valueList = new ArrayList<String>();
+ params.put(name, valueList);
+ }
+
+ valueList.add(value);
+ }
+
+ public String getRawUri() {
+ return rawUri;
+ }
+
+ public String getQueries() {
+ if (queries == null || queries.isEmpty()) {
+ splitUri();
+ }
+ return queries;
+ }
+
+ public Map<String, List<String>> getParameters() throws MalformedURLException, UnsupportedEncodingException {
+ if (params == null || params.size() <= 0) {
+ decodeParams();
+ }
+ return params;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/533601ea/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/util/TestQueryStringDecoder.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/util/TestQueryStringDecoder.java b/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/util/TestQueryStringDecoder.java
new file mode 100644
index 0000000..31a09d5
--- /dev/null
+++ b/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/util/TestQueryStringDecoder.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.jdbc.util;
+
+import java.net.MalformedURLException;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.hamcrest.CoreMatchers.*;
+
+public class TestQueryStringDecoder {
+
+ @Test
+ public void testEmptyQuery() throws Exception {
+ QueryStringDecoder decoder = null;
+ String rawUriStr = "";
+
+ rawUriStr = "http://127.0.0.1:26002/";
+ decoder = new QueryStringDecoder(rawUriStr);
+ assertThat(decoder.getQueries(), is(notNullValue()));
+ assertThat(decoder.getParameters(), is(notNullValue()));
+ assertThat(decoder.getParameters().size(), is(0));
+
+ rawUriStr = "/test_path/test2?";
+ decoder = new QueryStringDecoder(rawUriStr);
+ assertThat(decoder.getQueries(), is(notNullValue()));
+ assertThat(decoder.getParameters(), is(notNullValue()));
+ assertThat(decoder.getParameters().size(), is(0));
+ }
+
+ @Test
+ public void testSingleQueries() throws Exception {
+ QueryStringDecoder decoder = null;
+ String rawUriStr = "";
+
+ rawUriStr = "http://127.0.0.1:26200/?qid=1234&tid=2345&pid=4567";
+ decoder = new QueryStringDecoder(rawUriStr);
+ assertThat(decoder.getQueries(), is("qid=1234&tid=2345&pid=4567"));
+ assertThat(decoder.getParameters(), is(notNullValue()));
+ assertThat(decoder.getParameters().size(), is(3));
+ assertThat(decoder.getParameters().get("qid").get(0), is("1234"));
+ assertThat(decoder.getParameters().get("pid").get(0), is("4567"));
+
+ rawUriStr = "http://127.0.0.1:26200/?tid=2345";
+ decoder = new QueryStringDecoder(rawUriStr);
+ assertThat(decoder.getQueries(), is("tid=2345"));
+ assertThat(decoder.getParameters(), is(notNullValue()));
+ assertThat(decoder.getParameters().size(), is(1));
+ assertThat(decoder.getParameters().get("tid").get(0), is("2345"));
+ }
+
+ @Test
+ public void testMultipleQueries() throws Exception {
+ QueryStringDecoder decoder = null;
+ String rawUriStr = "";
+
+ rawUriStr = "http://127.0.0.1:26200/?qid=1234&tid=2345&pid=4567&tid=4890";
+ decoder = new QueryStringDecoder(rawUriStr);
+ assertThat(decoder.getQueries(), is("qid=1234&tid=2345&pid=4567&tid=4890"));
+ assertThat(decoder.getParameters(), is(notNullValue()));
+ assertThat(decoder.getParameters().size(), is(3));
+ assertThat(decoder.getParameters().get("tid").size(), is(2));
+ assertThat(decoder.getParameters().get("tid").get(0), is("2345"));
+ assertThat(decoder.getParameters().get("tid").get(1), is("4890"));
+ }
+
+ @Test(expected=MalformedURLException.class)
+ public void testMalformedURI() throws Exception {
+ QueryStringDecoder decoder = null;
+ String rawUriStr = "";
+
+ rawUriStr = "http://127.0.0.1:26200/?=1234&tid=&pid=4567";
+ decoder = new QueryStringDecoder(rawUriStr);
+ assertThat(decoder.getQueries(), is("=1234&tid=&pid=4567"));
+ decoder.getParameters();
+ }
+}