You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2014/02/17 03:51:44 UTC
[1/2] TAJO-598: Refactoring Tajo RPC. (jinho)
Repository: incubator-tajo
Updated Branches:
refs/heads/master cd7bbae0d -> e2a7dffdb
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index 8a0b63a..c31e9cd 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -48,10 +48,12 @@ import org.apache.tajo.engine.query.QueryUnitRequest;
import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
import org.apache.tajo.ipc.TajoWorkerProtocol.*;
import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.RpcChannelFactory;
import org.apache.tajo.storage.StorageUtil;
import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.ApplicationIdUtils;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import java.io.File;
import java.io.IOException;
@@ -69,16 +71,16 @@ public class Task {
private final TajoConf systemConf;
private final QueryContext queryContext;
private final FileSystem localFS;
- private final TaskRunner.TaskRunnerContext taskRunnerContext;
+ private TaskRunner.TaskRunnerContext taskRunnerContext;
private final QueryMasterProtocolService.Interface masterProxy;
private final LocalDirAllocator lDirAllocator;
private final QueryUnitAttemptId taskId;
private final Path taskDir;
private final QueryUnitRequest request;
- private final TaskAttemptContext context;
+ private TaskAttemptContext context;
private List<Fetcher> fetcherRunners;
- private final LogicalNode plan;
+ private LogicalNode plan;
private final Map<String, TableDesc> descs = Maps.newHashMap();
private PhysicalExec executor;
private boolean interQuery;
@@ -107,6 +109,7 @@ public class Task {
private ShuffleType shuffleType = null;
private Schema finalSchema = null;
private TupleComparator sortComp = null;
+ private ClientSocketChannelFactory channelFactory = null;
static final String OUTPUT_FILE_PREFIX="part-";
static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SUBQUERY =
@@ -281,12 +284,14 @@ public class Task {
context.stop();
context.setState(TaskAttemptState.TA_KILLED);
setProgressFlag();
+ releaseChannelFactory();
}
public void abort() {
aborted = true;
context.stop();
context.setState(TaskAttemptState.TA_FAILED);
+ releaseChannelFactory();
}
public void cleanUp() {
@@ -294,7 +299,6 @@ public class Task {
if (context.getState() == TaskAttemptState.TA_SUCCEEDED) {
try {
- // context.getWorkDir() 지우기
localFS.delete(context.getWorkDir(), true);
synchronized (taskRunnerContext.getTasks()) {
taskRunnerContext.getTasks().remove(this.getId());
@@ -348,6 +352,7 @@ public class Task {
FileFragment[] frags = localizeFetchedData(tableDir, inputTable, descs.get(inputTable).getMeta());
context.updateAssignedFragments(inputTable, frags);
}
+ releaseChannelFactory();
}
public void run() {
@@ -371,6 +376,7 @@ public class Task {
++progress;
}
this.executor.close();
+ this.executor = null;
}
} catch (Exception e) {
// errorMessage will be sent to master.
@@ -435,6 +441,13 @@ public class Task {
public void cleanupTask() {
taskRunnerContext.addTaskHistory(getId(), getTaskHistory());
taskRunnerContext.getTasks().remove(getId());
+ taskRunnerContext = null;
+
+ fetcherRunners.clear();
+ executor = null;
+ plan = null;
+ context = null;
+ releaseChannelFactory();
}
public TaskHistory getTaskHistory() {
@@ -515,7 +528,7 @@ public class Task {
return tablets;
}
- private class FetchRunner implements Runnable {
+ private static class FetchRunner implements Runnable {
private final TaskAttemptContext ctx;
private final Fetcher fetcher;
@@ -538,7 +551,7 @@ public class Task {
} catch (InterruptedException e) {
LOG.error(e);
}
- LOG.info("Retry on the fetch: " + fetcher.getURI() + " (" + retryNum + ")");
+ LOG.warn("Retry on the fetch: " + fetcher.getURI() + " (" + retryNum + ")");
}
try {
File fetched = fetcher.get();
@@ -560,10 +573,24 @@ public class Task {
}
}
+ private void releaseChannelFactory(){
+ if(channelFactory != null) {
+ channelFactory.shutdown();
+ channelFactory.releaseExternalResources();
+ channelFactory = null;
+ }
+ }
+
private List<Fetcher> getFetchRunners(TaskAttemptContext ctx,
List<Fetch> fetches) throws IOException {
if (fetches.size() > 0) {
+
+ releaseChannelFactory();
+
+
+ int workerNum = ctx.getConf().getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM);
+ channelFactory = RpcChannelFactory.createClientChannelFactory("Fetcher", workerNum);
Path inputDir = lDirAllocator.
getLocalPathToRead(
getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf);
@@ -578,7 +605,7 @@ public class Task {
storeDir.mkdirs();
}
storeFile = new File(storeDir, "in_" + i);
- Fetcher fetcher = new Fetcher(URI.create(f.getUrls()), storeFile);
+ Fetcher fetcher = new Fetcher(URI.create(f.getUrls()), storeFile, channelFactory);
runnerList.add(fetcher);
i++;
}
@@ -589,74 +616,74 @@ public class Task {
}
}
- protected class Reporter implements Runnable {
+ protected class Reporter {
private QueryMasterProtocolService.Interface masterStub;
private Thread pingThread;
- private Object lock = new Object();
+ private AtomicBoolean stop = new AtomicBoolean(false);
private static final int PROGRESS_INTERVAL = 3000;
public Reporter(QueryMasterProtocolService.Interface masterStub) {
this.masterStub = masterStub;
}
- @Override
- public void run() {
- final int MAX_RETRIES = 3;
- int remainingRetries = MAX_RETRIES;
+ Runnable createReporterThread() {
- while (!stopped) {
- try {
- synchronized (lock) {
- if (stopped) {
- break;
- }
- lock.wait(PROGRESS_INTERVAL);
- }
- if (stopped) {
- break;
- }
- resetProgressFlag();
-
- if (getProgressFlag()) {
- resetProgressFlag();
- masterStub.statusUpdate(null, getReport(), NullCallback.get());
- } else {
- masterStub.ping(null, taskId.getProto(), NullCallback.get());
- }
-
- } catch (Throwable t) {
+ return new Runnable() {
+ final int MAX_RETRIES = 3;
+ int remainingRetries = MAX_RETRIES;
+ @Override
+ public void run() {
+ while (!stop.get() && !stopped) {
+ try {
- LOG.info("Communication exception: " + StringUtils
- .stringifyException(t));
- remainingRetries -=1;
- if (remainingRetries == 0) {
- ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0);
- LOG.warn("Last retry, killing ");
- System.exit(65);
+ resetProgressFlag();
+
+ if (getProgressFlag()) {
+ resetProgressFlag();
+ masterStub.statusUpdate(null, getReport(), NullCallback.get());
+ } else {
+ masterStub.ping(null, taskId.getProto(), NullCallback.get());
+ }
+ synchronized (pingThread) {
+ pingThread.wait(PROGRESS_INTERVAL);
+ }
+
+ } catch (Throwable t) {
+
+ LOG.info("Communication exception: " + StringUtils
+ .stringifyException(t));
+ remainingRetries -=1;
+ if (remainingRetries == 0) {
+ ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0);
+ LOG.warn("Last retry, killing ");
+ System.exit(65);
+ }
+ }
}
}
- }
+ };
}
public void startCommunicationThread() {
if (pingThread == null) {
- pingThread = new Thread(this, "communication thread");
- pingThread.setDaemon(true);
+ pingThread = new Thread(createReporterThread());
+ pingThread.setName("communication thread");
pingThread.start();
}
}
public void stopCommunicationThread() throws InterruptedException {
+ if(stop.getAndSet(true)){
+ return;
+ }
+
if (pingThread != null) {
// Intent of the lock is to not send an interupt in the middle of an
// umbilical.ping or umbilical.statusUpdate
- synchronized(lock) {
+ synchronized(pingThread) {
//Interrupt if sleeping. Otherwise wait for the RPC call to return.
- lock.notify();
+ pingThread.notifyAll();
}
-
- pingThread.interrupt();
- pingThread.join();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
index dab18b5..7b49c15 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -78,12 +78,8 @@ public class TaskRunner extends AbstractService {
private TajoQueryEngine queryEngine;
- // TODO - this should be configurable
- private final int coreNum = 4;
-
// for Fetcher
- private final ExecutorService fetchLauncher =
- Executors.newFixedThreadPool(coreNum * 4);
+ private final ExecutorService fetchLauncher;
// It keeps all of the query unit attempts while a TaskRunner is running.
private final Map<QueryUnitAttemptId, Task> tasks =
new ConcurrentHashMap<QueryUnitAttemptId, Task>();
@@ -118,7 +114,8 @@ public class TaskRunner extends AbstractService {
this.taskRunnerManager = taskRunnerManager;
this.connPool = RpcConnectionPool.getPool(conf);
-
+ this.fetchLauncher = Executors.newFixedThreadPool(
+ conf.getIntVar(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM));
try {
final ExecutionBlockId executionBlockId = TajoIdUtils.createExecutionBlockId(args[1]);
@@ -220,6 +217,9 @@ public class TaskRunner extends AbstractService {
}
}
+ tasks.clear();
+ fetchLauncher.shutdown();
+ this.queryEngine = null;
// if(client != null) {
// client.close();
// client = null;
@@ -352,7 +352,7 @@ public class TaskRunner extends AbstractService {
}
// if there has been no assigning task for a given period,
// TaskRunner will retry to request an assigning task.
- LOG.warn("Timeout GetTask:" + getId() + ", but retry", te);
+ LOG.info("Retry assigning task:" + getId());
continue;
}
@@ -400,8 +400,6 @@ public class TaskRunner extends AbstractService {
}
}
} catch (Throwable t) {
- connPool.closeConnection(qmClient);
- qmClient = null;
t.printStackTrace();
} finally {
connPool.releaseConnection(qmClient);
@@ -410,8 +408,6 @@ public class TaskRunner extends AbstractService {
}
});
taskLauncher.start();
- taskLauncher.join();
-
} catch (Throwable t) {
LOG.fatal("Unhandled exception. Starting shutdown.", t);
} finally {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
index 0489e37..9d7e438 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
@@ -18,102 +18,20 @@
package org.apache.tajo.engine.eval;
-import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.engine.function.GeneralFunction;
import org.apache.tajo.engine.json.CoreGsonHelper;
-import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
import org.junit.Test;
import static org.apache.tajo.common.TajoDataTypes.Type.*;
import static org.junit.Assert.*;
public class TestEvalTree extends ExprTestBase{
- private static TajoTestingCluster util;
- private static CatalogService cat;
- private static Tuple [] tuples = new Tuple[3];
-
- @BeforeClass
- public static void setUp() throws Exception {
- util = new TajoTestingCluster();
- util.startCatalogCluster();
- cat = util.getMiniCatalogCluster().getCatalog();
- for (FunctionDesc funcDesc : TajoMaster.initBuiltinFunctions()) {
- cat.createFunction(funcDesc);
- }
-
- Schema schema = new Schema();
- schema.addColumn("name", TEXT);
- schema.addColumn("score", INT4);
- schema.addColumn("age", INT4);
-
- TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
- TableDesc desc = new TableDesc("people", schema, meta, CommonTestingUtil.getTestDir());
- cat.addTable(desc);
-
- FunctionDesc funcMeta = new FunctionDesc("test_sum", TestSum.class, FunctionType.GENERAL,
- CatalogUtil.newSimpleDataType(INT4),
- CatalogUtil.newSimpleDataTypeArray(INT4, INT4));
- cat.createFunction(funcMeta);
-
- tuples[0] = new VTuple(3);
- tuples[0].put(new Datum[] {
- DatumFactory.createText("aabc"),
- DatumFactory.createInt4(100),
- DatumFactory.createInt4(10)});
- tuples[1] = new VTuple(3);
- tuples[1].put(new Datum[] {
- DatumFactory.createText("aaba"),
- DatumFactory.createInt4(200),
- DatumFactory.createInt4(20)});
- tuples[2] = new VTuple(3);
- tuples[2].put(new Datum[] {
- DatumFactory.createText("kabc"),
- DatumFactory.createInt4(300),
- DatumFactory.createInt4(30)});
- }
-
- @AfterClass
- public static void tearDown() throws Exception {
- util.shutdownCatalogCluster();
- }
-
- public static class TestSum extends GeneralFunction {
- private Integer x;
- private Integer y;
-
- public TestSum() {
- super(new Column[] { new Column("arg1", INT4),
- new Column("arg2", INT4) });
- }
-
- @Override
- public Datum eval(Tuple params) {
- x = params.get(0).asInt4();
- y = params.get(1).asInt4();
- return DatumFactory.createInt4(x + y);
- }
- }
-
- static String[] QUERIES = {
- "select name, score, age from people where score > 30", // 0
- "select name, score, age from people where score * age", // 1
- "select name, score, age from people where test_sum(score * age, 50)", // 2
- "select 2+3", // 3
- "select sum(score) from people", // 4
- "select name from people where NOT (20 > 30)", // 5
- };
-
@Test
public void testTupleEval() throws CloneNotSupportedException {
ConstEval e1 = new ConstEval(DatumFactory.createInt4(1));
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
index c25afc8..d756242 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
@@ -27,8 +27,9 @@ import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.engine.eval.TestEvalTree.TestSum;
+import org.apache.tajo.engine.function.GeneralFunction;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.LogicalPlan;
import org.apache.tajo.engine.planner.LogicalPlanner;
@@ -38,6 +39,7 @@ import org.apache.tajo.engine.planner.logical.GroupbyNode;
import org.apache.tajo.engine.planner.logical.NodeType;
import org.apache.tajo.exception.InternalException;
import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.storage.Tuple;
import org.apache.tajo.util.CommonTestingUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -48,6 +50,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Set;
+import static org.apache.tajo.common.TajoDataTypes.Type.INT4;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -59,7 +62,22 @@ public class TestEvalTreeUtil {
static EvalNode expr3;
static SQLAnalyzer analyzer;
static LogicalPlanner planner;
+ public static class TestSum extends GeneralFunction {
+ private Integer x;
+ private Integer y;
+ public TestSum() {
+ super(new Column[] { new Column("arg1", INT4),
+ new Column("arg2", INT4) });
+ }
+
+ @Override
+ public Datum eval(Tuple params) {
+ x = params.get(0).asInt4();
+ y = params.get(1).asInt4();
+ return DatumFactory.createInt4(x + y);
+ }
+ }
@BeforeClass
public static void setUp() throws Exception {
@@ -88,9 +106,15 @@ public class TestEvalTreeUtil {
analyzer = new SQLAnalyzer();
planner = new LogicalPlanner(catalog);
- expr1 = getRootSelection(TestEvalTree.QUERIES[0]);
- expr2 = getRootSelection(TestEvalTree.QUERIES[1]);
- expr3 = getRootSelection(TestEvalTree.QUERIES[2]);
+ String[] QUERIES = {
+ "select name, score, age from people where score > 30", // 0
+ "select name, score, age from people where score * age", // 1
+ "select name, score, age from people where test_sum(score * age, 50)", // 2
+ };
+
+ expr1 = getRootSelection(QUERIES[0]);
+ expr2 = getRootSelection(QUERIES[1]);
+ expr3 = getRootSelection(QUERIES[2]);
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
index 3cf69a7..2d3124d 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
@@ -36,6 +36,7 @@ import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.storage.StorageManagerFactory;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.FileUtil;
+import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -87,6 +88,11 @@ public class TestGlobalPlanner {
StorageManagerFactory.getStorageManager(util.getConfiguration()));
}
+ @AfterClass
+ public static void tearDown() {
+ util.shutdownCatalogCluster();
+ }
+
private MasterPlan buildPlan(String sql) throws PlanningException, IOException {
Expr expr = sqlAnalyzer.parse(sql);
LogicalPlan plan = planner.createPlan(expr);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestFetcher.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestFetcher.java
index 0181889..aa4a20c 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestFetcher.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestFetcher.java
@@ -22,13 +22,15 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.net.NetUtils;
-import org.junit.Before;
-import org.junit.Test;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.rpc.RpcChannelFactory;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.worker.dataserver.HttpDataServer;
import org.apache.tajo.worker.dataserver.retriever.DataRetriever;
import org.apache.tajo.worker.dataserver.retriever.DirectoryRetriever;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.junit.Before;
+import org.junit.Test;
import java.io.File;
import java.io.FileWriter;
@@ -70,7 +72,8 @@ public class TestFetcher {
InetSocketAddress addr = server.getBindAddress();
URI uri = URI.create("http://127.0.0.1:"+addr.getPort() + "/data");
- Fetcher fetcher = new Fetcher(uri, new File(OUTPUT_DIR + "data"));
+ ClientSocketChannelFactory channelFactory = RpcChannelFactory.createClientChannelFactory("Fetcher", 1);
+ Fetcher fetcher = new Fetcher(uri, new File(OUTPUT_DIR + "data"), channelFactory);
fetcher.get();
server.stop();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index da34ac7..a427635 100644
--- a/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -19,7 +19,6 @@
package org.apache.tajo.pullserver;
import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -45,6 +44,7 @@ import org.apache.tajo.catalog.Schema;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.rpc.RpcChannelFactory;
import org.apache.tajo.storage.RowStoreUtil;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.TupleComparator;
@@ -54,7 +54,6 @@ import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.frame.TooLongFrameException;
import org.jboss.netty.handler.codec.http.*;
import org.jboss.netty.handler.ssl.SslHandler;
@@ -73,8 +72,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
@@ -116,7 +113,7 @@ public class TajoPullServerService extends AbstractService {
private static final Map<String,String> userRsrc =
new ConcurrentHashMap<String,String>();
- private static String userName;
+ private String userName;
public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
"tajo.pullserver.ssl.file.buffer.size";
@@ -194,7 +191,7 @@ public class TajoPullServerService extends AbstractService {
}
@Override
- public synchronized void init(Configuration conf) {
+ public void init(Configuration conf) {
try {
manageOsCache = conf.getBoolean(SHUFFLE_MANAGE_OS_CACHE,
DEFAULT_SHUFFLE_MANAGE_OS_CACHE);
@@ -202,16 +199,10 @@ public class TajoPullServerService extends AbstractService {
readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES,
DEFAULT_SHUFFLE_READAHEAD_BYTES);
- ThreadFactory bossFactory = new ThreadFactoryBuilder()
- .setNameFormat("PullServerAuxService Netty Boss #%d")
- .build();
- ThreadFactory workerFactory = new ThreadFactoryBuilder()
- .setNameFormat("PullServerAuxService Netty Worker #%d")
- .build();
+ int workerNum = conf.getInt("tajo.shuffle.rpc.server.io-thread-num",
+ Runtime.getRuntime().availableProcessors() * 2);
- selector = new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(bossFactory),
- Executors.newCachedThreadPool(workerFactory));
+ selector = RpcChannelFactory.createServerChannelFactory("PullServerAuxService", workerNum);
localFS = new LocalFileSystem();
super.init(new Configuration(conf));
@@ -228,7 +219,6 @@ public class TajoPullServerService extends AbstractService {
public synchronized void start() {
Configuration conf = getConfig();
ServerBootstrap bootstrap = new ServerBootstrap(selector);
- bootstrap.setOption("tcpNoDelay", true);
try {
pipelineFact = new HttpPipelineFactory(conf);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
index 01e1110..c84d6b6 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
@@ -26,6 +26,7 @@ import org.apache.tajo.rpc.RpcProtos.RpcRequest;
import org.apache.tajo.rpc.RpcProtos.RpcResponse;
import org.apache.tajo.util.NetUtils;
import org.jboss.netty.channel.*;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
@@ -36,7 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey;
public class AsyncRpcClient extends NettyClientBase {
- private static final Log LOG = LogFactory.getLog(RpcProtos.class);
+ private static final Log LOG = LogFactory.getLog(AsyncRpcClient.class);
private final ChannelUpstreamHandler handler;
private final ChannelPipelineFactory pipeFactory;
@@ -56,7 +57,12 @@ public class AsyncRpcClient extends NettyClientBase {
* new an instance through this constructor.
*/
AsyncRpcClient(final Class<?> protocol,
- final InetSocketAddress addr)
+ final InetSocketAddress addr) throws Exception {
+ this(protocol, addr, RpcChannelFactory.getSharedClientChannelFactory());
+ }
+
+ AsyncRpcClient(final Class<?> protocol,
+ final InetSocketAddress addr, ClientSocketChannelFactory factory)
throws Exception {
this.protocol = protocol;
@@ -68,8 +74,8 @@ public class AsyncRpcClient extends NettyClientBase {
this.handler = new ClientChannelUpstreamHandler();
pipeFactory = new ProtoPipelineFactory(handler,
RpcResponse.getDefaultInstance());
- super.init(addr, pipeFactory);
- rpcChannel = new ProxyRpcChannel(getChannel());
+ super.init(addr, pipeFactory, factory);
+ rpcChannel = new ProxyRpcChannel();
this.key = new RpcConnectionKey(addr, protocol, true);
}
@@ -92,12 +98,10 @@ public class AsyncRpcClient extends NettyClientBase {
}
private class ProxyRpcChannel implements RpcChannel {
- private final Channel channel;
private final ClientChannelUpstreamHandler handler;
- public ProxyRpcChannel(Channel channel) {
- this.channel = channel;
- this.handler = channel.getPipeline()
+ public ProxyRpcChannel() {
+ this.handler = getChannel().getPipeline()
.get(ClientChannelUpstreamHandler.class);
if (handler == null) {
@@ -119,7 +123,7 @@ public class AsyncRpcClient extends NettyClientBase {
handler.registerCallback(nextSeqId,
new ResponseCallback(controller, responseType, done));
- channel.write(rpcRequest);
+ getChannel().write(rpcRequest);
}
private Message buildRequest(int seqId,
@@ -214,7 +218,6 @@ public class AsyncRpcClient extends NettyClientBase {
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
LOG.error(getRemoteAddress() + "," + protocol + "," + e.getCause().getMessage(), e.getCause());
- e.getChannel().close();
for(Map.Entry<Integer, ResponseCallback> callbackEntry: requests.entrySet()) {
ResponseCallback callback = callbackEntry.getValue();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
index 4dc1d05..b7e3cb6 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
@@ -40,7 +40,8 @@ public class AsyncRpcServer extends NettyServerBase {
public AsyncRpcServer(final Class<?> protocol,
final Object instance,
- final InetSocketAddress bindAddress)
+ final InetSocketAddress bindAddress,
+ final int workerNum)
throws Exception {
super(protocol.getSimpleName(), bindAddress);
@@ -54,12 +55,23 @@ public class AsyncRpcServer extends NettyServerBase {
ServerHandler handler = new ServerHandler();
this.pipeline = new ProtoPipelineFactory(handler,
RpcRequest.getDefaultInstance());
- super.init(this.pipeline);
+ super.init(this.pipeline, workerNum);
}
private class ServerHandler extends SimpleChannelUpstreamHandler {
@Override
+ public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt)
+ throws Exception {
+
+ accepted.add(evt.getChannel());
+ if(LOG.isDebugEnabled()){
+ LOG.debug(String.format(serviceName + " accepted number of connections (%d)", accepted.size()));
+ }
+ super.channelOpen(ctx, evt);
+ }
+
+ @Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
index a0963a7..604ed52 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
@@ -29,6 +29,7 @@ import org.apache.tajo.rpc.RpcProtos.RpcRequest;
import org.apache.tajo.rpc.RpcProtos.RpcResponse;
import org.apache.tajo.util.NetUtils;
import org.jboss.netty.channel.*;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
@@ -59,7 +60,12 @@ public class BlockingRpcClient extends NettyClientBase {
* new an instance through this constructor.
*/
BlockingRpcClient(final Class<?> protocol,
- final InetSocketAddress addr)
+ final InetSocketAddress addr) throws Exception {
+ this(protocol, addr, RpcChannelFactory.getSharedClientChannelFactory());
+ }
+
+ BlockingRpcClient(final Class<?> protocol,
+ final InetSocketAddress addr, ClientSocketChannelFactory factory)
throws Exception {
this.protocol = protocol;
@@ -72,8 +78,8 @@ public class BlockingRpcClient extends NettyClientBase {
this.handler = new ClientChannelUpstreamHandler();
pipeFactory = new ProtoPipelineFactory(handler,
RpcResponse.getDefaultInstance());
- super.init(addr, pipeFactory);
- rpcChannel = new ProxyRpcChannel(getChannel());
+ super.init(addr, pipeFactory, factory);
+ rpcChannel = new ProxyRpcChannel();
this.key = new RpcConnectionKey(addr, protocol, false);
}
@@ -97,12 +103,12 @@ public class BlockingRpcClient extends NettyClientBase {
}
private class ProxyRpcChannel implements BlockingRpcChannel {
- private final Channel channel;
+
private final ClientChannelUpstreamHandler handler;
- public ProxyRpcChannel(Channel channel) {
- this.channel = channel;
- this.handler = channel.getPipeline().
+ public ProxyRpcChannel() {
+
+ this.handler = getChannel().getPipeline().
get(ClientChannelUpstreamHandler.class);
if (handler == null) {
@@ -124,7 +130,7 @@ public class BlockingRpcClient extends NettyClientBase {
ProtoCallFuture callFuture =
new ProtoCallFuture(controller, responsePrototype);
requests.put(nextSeqId, callFuture);
- channel.write(rpcRequest);
+ getChannel().write(rpcRequest);
try {
return callFuture.get();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
index 3649c5e..067d824 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
@@ -38,7 +38,8 @@ public class BlockingRpcServer extends NettyServerBase {
public BlockingRpcServer(final Class<?> protocol,
final Object instance,
- final InetSocketAddress bindAddress)
+ final InetSocketAddress bindAddress,
+ final int workerNum)
throws Exception {
super(protocol.getSimpleName(), bindAddress);
@@ -55,12 +56,23 @@ public class BlockingRpcServer extends NettyServerBase {
this.pipeline = new ProtoPipelineFactory(new ServerHandler(),
RpcRequest.getDefaultInstance());
- super.init(this.pipeline);
+ super.init(this.pipeline, workerNum);
}
private class ServerHandler extends SimpleChannelUpstreamHandler {
@Override
+ public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt)
+ throws Exception {
+
+ accepted.add(evt.getChannel());
+ if(LOG.isDebugEnabled()){
+ LOG.debug(String.format(serviceName + " accepted number of connections (%d)", accepted.size()));
+ }
+ super.channelOpen(ctx, evt);
+ }
+
+ @Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
final RpcRequest request = (RpcRequest) e.getMessage();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
index 79c2674..8373c37 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
@@ -20,77 +20,56 @@ package org.apache.tajo.rpc;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.conf.TajoConf;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.concurrent.Executors;
public abstract class NettyClientBase implements Closeable {
private static Log LOG = LogFactory.getLog(NettyClientBase.class);
- //netty default value
- protected static final int DEFAULT_IO_THREADS = Runtime.getRuntime().availableProcessors() * 2;
- protected static int nettyWorkerCount;
-
- /**
- * make this factory static thus all clients can share its thread pool.
- * NioClientSocketChannelFactory has only one method newChannel() visible for user, which is thread-safe
- */
- private static final ClientSocketChannelFactory factory;
-
protected ClientBootstrap bootstrap;
private ChannelFuture channelFuture;
- static {
- TajoConf conf = new TajoConf();
-
- nettyWorkerCount = conf.getIntVar(TajoConf.ConfVars.RPC_CLIENT_SOCKET_IO_THREADS);
- if (nettyWorkerCount <= 0) {
- nettyWorkerCount = DEFAULT_IO_THREADS;
- }
-
- factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
- Executors.newCachedThreadPool(),
- nettyWorkerCount);
- }
-
public NettyClientBase() {
}
public abstract <T> T getStub();
public abstract RpcConnectionPool.RpcConnectionKey getKey();
- public void init(InetSocketAddress addr, ChannelPipelineFactory pipeFactory) throws IOException {
+ public void init(InetSocketAddress addr, ChannelPipelineFactory pipeFactory, ClientSocketChannelFactory factory)
+ throws IOException {
try {
this.bootstrap = new ClientBootstrap(factory);
this.bootstrap.setPipelineFactory(pipeFactory);
// TODO - should be configurable
this.bootstrap.setOption("connectTimeoutMillis", 10000);
this.bootstrap.setOption("connectResponseTimeoutMillis", 10000);
- this.bootstrap.setOption("receiveBufferSize", 1048576*2);
+ this.bootstrap.setOption("receiveBufferSize", 1048576 * 10);
this.bootstrap.setOption("tcpNoDelay", true);
this.bootstrap.setOption("keepAlive", true);
- this.channelFuture = bootstrap.connect(addr);
- this.channelFuture.awaitUninterruptibly();
- if (!channelFuture.isSuccess()) {
- channelFuture.getCause().printStackTrace();
- throw new RuntimeException(channelFuture.getCause());
- }
+ connect(addr);
} catch (Throwable t) {
close();
throw new IOException(t.getCause());
}
}
+ public void connect(InetSocketAddress addr) {
+ this.channelFuture = bootstrap.connect(addr);
+ this.channelFuture.awaitUninterruptibly();
+ if (!channelFuture.isSuccess()) {
+ channelFuture.getCause().printStackTrace();
+ throw new RuntimeException(channelFuture.getCause());
+ }
+ }
+
public boolean isConnected() {
return getChannel().isConnected();
}
@@ -105,8 +84,12 @@ public abstract class NettyClientBase implements Closeable {
@Override
public void close() {
- if(this.channelFuture != null) {
- this.channelFuture.getChannel().close();
+ if(this.channelFuture != null && getChannel().isOpen()) {
+ try {
+ getChannel().close().awaitUninterruptibly();
+ } catch (Throwable ce) {
+ LOG.warn(ce);
+ }
}
if(this.bootstrap != null) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
index 154fb9b..371f879 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
@@ -25,14 +25,15 @@ import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
import java.io.IOException;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.Random;
-import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class NettyServerBase {
@@ -43,10 +44,10 @@ public class NettyServerBase {
protected String serviceName;
protected InetSocketAddress serverAddr;
protected InetSocketAddress bindAddress;
- protected ChannelFactory factory;
protected ChannelPipelineFactory pipelineFactory;
protected ServerBootstrap bootstrap;
protected Channel channel;
+ protected ChannelGroup accepted = new DefaultChannelGroup();
private InetSocketAddress initIsa;
@@ -63,21 +64,19 @@ public class NettyServerBase {
this.serviceName = name;
}
- public void init(ChannelPipelineFactory pipeline) {
- this.factory =
- new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
- Executors.newCachedThreadPool());
+ public void init(ChannelPipelineFactory pipeline, int workerNum) {
+ ChannelFactory factory = RpcChannelFactory.createServerChannelFactory(serviceName, workerNum);
pipelineFactory = pipeline;
bootstrap = new ServerBootstrap(factory);
bootstrap.setPipelineFactory(pipelineFactory);
// TODO - should be configurable
bootstrap.setOption("reuseAddress", true);
- bootstrap.setOption("child.tcpNoDelay", false);
+ bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setOption("child.keepAlive", true);
bootstrap.setOption("child.connectTimeoutMillis", 10000);
bootstrap.setOption("child.connectResponseTimeoutMillis", 10000);
- bootstrap.setOption("child.receiveBufferSize", 1048576 * 2);
+ bootstrap.setOption("child.receiveBufferSize", 1048576 * 10);
}
public InetSocketAddress getListenAddress() {
@@ -114,9 +113,16 @@ public class NettyServerBase {
if(channel != null) {
channel.close().awaitUninterruptibly();
}
- if(factory != null) {
- factory.releaseExternalResources();
+
+ try {
+ accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
+ } catch (Throwable t) {
+ LOG.error(t.getMessage(), t);
}
+ if(bootstrap != null) {
+ bootstrap.releaseExternalResources();
+ }
+
LOG.info("Rpc (" + serviceName + ") listened on "
+ NetUtils.normalizeInetSocketAddress(bindAddress)+ ") shutdown");
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
new file mode 100644
index 0000000..adafd5c
--- /dev/null
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.conf.TajoConf;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.ServerSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.*;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.ThreadNameDeterminer;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public final class RpcChannelFactory {
+ private static final Log LOG = LogFactory.getLog(RpcChannelFactory.class);
+ private static ClientSocketChannelFactory factory;
+ private static AtomicInteger clientCount = new AtomicInteger(0);
+ private static AtomicInteger serverCount = new AtomicInteger(0);
+
+ private RpcChannelFactory(){
+ }
+
+ /**
+ * make this factory static thus all clients can share its thread pool.
+ * NioClientSocketChannelFactory has only one method newChannel() visible for user, which is thread-safe
+ */
+ public static synchronized ClientSocketChannelFactory getSharedClientChannelFactory(){
+ //shared worker and boss pool
+ if(factory == null){
+ TajoConf conf = new TajoConf();
+ int workerNum = conf.getIntVar(TajoConf.ConfVars.INTERNAL_RPC_CLIENT_WORKER_THREAD_NUM);
+ factory = createClientChannelFactory("Internal-Client", workerNum);
+ }
+ return factory;
+ }
+
+ // Client must release the external resources
+ public static synchronized ClientSocketChannelFactory createClientChannelFactory(String name, int workerNum) {
+ name = name + "-" + clientCount.incrementAndGet();
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Create " + name + " ClientSocketChannelFactory. Worker:" + workerNum);
+ }
+
+ ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+ ThreadFactory bossFactory = builder.setNameFormat(name + " Boss #%d").build();
+ ThreadFactory workerFactory = builder.setNameFormat(name + " Worker #%d").build();
+
+ NioClientBossPool bossPool = new NioClientBossPool(Executors.newCachedThreadPool(bossFactory), 1,
+ new HashedWheelTimer(), ThreadNameDeterminer.CURRENT);
+ NioWorkerPool workerPool = new NioWorkerPool(Executors.newCachedThreadPool(workerFactory), workerNum,
+ ThreadNameDeterminer.CURRENT);
+
+ return new NioClientSocketChannelFactory(bossPool, workerPool);
+ }
+
+ // Client must release the external resources
+ public static synchronized ServerSocketChannelFactory createServerChannelFactory(String name, int workerNum) {
+ name = name + "-" + serverCount.incrementAndGet();
+ if(LOG.isInfoEnabled()){
+ LOG.info("Create " + name + " ServerSocketChannelFactory. Worker:" + workerNum);
+ }
+ ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+ ThreadFactory bossFactory = builder.setNameFormat(name + " Server Boss #%d").build();
+ ThreadFactory workerFactory = builder.setNameFormat(name + " Server Worker #%d").build();
+
+ NioServerBossPool bossPool =
+ new NioServerBossPool(Executors.newCachedThreadPool(bossFactory), 1, ThreadNameDeterminer.CURRENT);
+ NioWorkerPool workerPool =
+ new NioWorkerPool(Executors.newCachedThreadPool(workerFactory), workerNum, ThreadNameDeterminer.CURRENT);
+
+ return new NioServerSocketChannelFactory(bossPool, workerPool);
+ }
+
+ public static synchronized void shutdown(){
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Shutdown Shared RPC Pool");
+ }
+ factory.releaseExternalResources();
+ factory = null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
index dee42d1..777281f 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
@@ -18,55 +18,95 @@
package org.apache.tajo.rpc;
+import com.google.common.base.Objects;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.util.TUtil;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import java.net.InetSocketAddress;
-import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
public class RpcConnectionPool {
private static final Log LOG = LogFactory.getLog(RpcConnectionPool.class);
- private Map<RpcConnectionKey, NettyClientBase> connections = TUtil.newConcurrentHashMap();
+ private ConcurrentMap<RpcConnectionKey, NettyClientBase> connections =
+ new ConcurrentHashMap<RpcConnectionKey, NettyClientBase>();
+ private ChannelGroup accepted = new DefaultChannelGroup();
private static RpcConnectionPool instance;
+ private final ClientSocketChannelFactory channelFactory;
+ private final TajoConf conf;
- private TajoConf conf;
-
- private RpcConnectionPool(TajoConf conf) {
+ private RpcConnectionPool(TajoConf conf, ClientSocketChannelFactory channelFactory) {
this.conf = conf;
+ this.channelFactory = channelFactory;
}
public synchronized static RpcConnectionPool getPool(TajoConf conf) {
if(instance == null) {
- instance = new RpcConnectionPool(conf);
+ instance = new RpcConnectionPool(conf, RpcChannelFactory.getSharedClientChannelFactory());
}
-
return instance;
}
+ public synchronized static RpcConnectionPool newPool(TajoConf conf, String poolName, int workerNum) {
+ return new RpcConnectionPool(conf, RpcChannelFactory.createClientChannelFactory(poolName, workerNum));
+ }
+
private NettyClientBase makeConnection(RpcConnectionKey rpcConnectionKey) throws Exception {
+ NettyClientBase client;
if(rpcConnectionKey.asyncMode) {
- return new AsyncRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr);
+ client = new AsyncRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr, channelFactory);
} else {
- return new BlockingRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr);
+ client = new BlockingRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr, channelFactory);
}
+ accepted.add(client.getChannel());
+ return client;
}
public NettyClientBase getConnection(InetSocketAddress addr,
- Class protocolClass, boolean asyncMode) throws Exception {
+ Class protocolClass, boolean asyncMode) throws Exception {
RpcConnectionKey key = new RpcConnectionKey(addr, protocolClass, asyncMode);
- synchronized(connections) {
- if(!connections.containsKey(key)) {
- connections.put(key, makeConnection(key));
+ NettyClientBase client = connections.get(key);
+
+ if (client == null) {
+ client = makeConnection(key);
+ boolean added = connections.putIfAbsent(key, client) == null;
+
+ if (!added) {
+ client.close();
+ client = connections.get(key);
}
- return connections.get(key);
}
+
+ if (!client.getChannel().isOpen() || !client.getChannel().isConnected()) {
+ LOG.warn("Try to reconnect : " + addr);
+ client.connect(addr);
+ }
+ return client;
}
public void releaseConnection(NettyClientBase client) {
+ if (client == null) return;
+
+ try {
+ if (!client.getChannel().isOpen()) {
+ connections.remove(client.getKey());
+ client.close();
+ }
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Current Connections [" + connections.size() + "] Accepted: " + accepted.size());
+
+ }
+ } catch (Exception e) {
+ LOG.error("Can't close connection:" + client.getKey() + ":" + e.getMessage(), e);
+ }
}
public void closeConnection(NettyClientBase client) {
@@ -76,12 +116,12 @@ public class RpcConnectionPool {
try {
if(LOG.isDebugEnabled()) {
- LOG.debug("CloseConnection [" + client.getKey() + "]");
- }
- synchronized(connections) {
- connections.remove(client.getKey());
+ LOG.debug("Close connection [" + client.getKey() + "]");
}
+
+ connections.remove(client.getKey());
client.close();
+
} catch (Exception e) {
LOG.error("Can't close connection:" + client.getKey() + ":" + e.getMessage(), e);
}
@@ -99,7 +139,20 @@ public class RpcConnectionPool {
LOG.error("close client pool error", e);
}
}
- connections.clear();
+ }
+
+ connections.clear();
+ try {
+ accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
+ } catch (Throwable t) {
+ LOG.error(t);
+ }
+ }
+
+ public synchronized void shutdown(){
+ close();
+ if(channelFactory != null){
+ channelFactory.releaseExternalResources();
}
}
@@ -117,7 +170,7 @@ public class RpcConnectionPool {
@Override
public String toString() {
- return protocolClass + "," + addr + "," + asyncMode;
+ return "["+ protocolClass + "] " + addr + "," + asyncMode;
}
@Override
@@ -131,7 +184,7 @@ public class RpcConnectionPool {
@Override
public int hashCode() {
- return toString().hashCode();
+ return Objects.hashCode(addr, asyncMode);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
index 83914e5..97bc28d 100644
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
+++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
@@ -38,7 +38,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.junit.Assert.*;
-import static org.junit.Assert.assertTrue;
public class TestAsyncRpc {
private static Log LOG = LogFactory.getLog(TestAsyncRpc.class);
@@ -56,7 +55,7 @@ public class TestAsyncRpc {
public void setUp() throws Exception {
service = new DummyProtocolAsyncImpl();
server = new AsyncRpcServer(DummyProtocol.class,
- service, new InetSocketAddress("127.0.0.1", 0));
+ service, new InetSocketAddress("127.0.0.1", 0), 2);
server.start();
client = new AsyncRpcClient(DummyProtocol.class,
NetUtils.getConnectAddress(server.getListenAddress()));
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
index 6cd5f25..ba2b919 100644
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
+++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
@@ -48,7 +48,7 @@ public class TestBlockingRpc {
public void setUp() throws Exception {
service = new DummyProtocolBlockingImpl();
server = new BlockingRpcServer(DummyProtocol.class, service,
- new InetSocketAddress("127.0.0.1", 0));
+ new InetSocketAddress("127.0.0.1", 0), 2);
server.start();
client = new BlockingRpcClient(DummyProtocol.class,
NetUtils.getConnectAddress(server.getListenAddress()));
[2/2] git commit: TAJO-598: Refactoring Tajo RPC. (jinho)
Posted by jh...@apache.org.
TAJO-598: Refactoring Tajo RPC. (jinho)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/e2a7dffd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/e2a7dffd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/e2a7dffd
Branch: refs/heads/master
Commit: e2a7dffdb652c77dd35a917134cef15b3d54d274
Parents: cd7bbae
Author: jinossy <ji...@gmail.com>
Authored: Mon Feb 17 11:50:38 2014 +0900
Committer: jinossy <ji...@gmail.com>
Committed: Mon Feb 17 11:50:38 2014 +0900
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../org/apache/tajo/catalog/CatalogServer.java | 7 +-
.../main/java/org/apache/tajo/cli/TajoCli.java | 12 +-
.../java/org/apache/tajo/client/TajoClient.java | 17 +--
.../java/org/apache/tajo/conf/TajoConf.java | 27 +++-
.../org/apache/tajo/benchmark/SimpleQuery.java | 2 +-
.../java/org/apache/tajo/benchmark/TPCH.java | 16 +--
.../planner/physical/AggregationExec.java | 1 +
.../engine/planner/physical/BNLJoinExec.java | 25 +++-
.../planner/physical/BSTIndexScanExec.java | 12 +-
.../planner/physical/BinaryPhysicalExec.java | 8 +-
.../planner/physical/ExternalSortExec.java | 31 +++--
.../planner/physical/HashAggregateExec.java | 2 +
.../planner/physical/HashFullOuterJoinExec.java | 8 ++
.../engine/planner/physical/HashJoinExec.java | 11 +-
.../planner/physical/HashLeftOuterJoinExec.java | 10 +-
.../physical/HashShuffleFileWriteExec.java | 18 ++-
.../engine/planner/physical/MemSortExec.java | 11 +-
.../physical/MergeFullOuterJoinExec.java | 18 ++-
.../engine/planner/physical/MergeJoinExec.java | 20 ++-
.../engine/planner/physical/ProjectionExec.java | 8 +-
.../physical/RangeShuffleFileWriteExec.java | 7 +-
.../physical/RightOuterMergeJoinExec.java | 18 ++-
.../engine/planner/physical/SeqScanExec.java | 9 +-
.../engine/planner/physical/StoreTableExec.java | 16 ++-
.../planner/physical/UnaryPhysicalExec.java | 1 +
.../tajo/master/DefaultTaskScheduler.java | 12 +-
.../apache/tajo/master/LazyTaskScheduler.java | 4 +-
.../apache/tajo/master/TajoContainerProxy.java | 5 -
.../java/org/apache/tajo/master/TajoMaster.java | 6 +
.../tajo/master/TajoMasterClientService.java | 5 +-
.../apache/tajo/master/TajoMasterService.java | 3 +-
.../apache/tajo/master/YarnContainerProxy.java | 10 +-
.../tajo/master/YarnTaskRunnerLauncherImpl.java | 2 +-
.../master/querymaster/QueryInProgress.java | 16 +--
.../tajo/master/querymaster/QueryMaster.java | 22 +---
.../querymaster/QueryMasterManagerService.java | 8 +-
.../master/querymaster/QueryMasterTask.java | 8 +-
.../master/rm/TajoWorkerResourceManager.java | 4 +-
.../tajo/master/rm/YarnTajoResourceManager.java | 2 +-
.../reporter/MetricsFileScheduledReporter.java | 1 +
.../reporter/TajoMetricsScheduledReporter.java | 2 +-
.../java/org/apache/tajo/worker/Fetcher.java | 18 +--
.../tajo/worker/TajoResourceAllocator.java | 15 ++-
.../java/org/apache/tajo/worker/TajoWorker.java | 24 ++--
.../tajo/worker/TajoWorkerClientService.java | 10 +-
.../tajo/worker/TajoWorkerManagerService.java | 5 +-
.../main/java/org/apache/tajo/worker/Task.java | 125 +++++++++++--------
.../java/org/apache/tajo/worker/TaskRunner.java | 18 ++-
.../apache/tajo/engine/eval/TestEvalTree.java | 86 +------------
.../tajo/engine/eval/TestEvalTreeUtil.java | 32 ++++-
.../apache/tajo/master/TestGlobalPlanner.java | 6 +
.../org/apache/tajo/worker/TestFetcher.java | 9 +-
.../tajo/pullserver/TajoPullServerService.java | 22 +---
.../org/apache/tajo/rpc/AsyncRpcClient.java | 23 ++--
.../org/apache/tajo/rpc/AsyncRpcServer.java | 16 ++-
.../org/apache/tajo/rpc/BlockingRpcClient.java | 22 ++--
.../org/apache/tajo/rpc/BlockingRpcServer.java | 16 ++-
.../org/apache/tajo/rpc/NettyClientBase.java | 55 +++-----
.../org/apache/tajo/rpc/NettyServerBase.java | 28 +++--
.../org/apache/tajo/rpc/RpcChannelFactory.java | 102 +++++++++++++++
.../org/apache/tajo/rpc/RpcConnectionPool.java | 97 ++++++++++----
.../java/org/apache/tajo/rpc/TestAsyncRpc.java | 3 +-
.../org/apache/tajo/rpc/TestBlockingRpc.java | 2 +-
64 files changed, 735 insertions(+), 426 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2ef8284..9e21181 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -130,6 +130,8 @@ Release 0.8.0 - unreleased
IMPROVEMENTS
+ TAJO-598: Refactoring Tajo RPC. (jinho)
+
TAJO-592: HCatalogStore should supports RCFile and default hive field delimiter. (jaehwa)
TAJO-548: Investigate frequent young gc. (Min Zhou via hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
index 62d6e27..cf13a9d 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
@@ -149,10 +149,9 @@ public class CatalogServer extends AbstractService {
public void start() {
String serverAddr = conf.getVar(ConfVars.CATALOG_ADDRESS);
InetSocketAddress initIsa = NetUtils.createSocketAddr(serverAddr);
+ int workerNum = conf.getIntVar(ConfVars.CATALOG_RPC_SERVER_WORKER_THREAD_NUM);
try {
- this.rpcServer = new BlockingRpcServer(
- CatalogProtocol.class,
- handler, initIsa);
+ this.rpcServer = new BlockingRpcServer(CatalogProtocol.class, handler, initIsa, workerNum);
this.rpcServer.start();
this.bindAddress = NetUtils.getConnectAddress(this.rpcServer.getListenAddress());
@@ -175,7 +174,7 @@ public class CatalogServer extends AbstractService {
try {
store.close();
} catch (IOException ioe) {
- LOG.error(ioe);
+ LOG.error(ioe.getMessage(), ioe);
}
super.stop();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
index be4be3f..57a7294 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
@@ -202,8 +202,8 @@ public class TajoCli {
continue;
} else if (line.charAt(0) == '\\') { // command mode
- executeCommand(line);
((PersistentHistory)reader.getHistory()).flush();
+ executeCommand(line);
} else if (line.endsWith(";") && !line.endsWith("\\;")) {
@@ -330,11 +330,14 @@ public class TajoCli {
try {
QueryStatus status;
+ int initRetries = 0;
+ int progressRetries = 0;
while (true) {
- // TODO - configurable
- Thread.sleep(1000);
+ // TODO - configurable
status = client.getQueryStatus(queryId);
if(status.getState() == QueryState.QUERY_MASTER_INIT || status.getState() == QueryState.QUERY_MASTER_LAUNCHED) {
+ Thread.sleep(Math.min(20 * initRetries, 1000));
+ initRetries++;
continue;
}
@@ -347,6 +350,9 @@ public class TajoCli {
if (status.getState() != QueryState.QUERY_RUNNING && status.getState() != QueryState.QUERY_NOT_ASSIGNED) {
break;
+ } else {
+ Thread.sleep(Math.min(200 * progressRetries, 1000));
+ progressRetries += 2;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
index 2846398..3aeb40e 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
@@ -73,8 +73,9 @@ public class TajoClient {
this.conf = conf;
this.conf.set("tajo.disk.scheduler.report.interval", "0");
this.tajoMasterAddr = addr;
-
- connPool = RpcConnectionPool.getPool(conf);
+ int workerNum = conf.getIntVar(TajoConf.ConfVars.RPC_CLIENT_WORKER_THREAD_NUM);
+ //Don't share connection pool per client
+ connPool = RpcConnectionPool.newPool(conf, getClass().getSimpleName(), workerNum);
}
public TajoClient(InetSocketAddress addr) throws IOException {
@@ -87,7 +88,7 @@ public class TajoClient {
public void close() {
if(connPool != null) {
- connPool.close();
+ connPool.shutdown();
}
queryMasterMap.clear();
}
@@ -193,7 +194,7 @@ public class TajoClient {
} catch (Exception e) {
throw new ServiceException(e.getMessage(), e);
} finally {
- connPool.closeConnection(qmClient);
+ connPool.releaseConnection(qmClient);
}
} else {
NettyClientBase tmClient = null;
@@ -217,13 +218,13 @@ public class TajoClient {
} catch (Exception e) {
throw new ServiceException(e.getMessage(), e);
} finally {
- connPool.closeConnection(qmClient);
+ connPool.releaseConnection(qmClient);
}
}
} catch (Exception e) {
throw new ServiceException(e.getMessage(), e);
} finally {
- connPool.closeConnection(tmClient);
+ connPool.releaseConnection(tmClient);
}
}
return new QueryStatus(res);
@@ -307,7 +308,7 @@ public class TajoClient {
} catch (Exception e) {
throw new ServiceException(e.getMessage(), e);
} finally {
- connPool.closeConnection(client);
+ connPool.releaseConnection(client);
}
}
@@ -490,7 +491,7 @@ public class TajoClient {
LOG.debug("Error when checking for application status", e);
return false;
} finally {
- connPool.closeConnection(tmClient);
+ connPool.releaseConnection(tmClient);
}
return true;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 7c82d72..25caf13 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -137,6 +137,7 @@ public class TajoConf extends Configuration {
PULLSERVER_PORT("tajo.pullserver.port", 0),
SHUFFLE_SSL_ENABLED_KEY("tajo.pullserver.ssl.enabled", false),
SHUFFLE_FILE_FORMAT("tajo.shuffle.file-format", "RAW"),
+ SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM("tajo.shuffle.fetcher.parallel-execution.max-num", 2),
//////////////////////////////////
// Storage Configuration
@@ -184,7 +185,31 @@ public class TajoConf extends Configuration {
// RPC
//////////////////////////////////
RPC_POOL_MAX_IDLE("tajo.rpc.pool.idle.max", 10),
- RPC_CLIENT_SOCKET_IO_THREADS("tajo.rpc.client.socket-io-threads", 0),
+
+ //Internal RPC Client
+ INTERNAL_RPC_CLIENT_WORKER_THREAD_NUM("tajo.internal.rpc.client.worker-thread-num",
+ Runtime.getRuntime().availableProcessors() * 2),
+
+ //Internal RPC Server
+ MASTER_RPC_SERVER_WORKER_THREAD_NUM("tajo.master.rpc.server.worker-thread-num",
+ Runtime.getRuntime().availableProcessors() * 2),
+ QUERY_MASTER_RPC_SERVER_WORKER_THREAD_NUM("tajo.querymaster.rpc.server.worker-thread-num",
+ Runtime.getRuntime().availableProcessors() * 2),
+ WORKER_RPC_SERVER_WORKER_THREAD_NUM("tajo.worker.rpc.server.worker-thread-num",
+ Runtime.getRuntime().availableProcessors() * 2),
+ CATALOG_RPC_SERVER_WORKER_THREAD_NUM("tajo.catalog.rpc.server.worker-thread-num",
+ Runtime.getRuntime().availableProcessors() * 2),
+ SHUFFLE_RPC_SERVER_WORKER_THREAD_NUM("tajo.shuffle.rpc.server.worker-thread-num",
+ Runtime.getRuntime().availableProcessors() * 2),
+
+ // Client RPC
+ RPC_CLIENT_WORKER_THREAD_NUM("tajo.rpc.client.worker-thread-num", 4),
+
+ //Client service RPC Server
+ MASTER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM("tajo.master.service.rpc.server.worker-thread-num",
+ Runtime.getRuntime().availableProcessors() * 1),
+ WORKER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM("tajo.worker.service.rpc.server.worker-thread-num",
+ Runtime.getRuntime().availableProcessors() * 1),
//////////////////////////////////
// The Below is reserved
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/SimpleQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/SimpleQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/SimpleQuery.java
index fb8fe5f..af6af8d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/SimpleQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/SimpleQuery.java
@@ -21,7 +21,7 @@ package org.apache.tajo.benchmark;
import java.io.IOException;
public class SimpleQuery extends TPCH {
- private final String BENCHMARK_DIR = "benchmark/simple";
+ private static final String BENCHMARK_DIR = "benchmark/simple";
public void loadQueries() throws IOException {
loadQueries(BENCHMARK_DIR);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/TPCH.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/TPCH.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/TPCH.java
index d11941d..2e12b1d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/TPCH.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/TPCH.java
@@ -38,14 +38,14 @@ public class TPCH extends BenchmarkSet {
private final Log LOG = LogFactory.getLog(TPCH.class);
private final String BENCHMARK_DIR = "benchmark/tpch";
- public static String LINEITEM = "lineitem";
- public static String CUSTOMER = "customer";
- public static String NATION = "nation";
- public static String PART = "part";
- public static String REGION = "region";
- public static String ORDERS = "orders";
- public static String PARTSUPP = "partsupp";
- public static String SUPPLIER = "supplier";
+ public static final String LINEITEM = "lineitem";
+ public static final String CUSTOMER = "customer";
+ public static final String NATION = "nation";
+ public static final String PART = "part";
+ public static final String REGION = "region";
+ public static final String ORDERS = "orders";
+ public static final String PARTSUPP = "partsupp";
+ public static final String SUPPLIER = "supplier";
public static final Map<String, Long> tableVolumes = Maps.newHashMap();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
index eb69703..208973e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
@@ -64,5 +64,6 @@ public abstract class AggregationExec extends UnaryPhysicalExec {
@Override
public void close() throws IOException {
super.close();
+ plan = null;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
index b39a9f1..71581e3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
@@ -34,12 +34,12 @@ import java.util.List;
public class BNLJoinExec extends BinaryPhysicalExec {
// from logical plan
- private final JoinNode plan;
+ private JoinNode plan;
private final boolean hasJoinQual;
- private final EvalNode joinQual;
+ private EvalNode joinQual;
- private final List<Tuple> leftTupleSlots;
- private final List<Tuple> rightTupleSlots;
+ private List<Tuple> leftTupleSlots;
+ private List<Tuple> rightTupleSlots;
private Iterator<Tuple> leftIterator;
private Iterator<Tuple> rightIterator;
@@ -55,7 +55,7 @@ public class BNLJoinExec extends BinaryPhysicalExec {
private final int TUPLE_SLOT_SIZE = 10000;
// projection
- private final Projector projector;
+ private Projector projector;
public BNLJoinExec(final TaskAttemptContext context, final JoinNode plan,
final PhysicalExec leftExec, PhysicalExec rightExec) {
@@ -205,4 +205,19 @@ public class BNLJoinExec extends BinaryPhysicalExec {
rightIterator = rightTupleSlots.iterator();
leftIterator = leftTupleSlots.iterator();
}
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+
+ rightTupleSlots.clear();
+ leftTupleSlots.clear();
+ rightTupleSlots = null;
+ leftTupleSlots = null;
+ rightIterator = null;
+ leftIterator = null;
+ plan = null;
+ joinQual = null;
+ projector = null;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index 2ff6fc9..753dcc8 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@ -19,6 +19,7 @@
package org.apache.tajo.engine.planner.physical;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.engine.eval.EvalNode;
@@ -38,7 +39,7 @@ public class BSTIndexScanExec extends PhysicalExec {
private EvalNode qual;
private BSTIndex.BSTIndexReader reader;
- private final Projector projector;
+ private Projector projector;
private Datum[] datum = null;
@@ -125,8 +126,11 @@ public class BSTIndexScanExec extends PhysicalExec {
@Override
public void close() throws IOException {
- reader.close();
- fileScanner.close();
+ IOUtils.cleanup(null, reader, fileScanner);
+ reader = null;
+ fileScanner = null;
+ scanNode = null;
+ qual = null;
+ projector = null;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
index f1e3a00..fc8d25d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
@@ -18,14 +18,14 @@
package org.apache.tajo.engine.planner.physical;
-import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
public abstract class BinaryPhysicalExec extends PhysicalExec {
- protected final PhysicalExec leftChild;
- protected final PhysicalExec rightChild;
+ protected PhysicalExec leftChild;
+ protected PhysicalExec rightChild;
public BinaryPhysicalExec(final TaskAttemptContext context,
final Schema inSchema, final Schema outSchema,
@@ -59,5 +59,7 @@ public abstract class BinaryPhysicalExec extends PhysicalExec {
public void close() throws IOException {
leftChild.close();
rightChild.close();
+ leftChild = null;
+ rightChild = null;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index f0ac290..791781e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.IOUtils;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
@@ -61,7 +62,7 @@ public class ExternalSortExec extends SortExec {
/** Class logger */
private static final Log LOG = LogFactory.getLog(ExternalSortExec.class);
- private final SortNode plan;
+ private SortNode plan;
private final TableMeta meta;
/** the defaultFanout of external sort */
private final int defaultFanout;
@@ -70,9 +71,9 @@ public class ExternalSortExec extends SortExec {
/** the number of available cores */
private final int allocatedCoreNum;
/** If there are available multiple cores, it tries parallel merge. */
- private final ExecutorService executorService;
+ private ExecutorService executorService;
/** used for in-memory sort of each chunk. */
- private final List<Tuple> inMemoryTable;
+ private List<Tuple> inMemoryTable;
/** temporal dir */
private final Path sortTmpDir;
/** It enables round-robin disks allocation */
@@ -512,8 +513,8 @@ public class ExternalSortExec extends SortExec {
* Two-way merger scanner that reads two input sources and outputs one output tuples sorted in some order.
*/
private class PairWiseMerger implements Scanner {
- private final Scanner leftScan;
- private final Scanner rightScan;
+ private Scanner leftScan;
+ private Scanner rightScan;
private Tuple leftTuple;
private Tuple rightTuple;
@@ -564,9 +565,11 @@ public class ExternalSortExec extends SortExec {
init();
}
+ @Override
public void close() throws IOException {
- leftScan.close();
- rightScan.close();
+ IOUtils.cleanup(LOG, leftScan, rightScan);
+ leftScan = null;
+ rightScan = null;
}
@Override
@@ -602,6 +605,7 @@ public class ExternalSortExec extends SortExec {
public void close() throws IOException {
if (result != null) {
result.close();
+ result = null;
}
if (finalOutputFiles != null) {
@@ -610,7 +614,18 @@ public class ExternalSortExec extends SortExec {
}
}
- inMemoryTable.clear();
+ if(inMemoryTable != null){
+ inMemoryTable.clear();
+ inMemoryTable = null;
+ }
+
+ if(executorService != null){
+ executorService.shutdown();
+ executorService = null;
+ }
+
+ plan = null;
+ super.close();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
index a4153dc..1f8d000 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
@@ -109,5 +109,7 @@ public class HashAggregateExec extends AggregationExec {
public void close() throws IOException {
super.close();
hashTable.clear();
+ hashTable = null;
+ iterator = null;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
index 848e362..70dd10b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
@@ -233,8 +233,16 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
shouldGetLeftTuple = true;
}
+ @Override
public void close() throws IOException {
+ super.close();
tupleSlots.clear();
+ matched.clear();
+ tupleSlots = null;
+ matched = null;
+ iterator = null;
+ plan = null;
+ joinQual = null;
}
public JoinNode getPlan() {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
index 08b7035..51d0b4c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
@@ -179,8 +179,17 @@ public class HashJoinExec extends BinaryPhysicalExec {
shouldGetLeftTuple = true;
}
+ @Override
public void close() throws IOException {
- tupleSlots.clear();
+ super.close();
+ if (tupleSlots != null) {
+ tupleSlots.clear();
+ tupleSlots = null;
+ }
+
+ iterator = null;
+ plan = null;
+ joinQual = null;
}
public JoinNode getPlan() {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
index 314b3d9..93383a3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
@@ -59,7 +59,7 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
protected boolean shouldGetLeftTuple = true;
// projection
- protected final Projector projector;
+ protected Projector projector;
private int rightNumCols;
private static final Log LOG = LogFactory.getLog(HashLeftOuterJoinExec.class);
@@ -192,8 +192,16 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
shouldGetLeftTuple = true;
}
+
+ @Override
public void close() throws IOException {
+ super.close();
tupleSlots.clear();
+ tupleSlots = null;
+ iterator = null;
+ plan = null;
+ joinQual = null;
+ projector = null;
}
public JoinNode getPlan() {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
index c09ec19..e2b926d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
@@ -46,11 +46,11 @@ import java.util.Map;
*/
public final class HashShuffleFileWriteExec extends UnaryPhysicalExec {
private static Log LOG = LogFactory.getLog(HashShuffleFileWriteExec.class);
- private final ShuffleFileWriteNode plan;
+ private ShuffleFileWriteNode plan;
private final TableMeta meta;
- private final Partitioner partitioner;
+ private Partitioner partitioner;
private final Path storeTablePath;
- private final Map<Integer, Appender> appenderMap = new HashMap<Integer, Appender>();
+ private Map<Integer, Appender> appenderMap = new HashMap<Integer, Appender>();
private final int numShuffleOutputs;
private final int [] shuffleKeyIds;
@@ -143,4 +143,16 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec {
public void rescan() throws IOException {
// nothing to do
}
+
+ @Override
+ public void close() throws IOException{
+ super.close();
+ if (appenderMap != null) {
+ appenderMap.clear();
+ appenderMap = null;
+ }
+
+ partitioner = null;
+ plan = null;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
index 46d061a..9f4f20a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
@@ -30,7 +30,7 @@ import java.util.Iterator;
import java.util.List;
public class MemSortExec extends SortExec {
- private final SortNode plan;
+ private SortNode plan;
private List<Tuple> tupleSlots;
private boolean sorted = false;
private Iterator<Tuple> iterator;
@@ -74,6 +74,15 @@ public class MemSortExec extends SortExec {
sorted = true;
}
+ @Override
+ public void close() throws IOException {
+ super.close();
+ tupleSlots.clear();
+ tupleSlots = null;
+ iterator = null;
+ plan = null;
+ }
+
public SortNode getPlan() {
return this.plan;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
index 1d6da3f..613e072 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
@@ -48,8 +48,8 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
private Tuple outTuple = null;
private Tuple leftNext = null;
- private final List<Tuple> leftTupleSlots;
- private final List<Tuple> rightTupleSlots;
+ private List<Tuple> leftTupleSlots;
+ private List<Tuple> rightTupleSlots;
private JoinTupleComparator joincomparator = null;
private TupleComparator[] tupleComparator = null;
@@ -59,7 +59,7 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
private boolean end = false;
// projection
- private final Projector projector;
+ private Projector projector;
private int rightNumCols;
private int leftNumCols;
@@ -320,4 +320,16 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
posRightTupleSlots = -1;
posLeftTupleSlots = -1;
}
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ leftTupleSlots.clear();
+ rightTupleSlots.clear();
+ leftTupleSlots = null;
+ rightTupleSlots = null;
+ joinNode = null;
+ joinQual = null;
+ projector = null;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
index e1d377e..f72e87e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
@@ -47,8 +47,8 @@ public class MergeJoinExec extends BinaryPhysicalExec {
private Tuple outTuple = null;
private Tuple outerNext = null;
- private final List<Tuple> outerTupleSlots;
- private final List<Tuple> innerTupleSlots;
+ private List<Tuple> outerTupleSlots;
+ private List<Tuple> innerTupleSlots;
private Iterator<Tuple> outerIterator;
private Iterator<Tuple> innerIterator;
@@ -60,7 +60,7 @@ public class MergeJoinExec extends BinaryPhysicalExec {
private boolean end = false;
// projection
- private final Projector projector;
+ private Projector projector;
public MergeJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
PhysicalExec inner, SortSpec[] outerSortKey, SortSpec[] innerSortKey) {
@@ -176,4 +176,18 @@ public class MergeJoinExec extends BinaryPhysicalExec {
outerIterator = outerTupleSlots.iterator();
innerIterator = innerTupleSlots.iterator();
}
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+
+ outerTupleSlots.clear();
+ innerTupleSlots.clear();
+ outerTupleSlots = null;
+ innerTupleSlots = null;
+ outerIterator = null;
+ innerIterator = null;
+ joinQual = null;
+ projector = null;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
index ecc6dd0..e205751 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
@@ -30,7 +30,7 @@ import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
public class ProjectionExec extends UnaryPhysicalExec {
- private final Projectable plan;
+ private Projectable plan;
// for projection
private Tuple outTuple;
@@ -60,4 +60,10 @@ public class ProjectionExec extends UnaryPhysicalExec {
projector.eval(tuple, outTuple);
return outTuple;
}
+
+ @Override
+ public void close() throws IOException{
+ super.close();
+ plan = null;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
index 13573eb..698e46e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.IOUtils;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.conf.TajoConf;
@@ -117,12 +118,14 @@ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec {
super.close();
appender.flush();
- appender.close();
+ IOUtils.cleanup(LOG, appender);
indexWriter.flush();
- indexWriter.close();
+ IOUtils.cleanup(LOG, indexWriter);
// Collect statistics data
context.setResultStats(appender.getStats());
context.addShuffleFileOutput(0, context.getTaskId().toString());
+ appender = null;
+ indexWriter = null;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
index 365faba..b494544 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
@@ -47,8 +47,8 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
private Tuple outTuple = null;
private Tuple nextLeft = null;
- private final List<Tuple> leftTupleSlots;
- private final List<Tuple> innerTupleSlots;
+ private List<Tuple> leftTupleSlots;
+ private List<Tuple> innerTupleSlots;
private JoinTupleComparator joinComparator = null;
private TupleComparator[] tupleComparator = null;
@@ -58,7 +58,7 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
private boolean end = false;
// projection
- private final Projector projector;
+ private Projector projector;
private int rightNumCols;
private int leftNumCols;
@@ -330,5 +330,17 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
posRightTupleSlots = -1;
posLeftTupleSlots = -1;
}
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ leftTupleSlots.clear();
+ innerTupleSlots.clear();
+ leftTupleSlots = null;
+ innerTupleSlots = null;
+ joinNode = null;
+ joinQual = null;
+ projector = null;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index 31a944c..a0c0eeb 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -18,6 +18,7 @@
package org.apache.tajo.engine.planner.physical;
+import org.apache.hadoop.io.IOUtils;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
@@ -43,7 +44,7 @@ import java.util.Set;
public class SeqScanExec extends PhysicalExec {
- private final ScanNode plan;
+ private ScanNode plan;
private Scanner scanner = null;
private EvalNode qual = null;
@@ -186,7 +187,11 @@ public class SeqScanExec extends PhysicalExec {
@Override
public void close() throws IOException {
- scanner.close();
+ IOUtils.cleanup(null, scanner);
+ scanner = null;
+ plan = null;
+ qual = null;
+ projector = null;
}
public String getTableName() {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
index f097d0c..786148b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
@@ -36,7 +36,7 @@ import java.io.IOException;
* This is a physical executor to store a table part into a specified storage.
*/
public class StoreTableExec extends UnaryPhysicalExec {
- private final PersistentStoreNode plan;
+ private PersistentStoreNode plan;
private Appender appender;
private Tuple tuple;
@@ -88,11 +88,15 @@ public class StoreTableExec extends UnaryPhysicalExec {
public void close() throws IOException {
super.close();
- appender.flush();
- appender.close();
+ if(appender != null){
+ appender.flush();
+ appender.close();
+ // Collect statistics data
+ context.setResultStats(appender.getStats());
+ context.addShuffleFileOutput(0, context.getTaskId().toString());
+ }
- // Collect statistics data
- context.setResultStats(appender.getStats());
- context.addShuffleFileOutput(0, context.getTaskId().toString());
+ appender = null;
+ plan = null;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
index f31455f..ceeca06 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
@@ -58,6 +58,7 @@ public abstract class UnaryPhysicalExec extends PhysicalExec {
public void close() throws IOException {
if (child != null) {
child.close();
+ child = null;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
index b6bde94..4260c98 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -48,6 +48,7 @@ import java.net.URI;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
@@ -58,7 +59,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
private SubQuery subQuery;
private Thread schedulingThread;
- private volatile boolean stopEventHandling;
+ private AtomicBoolean stopEventHandling = new AtomicBoolean(false);
private ScheduledRequests scheduledRequests;
private TaskRequests taskRequests;
@@ -91,7 +92,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
this.schedulingThread = new Thread() {
public void run() {
- while(!stopEventHandling && !Thread.currentThread().isInterrupted()) {
+ while(!stopEventHandling.get() && !Thread.currentThread().isInterrupted()) {
schedule();
try {
synchronized (schedulingThread){
@@ -127,7 +128,9 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
@Override
public void stop() {
- stopEventHandling = true;
+ if(stopEventHandling.getAndSet(true)){
+ return;
+ }
if (schedulingThread != null) {
synchronized (schedulingThread) {
@@ -224,6 +227,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
@Override
public void handleTaskRequestEvent(TaskRequestEvent event) {
+
taskRequests.handle(event);
int hosts = scheduledRequests.leafTaskHostMapping.size();
@@ -248,7 +252,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
@Override
public void handle(TaskRequestEvent event) {
LOG.info("TaskRequest: " + event.getContainerId() + "," + event.getExecutionBlockId());
- if(stopEventHandling) {
+ if(stopEventHandling.get()) {
event.getCallback().run(stopTaskRunnerReq);
return;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
index 08d080d..43e6493 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
@@ -340,10 +340,12 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
"containerId=" + taskRequest.getContainerId());
ContainerProxy container = context.getMasterContext().getResourceAllocator().
getContainer(taskRequest.getContainerId());
- String host = container.getTaskHostName();
+
if(container == null) {
continue;
}
+
+ String host = container.getTaskHostName();
QueryUnitAttemptScheduleContext queryUnitContext = new QueryUnitAttemptScheduleContext(container.containerID,
host, taskRequest.getCallback());
QueryUnit task = SubQuery.newEmptyQueryUnit(context, queryUnitContext, subQuery, nextTaskId++);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index 39a73ba..f405ea7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -82,9 +82,6 @@ public class TajoContainerProxy extends ContainerProxy {
tajoWorkerRpcClient.executeExecutionBlock(null, request, NullCallback.get());
} catch (Exception e) {
- //TODO retry
- RpcConnectionPool.getPool(context.getConf()).closeConnection(tajoWorkerRpc);
- tajoWorkerRpc = null;
LOG.error(e.getMessage(), e);
} finally {
RpcConnectionPool.getPool(context.getConf()).releaseConnection(tajoWorkerRpc);
@@ -150,8 +147,6 @@ public class TajoContainerProxy extends ContainerProxy {
.build(),
NullCallback.get());
} catch (Exception e) {
- connPool.closeConnection(tmClient);
- tmClient = null;
LOG.error(e.getMessage(), e);
} finally {
connPool.releaseConnection(tmClient);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
index 2c194fd..7dcc55f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
@@ -50,6 +51,7 @@ import org.apache.tajo.master.metrics.WorkerResourceMetricsGaugeSet;
import org.apache.tajo.master.querymaster.QueryJobManager;
import org.apache.tajo.master.rm.TajoWorkerResourceManager;
import org.apache.tajo.master.rm.WorkerResourceManager;
+import org.apache.tajo.rpc.RpcChannelFactory;
import org.apache.tajo.storage.AbstractStorageManager;
import org.apache.tajo.storage.StorageManagerFactory;
import org.apache.tajo.util.ClassUtil;
@@ -383,10 +385,14 @@ public class TajoMaster extends CompositeService {
}
}
+ IOUtils.cleanup(LOG, catalogServer);
+
if(systemMetrics != null) {
systemMetrics.stop();
}
+ RpcChannelFactory.shutdown();
+
super.stop();
LOG.info("Tajo Master main thread exiting");
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 6e2ee8b..0f8eb61 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -82,12 +82,15 @@ public class TajoMasterClientService extends AbstractService {
// start the rpc server
String confClientServiceAddr = conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS);
InetSocketAddress initIsa = NetUtils.createSocketAddr(confClientServiceAddr);
+ int workerNum = conf.getIntVar(ConfVars.MASTER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM);
try {
- server = new BlockingRpcServer(TajoMasterClientProtocol.class, clientHandler, initIsa);
+ server = new BlockingRpcServer(TajoMasterClientProtocol.class, clientHandler, initIsa, workerNum);
} catch (Exception e) {
LOG.error(e);
+ throw new RuntimeException(e);
}
server.start();
+
bindAddress = NetUtils.getConnectAddress(server.getListenAddress());
this.conf.setVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, NetUtils.normalizeInetSocketAddress(bindAddress));
LOG.info("Instantiated TajoMasterClientService at " + this.bindAddress);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
index 7853f63..c3a0829 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
@@ -63,8 +63,9 @@ public class TajoMasterService extends AbstractService {
public void start() {
String confMasterServiceAddr = conf.getVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterServiceAddr);
+ int workerNum = conf.getIntVar(TajoConf.ConfVars.MASTER_RPC_SERVER_WORKER_THREAD_NUM);
try {
- server = new AsyncRpcServer(TajoMasterProtocol.class, masterHandler, initIsa);
+ server = new AsyncRpcServer(TajoMasterProtocol.class, masterHandler, initIsa, workerNum);
} catch (Exception e) {
LOG.error(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnContainerProxy.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnContainerProxy.java
index dbec652..4f178fb 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnContainerProxy.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnContainerProxy.java
@@ -297,17 +297,11 @@ public class YarnContainerProxy extends ContainerProxy {
// Set the local resources
////////////////////////////////////////////////////////////////////////////
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
- FileSystem fs = null;
- FileContext fsCtx = null;
LOG.info("defaultFS: " + conf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY));
- try {
- fs = FileSystem.get(conf);
- fsCtx = FileContext.getFileContext(conf);
- } catch (IOException e) {
- LOG.error(e.getMessage(), e);
- }
try {
+ FileSystem fs = FileSystem.get(conf);
+ FileContext fsCtx = FileContext.getFileContext(conf);
Path systemConfPath = TajoConf.getSystemConfPath(conf);
if (!fs.exists(systemConfPath)) {
LOG.error("system_conf.xml (" + systemConfPath.toString() + ") Not Found");
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
index 7c38e7a..28b5f08 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
@@ -129,7 +129,7 @@ public class YarnTaskRunnerLauncherImpl extends AbstractService implements TaskR
}
}
- private class StopContainerRunner implements Runnable {
+ private static class StopContainerRunner implements Runnable {
private final ContainerProxy proxy;
private final ContainerId id;
public StopContainerRunner(ContainerId id, ContainerProxy proxy) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index 2ab6c6a..2a93d3c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -93,12 +93,10 @@ public class QueryInProgress extends CompositeService {
@Override
public void stop() {
- synchronized(stopped) {
- if(stopped.get()) {
- return;
- }
- stopped.set(true);
+ if(stopped.getAndSet(true)) {
+ return;
}
+
LOG.info("=========================================================");
LOG.info("Stop query:" + queryId);
@@ -113,10 +111,13 @@ public class QueryInProgress extends CompositeService {
}
} catch (Exception e) {
LOG.error(e.getMessage(), e);
+ break;
}
try {
- Thread.sleep(1000);
+ synchronized (this){
+ wait(100);
+ }
} catch (InterruptedException e) {
break;
}
@@ -126,11 +127,10 @@ public class QueryInProgress extends CompositeService {
}
}
- super.stop();
-
if(queryMasterRpc != null) {
RpcConnectionPool.getPool((TajoConf)getConfig()).closeConnection(queryMasterRpc);
}
+ super.stop();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index ecf6cb2..ae6d5eb 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -140,13 +140,8 @@ public class QueryMaster extends CompositeService implements EventHandler {
@Override
public void stop() {
- synchronized(queryMasterStop) {
- if(queryMasterStop.get()) {
- return;
- }
-
- queryMasterStop.set(true);
- queryMasterStop.notifyAll();
+ if(queryMasterStop.getAndSet(true)){
+ return;
}
if(queryHeartbeatThread != null) {
@@ -183,8 +178,6 @@ public class QueryMaster extends CompositeService implements EventHandler {
tajoWorkerProtocolService.cleanup(null, queryId.getProto(), NullCallback.get());
} catch (Exception e) {
- connPool.closeConnection(rpc);
- rpc = null;
LOG.error(e.getMessage());
} finally {
connPool.releaseConnection(rpc);
@@ -208,8 +201,6 @@ public class QueryMaster extends CompositeService implements EventHandler {
TajoMasterProtocol.WorkerResourcesRequest workerResourcesRequest = callBack.get(2, TimeUnit.SECONDS);
return workerResourcesRequest.getWorkerResourcesList();
} catch (Exception e) {
- connPool.closeConnection(rpc);
- rpc = null;
LOG.error(e.getMessage(), e);
} finally {
connPool.releaseConnection(rpc);
@@ -238,8 +229,6 @@ public class QueryMaster extends CompositeService implements EventHandler {
masterClientService.heartbeat(callBack.getController(), queryHeartbeatBuilder.build(), callBack);
} catch (Exception e) {
- connPool.closeConnection(tmClient);
- tmClient = null;
LOG.error(e.getMessage(), e);
} finally {
connPool.releaseConnection(tmClient);
@@ -338,9 +327,10 @@ public class QueryMaster extends CompositeService implements EventHandler {
TajoMasterProtocol.class, true);
TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
masterClientService.heartbeat(future.getController(), queryHeartbeat, future);
- } catch (Exception e) {
- connPool.closeConnection(tmClient);
- tmClient = null;
+ } catch (Exception e) {
+ //this function will be closed in new thread.
+ //When tajo do stop cluster, tajo master maybe throw closed connection exception
+
LOG.error(e.getMessage(), e);
} finally {
connPool.releaseConnection(tmClient);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
index 3618d3b..5ce57f7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -18,6 +18,7 @@
package org.apache.tajo.master.querymaster;
+import com.google.common.base.Preconditions;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import org.apache.commons.logging.Log;
@@ -67,6 +68,8 @@ public class QueryMasterManagerService extends CompositeService
@Override
public void init(Configuration conf) {
+ Preconditions.checkArgument(conf instanceof TajoConf);
+ TajoConf tajoConf = (TajoConf) conf;
try {
// Setup RPC server
InetSocketAddress initIsa =
@@ -75,7 +78,8 @@ public class QueryMasterManagerService extends CompositeService
throw new IllegalArgumentException("Failed resolve of " + initIsa);
}
- this.rpcServer = new AsyncRpcServer(QueryMasterProtocol.class, this, initIsa);
+ int workerNum = tajoConf.getIntVar(TajoConf.ConfVars.QUERY_MASTER_RPC_SERVER_WORKER_THREAD_NUM);
+ this.rpcServer = new AsyncRpcServer(QueryMasterProtocol.class, this, initIsa, workerNum);
this.rpcServer.start();
this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());
@@ -127,7 +131,7 @@ public class QueryMasterManagerService extends CompositeService
ContainerId cid =
queryMasterTask.getQueryTaskContext().getResourceAllocator().makeContainerId(request.getContainerId());
- if(queryMasterTask == null || queryMasterTask.isStopped()) {
+ if(queryMasterTask.isStopped()) {
LOG.debug("getTask:" + cid + ", ebId:" + ebId + ", but query is finished.");
done.run(LazyTaskScheduler.stopTaskRunnerReq);
} else {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index e193509..eb0528c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -160,10 +160,10 @@ public class QueryMasterTask extends CompositeService {
@Override
public void stop() {
- if(stopped.get()) {
+
+ if(stopped.getAndSet(true)) {
return;
}
- stopped.set(true);
LOG.info("Stopping QueryMasterTask:" + queryId);
@@ -177,8 +177,6 @@ public class QueryMasterTask extends CompositeService {
TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
masterClientService.stopQueryMaster(null, queryId.getProto(), future);
} catch (Exception e) {
- connPool.closeConnection(tmClient);
- tmClient = null;
LOG.error(e.getMessage(), e);
} finally {
connPool.releaseConnection(tmClient);
@@ -249,7 +247,7 @@ public class QueryMasterTask extends CompositeService {
}
}
- private class QueryFinishEventHandler implements EventHandler<QueryFinishEvent> {
+ private static class QueryFinishEventHandler implements EventHandler<QueryFinishEvent> {
@Override
public void handle(QueryFinishEvent event) {
QueryId queryId = event.getQueryId();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index b39901e..2c3572c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -323,7 +323,7 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
}
}
- class WorkerResourceRequest {
+ static class WorkerResourceRequest {
boolean queryMasterRequest;
QueryId queryId;
TajoMasterProtocol.WorkerResourceAllocationRequest request;
@@ -339,7 +339,7 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
}
}
- class AllocatedWorkerResource {
+ static class AllocatedWorkerResource {
WorkerResource workerResource;
int allocatedMemoryMB;
float allocatedDiskSlots;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
index 72ada9b..80aab9b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
@@ -178,7 +178,7 @@ public class YarnTajoResourceManager implements WorkerResourceManager {
try {
FinalApplicationStatus appStatus = FinalApplicationStatus.UNDEFINED;
QueryInProgress queryInProgress = masterContext.getQueryJobManager().getQueryInProgress(queryId);
- if(queryInProgress != null) {
+ if(queryInProgress == null) {
return;
}
TajoProtos.QueryState state = queryInProgress.getQueryInfo().getQueryState();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsFileScheduledReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsFileScheduledReporter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsFileScheduledReporter.java
index 35dd6f1..9e895b8 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsFileScheduledReporter.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsFileScheduledReporter.java
@@ -52,6 +52,7 @@ public class MetricsFileScheduledReporter extends MetricsStreamScheduledReporter
this.setDateFormat(null);
} catch (FileNotFoundException e) {
LOG.warn("Can't open metrics file:" + fileName);
+ this.close();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java
index c9d25c5..f11d520 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java
@@ -118,7 +118,7 @@ public abstract class TajoMetricsScheduledReporter extends TajoMetricsReporter i
this.filter = filterList;
this.period = 60;
- if(metricsProperties != null && metricsProperties.get(metricsPropertyKey + PERIOD_KEY) != null) {
+ if(metricsProperties.get(metricsPropertyKey + PERIOD_KEY) != null) {
this.period = Integer.parseInt(metricsProperties.get(metricsPropertyKey + PERIOD_KEY));
}
afterInit();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Fetcher.java
index 4f9f3fa..bb136f7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Fetcher.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Fetcher.java
@@ -18,14 +18,13 @@
package org.apache.tajo.worker;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IOUtils;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.*;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.handler.codec.http.*;
import java.io.File;
@@ -35,8 +34,6 @@ import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.channels.FileChannel;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
import static org.jboss.netty.channel.Channels.pipeline;
@@ -58,20 +55,9 @@ public class Fetcher {
private long fileLen;
private int messageReceiveCount;
-
- private static final ThreadFactory bossFactory = new ThreadFactoryBuilder()
- .setNameFormat("Fetcher Netty Boss #%d")
- .build();
- private static final ThreadFactory workerFactory = new ThreadFactoryBuilder()
- .setNameFormat("Fetcher Netty Worker #%d")
- .build();
- private static final ChannelFactory factory = new NioClientSocketChannelFactory(
- Executors.newCachedThreadPool(bossFactory),
- Executors.newCachedThreadPool(workerFactory));
-
private ClientBootstrap bootstrap;
- public Fetcher(URI uri, File file) {
+ public Fetcher(URI uri, File file, ClientSocketChannelFactory factory) {
this.uri = uri;
this.file = file;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 6d01f55..16427b6 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -100,8 +100,9 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
int memoryMBPerTask) {
//TODO consider disk slot
TajoMasterProtocol.ClusterResourceSummary clusterResource = workerContext.getClusterResource();
- int clusterSlots = clusterResource == null ? 0 : clusterResource.getTotalMemoryMB()/memoryMBPerTask;
- return clusterSlots == 0 ? 1: Math.min(numTasks, clusterSlots);
+ int clusterSlots = clusterResource == null ? 0 : clusterResource.getTotalMemoryMB() / memoryMBPerTask;
+ clusterSlots = Math.max(1, clusterSlots - 1); // reserve query master slot
+ return Math.min(numTasks, clusterSlots);
}
@Override
@@ -117,10 +118,10 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
@Override
public synchronized void stop() {
- if(stopped.get()) {
+ if (stopped.getAndSet(true)) {
return;
}
- stopped.set(true);
+
executorService.shutdownNow();
Map<ContainerId, ContainerProxy> containers = queryTaskContext.getResourceAllocator().getContainers();
@@ -177,7 +178,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
}
}
- protected class LaunchRunner implements Runnable {
+ protected static class LaunchRunner implements Runnable {
private final ContainerProxy proxy;
private final ContainerId id;
public LaunchRunner(ContainerId id, ContainerProxy proxy) {
@@ -198,7 +199,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
}
}
- private class StopContainerRunner implements Runnable {
+ private static class StopContainerRunner implements Runnable {
private final ContainerProxy proxy;
private final ContainerId id;
public StopContainerRunner(ContainerId id, ContainerProxy proxy) {
@@ -256,8 +257,6 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
masterClientService.allocateWorkerResources(null, request, callBack);
} catch (Exception e) {
- connPool.closeConnection(tmClient);
- tmClient = null;
LOG.error(e.getMessage(), e);
} finally {
connPool.releaseConnection(tmClient);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 456bd95..2f763e3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -43,6 +43,7 @@ import org.apache.tajo.master.rm.TajoWorkerResourceManager;
import org.apache.tajo.pullserver.TajoPullServerService;
import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.RpcChannelFactory;
import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.storage.v2.DiskDeviceInfo;
@@ -68,8 +69,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.tajo.conf.TajoConf.ConfVars;
public class TajoWorker extends CompositeService {
- public static PrimitiveProtos.BoolProto TRUE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(true).build();
- public static PrimitiveProtos.BoolProto FALSE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(false).build();
+ public static final PrimitiveProtos.BoolProto TRUE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(true).build();
+ public static final PrimitiveProtos.BoolProto FALSE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(false).build();
public static final String WORKER_MODE_YARN_TASKRUNNER = "tr";
public static final String WORKER_MODE_YARN_QUERYMASTER = "qm";
@@ -304,10 +305,10 @@ public class TajoWorker extends CompositeService {
@Override
public void stop() {
- if(stopped.get()) {
+ if(stopped.getAndSet(true)) {
return;
}
- stopped.set(true);
+
if(webServer != null) {
try {
webServer.stop();
@@ -316,7 +317,9 @@ public class TajoWorker extends CompositeService {
}
}
if(workerHeartbeatThread != null) {
- workerHeartbeatThread.interrupt();
+ synchronized (workerHeartbeatThread){
+ workerHeartbeatThread.notifyAll();
+ }
}
if (catalogClient != null) {
@@ -324,7 +327,8 @@ public class TajoWorker extends CompositeService {
}
if(connPool != null) {
- connPool.close();
+ connPool.shutdown();
+ RpcChannelFactory.shutdown();
}
if(webServer != null && webServer.isAlive()) {
@@ -570,7 +574,7 @@ public class TajoWorker extends CompositeService {
clientPort = workerContext.getTajoWorkerClientService().getBindAddr().getPort();
}
- while(true) {
+ while(!stopped.get()) {
if(sendDiskInfoCount == 0 && diskDeviceInfos != null) {
getDiskUsageInfos();
}
@@ -626,15 +630,15 @@ public class TajoWorker extends CompositeService {
} catch (InterruptedException e) {
break;
} catch (Exception e) {
- connPool.closeConnection(tmClient);
- tmClient = null;
LOG.error(e.getMessage(), e);
} finally {
connPool.releaseConnection(tmClient);
}
try {
- Thread.sleep(10 * 1000);
+ synchronized (workerHeartbeatThread){
+ wait(10 * 1000);
+ }
} catch (InterruptedException e) {
break;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
index f6e7742..da3dc34 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@ -18,6 +18,7 @@
package org.apache.tajo.worker;
+import com.google.common.base.Preconditions;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
@@ -29,6 +30,7 @@ import org.apache.tajo.QueryId;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.TajoIdProtos;
import org.apache.tajo.TajoProtos;
+import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.ipc.QueryMasterClientProtocol;
import org.apache.tajo.master.querymaster.Query;
@@ -51,7 +53,7 @@ public class TajoWorkerClientService extends AbstractService {
private InetSocketAddress bindAddr;
private String addr;
private int port;
- private Configuration conf;
+ private TajoConf conf;
private TajoWorker.WorkerContext workerContext;
private TajoWorkerClientProtocolServiceHandler serviceHandler;
@@ -64,7 +66,8 @@ public class TajoWorkerClientService extends AbstractService {
@Override
public void init(Configuration conf) {
- this.conf = conf;
+ Preconditions.checkArgument(conf instanceof TajoConf);
+ this.conf = (TajoConf) conf;
this.serviceHandler = new TajoWorkerClientProtocolServiceHandler();
// init RPC Server in constructor cause Heartbeat Thread use bindAddr
@@ -77,7 +80,8 @@ public class TajoWorkerClientService extends AbstractService {
}
// TODO blocking/non-blocking??
- this.rpcServer = new BlockingRpcServer(QueryMasterClientProtocol.class, serviceHandler, initIsa);
+ int workerNum = this.conf.getIntVar(TajoConf.ConfVars.WORKER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM);
+ this.rpcServer = new BlockingRpcServer(QueryMasterClientProtocol.class, serviceHandler, initIsa, workerNum);
this.rpcServer.start();
this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index 60b0903..c770696 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -18,6 +18,7 @@
package org.apache.tajo.worker;
+import com.google.common.base.Preconditions;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import org.apache.commons.logging.Log;
@@ -53,6 +54,7 @@ public class TajoWorkerManagerService extends CompositeService
@Override
public void init(Configuration conf) {
+ Preconditions.checkArgument(conf instanceof TajoConf);
TajoConf tajoConf = (TajoConf) conf;
try {
// Setup RPC server
@@ -62,7 +64,8 @@ public class TajoWorkerManagerService extends CompositeService
throw new IllegalArgumentException("Failed resolve of " + initIsa);
}
- this.rpcServer = new AsyncRpcServer(TajoWorkerProtocol.class, this, initIsa);
+ int workerNum = tajoConf.getIntVar(TajoConf.ConfVars.WORKER_RPC_SERVER_WORKER_THREAD_NUM);
+ this.rpcServer = new AsyncRpcServer(TajoWorkerProtocol.class, this, initIsa, workerNum);
this.rpcServer.start();
this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());