You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2015/09/11 07:46:12 UTC
tajo git commit: TAJO-1831: Add a shutdown hook manager in order to
set priorities.
Repository: tajo
Updated Branches:
refs/heads/master cea832aca -> 60d8d4bc8
TAJO-1831: Add a shutdown hook manager in order to set priorities.
Closes #751
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/60d8d4bc
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/60d8d4bc
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/60d8d4bc
Branch: refs/heads/master
Commit: 60d8d4bc80444402c2b4ad61644859d7bda8b380
Parents: cea832a
Author: Jinho Kim <jh...@apache.org>
Authored: Fri Sep 11 14:45:11 2015 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Fri Sep 11 14:45:11 2015 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../java/org/apache/tajo/cli/tsql/TajoCli.java | 6 +-
.../apache/tajo/client/SessionConnection.java | 42 ++---
.../java/org/apache/tajo/conf/TajoConf.java | 2 +-
.../apache/tajo/util/ShutdownHookManager.java | 188 +++++++++++++++++++
tajo-core-tests/pom.xml | 15 ++
.../org/apache/tajo/client/TestTajoClient.java | 26 +++
.../java/org/apache/tajo/master/TajoMaster.java | 8 +-
.../java/org/apache/tajo/worker/Fetcher.java | 4 +-
.../java/org/apache/tajo/worker/TajoWorker.java | 6 +-
.../tajo/pullserver/TajoPullServerService.java | 4 +-
.../org/apache/tajo/rpc/NettyServerBase.java | 2 +-
.../java/org/apache/tajo/rpc/NettyUtils.java | 139 ++++++++++++++
.../org/apache/tajo/rpc/RpcChannelFactory.java | 180 ------------------
.../org/apache/tajo/rpc/AsyncRpcClient.java | 11 +-
.../org/apache/tajo/rpc/AsyncRpcServer.java | 4 +-
.../org/apache/tajo/rpc/BlockingRpcClient.java | 12 +-
.../org/apache/tajo/rpc/BlockingRpcServer.java | 4 +-
.../org/apache/tajo/rpc/NettyClientBase.java | 15 +-
.../org/apache/tajo/rpc/RpcClientManager.java | 30 ++-
.../java/org/apache/tajo/rpc/TestAsyncRpc.java | 2 +-
.../org/apache/tajo/rpc/TestBlockingRpc.java | 2 +-
22 files changed, 463 insertions(+), 241 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 04adaa5..2823bf8 100644
--- a/CHANGES
+++ b/CHANGES
@@ -36,6 +36,8 @@ Release 0.11.0 - unreleased
IMPROVEMENT
+ TAJO-1831: Add a shutdown hook manager in order to set priorities. (jinho)
+
TAJO-1817: Improve SQL parser error message. (hyunsik)
TAJO-1825: Remove zero length fragments when file length is zero. (jinho)
http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
index f17ec80..83763e8 100644
--- a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
+++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
@@ -39,6 +39,7 @@ import org.apache.tajo.exception.TajoException;
import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.service.ServiceTrackerFactory;
import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.ShutdownHookManager;
import java.io.*;
import java.lang.reflect.Constructor;
@@ -50,6 +51,7 @@ import java.util.Map;
import java.util.TreeMap;
public class TajoCli {
+ public static final int SHUTDOWN_HOOK_PRIORITY = 50;
public static final String ERROR_PREFIX = "ERROR: ";
public static final String KILL_PREFIX = "KILL: ";
@@ -373,7 +375,7 @@ public class TajoCli {
}
private void addShutdownHook() {
- Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ ShutdownHookManager.get().addShutdownHook(new Runnable() {
@Override
public void run() {
try {
@@ -382,7 +384,7 @@ public class TajoCli {
}
client.close();
}
- }));
+ }, SHUTDOWN_HOOK_PRIORITY);
}
private String updatePrompt(ParsingState state) throws ServiceException {
http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
index b63d35b..ac0ff52 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
@@ -19,7 +19,7 @@
package org.apache.tajo.client;
import com.google.protobuf.ServiceException;
-import io.netty.channel.ConnectTimeoutException;
+import io.netty.channel.EventLoopGroup;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.SessionVars;
@@ -38,16 +38,14 @@ import org.apache.tajo.ipc.ClientProtos.UpdateSessionVariableRequest;
import org.apache.tajo.ipc.TajoMasterClientProtocol;
import org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService.BlockingInterface;
import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.RpcChannelFactory;
+import org.apache.tajo.rpc.NettyUtils;
import org.apache.tajo.rpc.RpcClientManager;
import org.apache.tajo.rpc.RpcConstants;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueSetResponse;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringResponse;
import org.apache.tajo.service.ServiceTracker;
-import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.KeyValueSet;
-import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.ProtoUtil;
import java.io.Closeable;
@@ -57,9 +55,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.tajo.error.Errors.ResultCode.NO_SUCH_SESSION_VARIABLE;
import static org.apache.tajo.exception.ReturnStateUtil.*;
@@ -70,8 +66,6 @@ public class SessionConnection implements Closeable {
private final static Log LOG = LogFactory.getLog(SessionConnection.class);
- private final static AtomicInteger connections = new AtomicInteger();
-
final RpcClientManager manager;
private String baseDatabase;
@@ -87,6 +81,8 @@ public class SessionConnection implements Closeable {
private final ServiceTracker serviceTracker;
+ private final EventLoopGroup eventLoopGroup;
+
private NettyClientBase client;
private final KeyValueSet properties;
@@ -110,7 +106,13 @@ public class SessionConnection implements Closeable {
this.manager.setRetries(properties.getInt(RpcConstants.RPC_CLIENT_RETRY_MAX, RpcConstants.DEFAULT_RPC_RETRIES));
this.userInfo = UserRoleInfo.getCurrentUser();
- this.client = getTajoMasterConnection();
+ this.eventLoopGroup = NettyUtils.createEventLoopGroup(getClass().getSimpleName(), 4);
+ try {
+ this.client = getTajoMasterConnection();
+ } catch (TajoRuntimeException e) {
+ NettyUtils.shutdown(eventLoopGroup);
+ throw e;
+ }
}
public Map<String, String> getClientSideSessionVars() {
@@ -127,16 +129,8 @@ public class SessionConnection implements Closeable {
RpcClientManager.cleanup(client);
// Client do not closed on idle state for support high available
- this.client = manager.newClient(
- getTajoMasterAddr(),
- TajoMasterClientProtocol.class,
- false,
- manager.getRetries(),
- 0,
- TimeUnit.SECONDS,
- false);
- connections.incrementAndGet();
-
+ this.client = manager.newBlockingClient(getTajoMasterAddr(), TajoMasterClientProtocol.class,
+ manager.getRetries(), eventLoopGroup);
} catch (Throwable t) {
throw new TajoRuntimeException(new ClientConnectionException(t));
}
@@ -346,14 +340,7 @@ public class SessionConnection implements Closeable {
// ignore
} finally {
RpcClientManager.cleanup(client);
- if(connections.decrementAndGet() == 0) {
- if (!System.getProperty(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equals(CommonTestingUtil.TAJO_TEST_TRUE)) {
- RpcChannelFactory.shutdownGracefully();
- if (LOG.isDebugEnabled()) {
- LOG.debug("RPC connection is closed");
- }
- }
- }
+ NettyUtils.shutdown(eventLoopGroup);
}
}
@@ -457,5 +444,4 @@ public class SessionConnection implements Closeable {
}
return builder.build();
}
-
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 909f266..0f393f6 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -208,7 +208,7 @@ public class TajoConf extends Configuration {
SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM("tajo.shuffle.fetcher.parallel-execution.max-num",
2, Validators.min("1")),
SHUFFLE_FETCHER_CHUNK_MAX_SIZE("tajo.shuffle.fetcher.chunk.max-size", 8192),
- SHUFFLE_FETCHER_CONNECT_TIMEOUT("tajo.shuffle.fetcher.connect.timeout-sec", 10, Validators.min("1")),
+ SHUFFLE_FETCHER_CONNECT_TIMEOUT("tajo.shuffle.fetcher.connect.timeout-sec", 60, Validators.min("1")),
SHUFFLE_FETCHER_READ_TIMEOUT("tajo.shuffle.fetcher.read.timeout-sec", 60, Validators.min("1")),
SHUFFLE_FETCHER_READ_RETRY_MAX_NUM("tajo.shuffle.fetcher.read.retry.max-num", 2, Validators.min("0")),
SHUFFLE_HASH_APPENDER_BUFFER_SIZE("tajo.shuffle.hash.appender.buffer.size", 10000),
http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-common/src/main/java/org/apache/tajo/util/ShutdownHookManager.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/ShutdownHookManager.java b/tajo-common/src/main/java/org/apache/tajo/util/ShutdownHookManager.java
new file mode 100644
index 0000000..3ec535f
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/util/ShutdownHookManager.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * The <code>ShutdownHookManager</code> enables running shutdownHook
+ * in a deterministic order, higher priority first.
+ * <p/>
+ * The JVM runs ShutdownHooks in a non-deterministic order or in parallel.
+ * This class registers a single JVM shutdownHook and run all the
+ * shutdownHooks registered to it (to this class) in order based on their
+ * priority.
+ *
+ * this is an implementation copied from hadoop-common
+ */
+public class ShutdownHookManager {
+
+ private static final ShutdownHookManager MGR = new ShutdownHookManager();
+
+ private static final Log LOG = LogFactory.getLog(ShutdownHookManager.class);
+
+ static {
+ Runtime.getRuntime().addShutdownHook(
+ new Thread() {
+ @Override
+ public void run() {
+ MGR.shutdownInProgress.set(true);
+ for (Runnable hook: MGR.getShutdownHooksInOrder()) {
+ try {
+ hook.run();
+ } catch (Throwable ex) {
+ LOG.warn("ShutdownHook '" + hook.getClass().getSimpleName() +
+ "' failed, " + ex.toString(), ex);
+ }
+ }
+ }
+ }
+ );
+ }
+
+ /**
+ * Return <code>ShutdownHookManager</code> singleton.
+ *
+ * @return <code>ShutdownHookManager</code> singleton.
+ */
+ public static ShutdownHookManager get() {
+ return MGR;
+ }
+
+ /**
+ * Private structure to store ShutdownHook and its priority.
+ */
+ private static class HookEntry {
+ Runnable hook;
+ int priority;
+
+ public HookEntry(Runnable hook, int priority) {
+ this.hook = hook;
+ this.priority = priority;
+ }
+
+ @Override
+ public int hashCode() {
+ return hook.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ boolean eq = false;
+ if (obj != null) {
+ if (obj instanceof HookEntry) {
+ eq = (hook == ((HookEntry)obj).hook);
+ }
+ }
+ return eq;
+ }
+
+ }
+
+ private Set<HookEntry> hooks =
+ Collections.synchronizedSet(new HashSet<HookEntry>());
+
+ private AtomicBoolean shutdownInProgress = new AtomicBoolean(false);
+
+ //private to constructor to ensure singularity
+ private ShutdownHookManager() {
+ }
+
+ /**
+ * Returns the list of shutdownHooks in order of execution,
+ * Highest priority first.
+ *
+ * @return the list of shutdownHooks in order of execution.
+ */
+ List<Runnable> getShutdownHooksInOrder() {
+ List<HookEntry> list;
+ synchronized (MGR.hooks) {
+ list = new ArrayList<HookEntry>(MGR.hooks);
+ }
+ Collections.sort(list, new Comparator<HookEntry>() {
+
+ //reversing comparison so highest priority hooks are first
+ @Override
+ public int compare(HookEntry o1, HookEntry o2) {
+ return o2.priority - o1.priority;
+ }
+ });
+ List<Runnable> ordered = new ArrayList<Runnable>();
+ for (HookEntry entry: list) {
+ ordered.add(entry.hook);
+ }
+ return ordered;
+ }
+
+ /**
+ * Adds a shutdownHook with a priority, the higher the priority
+ * the earlier will run. ShutdownHooks with same priority run
+ * in a non-deterministic order.
+ *
+ * @param shutdownHook shutdownHook <code>Runnable</code>
+ * @param priority priority of the shutdownHook.
+ */
+ public void addShutdownHook(Runnable shutdownHook, int priority) {
+ if (shutdownHook == null) {
+ throw new IllegalArgumentException("shutdownHook cannot be NULL");
+ }
+ if (shutdownInProgress.get()) {
+ throw new IllegalStateException("Shutdown in progress, cannot add a shutdownHook");
+ }
+ hooks.add(new HookEntry(shutdownHook, priority));
+ }
+
+ /**
+ * Removes a shutdownHook.
+ *
+ * @param shutdownHook shutdownHook to remove.
+ * @return TRUE if the shutdownHook was registered and removed,
+ * FALSE otherwise.
+ */
+ public boolean removeShutdownHook(Runnable shutdownHook) {
+ if (shutdownInProgress.get()) {
+ throw new IllegalStateException("Shutdown in progress, cannot remove a shutdownHook");
+ }
+ return hooks.remove(new HookEntry(shutdownHook, 0));
+ }
+
+ /**
+ * Indicates if a shutdownHook is registered or not.
+ *
+ * @param shutdownHook shutdownHook to check if registered.
+ * @return TRUE/FALSE depending if the shutdownHook is is registered.
+ */
+ public boolean hasShutdownHook(Runnable shutdownHook) {
+ return hooks.contains(new HookEntry(shutdownHook, 0));
+ }
+
+ /**
+ * Indicates if shutdown is in progress or not.
+ *
+ * @return TRUE if the shutdown is in progress, otherwise FALSE.
+ */
+ public boolean isShutdownInProgress() {
+ return shutdownInProgress.get();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-core-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core-tests/pom.xml b/tajo-core-tests/pom.xml
index 8199f46..20b7378 100644
--- a/tajo-core-tests/pom.xml
+++ b/tajo-core-tests/pom.xml
@@ -335,6 +335,21 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<profiles>
http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-core-tests/src/test/java/org/apache/tajo/client/TestTajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/client/TestTajoClient.java b/tajo-core-tests/src/test/java/org/apache/tajo/client/TestTajoClient.java
index efadc7a..38819f1 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/client/TestTajoClient.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/client/TestTajoClient.java
@@ -40,6 +40,7 @@ import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.ipc.ClientProtos.QueryHistoryProto;
import org.apache.tajo.ipc.ClientProtos.QueryInfoProto;
import org.apache.tajo.ipc.ClientProtos.StageHistoryProto;
+import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.storage.StorageConstants;
import org.apache.tajo.storage.StorageUtil;
import org.apache.tajo.util.CommonTestingUtil;
@@ -47,6 +48,7 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.powermock.reflect.Whitebox;
import java.io.IOException;
import java.io.InputStream;
@@ -771,4 +773,28 @@ public class TestTajoClient {
assertEquals(1, taskHistories.get(1).getTotalReadRows());
assertEquals(1, taskHistories.get(1).getTotalWriteRows());
}
+
+ @Test
+ public void testClientRPCInterference() throws Exception {
+ TajoClient client = cluster.newTajoClient();
+ TajoClient client2 = cluster.newTajoClient();
+
+
+ NettyClientBase rpcClient = Whitebox.getInternalState(client, NettyClientBase.class);
+ assertNotNull(rpcClient);
+
+ NettyClientBase rpcClient2 = Whitebox.getInternalState(client2, NettyClientBase.class);
+ assertNotNull(rpcClient);
+
+ assertNotEquals(rpcClient.getChannel().eventLoop(), rpcClient2.getChannel().eventLoop());
+
+ client.close();
+ client2.close();
+
+ rpcClient.getChannel().eventLoop().terminationFuture().sync();
+ assertTrue(rpcClient.getChannel().eventLoop().isTerminated());
+
+ rpcClient2.getChannel().eventLoop().terminationFuture().sync();
+ assertTrue(rpcClient2.getChannel().eventLoop().isTerminated());
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index 754df7f..1197e98 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -49,7 +49,6 @@ import org.apache.tajo.master.rm.TajoResourceManager;
import org.apache.tajo.metrics.ClusterResourceMetricSet;
import org.apache.tajo.metrics.Master;
import org.apache.tajo.plan.function.python.PythonScriptEngine;
-import org.apache.tajo.rpc.RpcChannelFactory;
import org.apache.tajo.rpc.RpcClientManager;
import org.apache.tajo.rpc.RpcConstants;
import org.apache.tajo.rule.EvaluationContext;
@@ -86,6 +85,8 @@ public class TajoMaster extends CompositeService {
/** Class Logger */
private static final Log LOG = LogFactory.getLog(TajoMaster.class);
+ public static final int SHUTDOWN_HOOK_PRIORITY = 10;
+
/** rw-r--r-- */
@SuppressWarnings("OctalInteger")
final public static FsPermission TAJO_ROOT_DIR_PERMISSION = FsPermission.createImmutable((short) 0755);
@@ -162,7 +163,7 @@ public class TajoMaster extends CompositeService {
public void serviceInit(Configuration conf) throws Exception {
this.systemConf = TUtil.checkTypeAndGet(conf, TajoConf.class);
- Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook()));
+ ShutdownHookManager.get().addShutdownHook(new ShutdownHook(), SHUTDOWN_HOOK_PRIORITY);
context = new MasterContext(systemConf);
clock = new SystemClock();
@@ -547,12 +548,13 @@ public class TajoMaster extends CompositeService {
&& AbstractDBStore.needShutdown(catalogServer.getStoreUri())) {
DerbyStore.shutdown();
}
- RpcChannelFactory.shutdownGracefully();
+ RpcClientManager.shutdown();
}
}
}
public static void main(String[] args) throws Exception {
+ Thread.setDefaultUncaughtExceptionHandler(new TajoUncaughtExceptionHandler());
StringUtils.startupShutdownMessage(TajoMaster.class, args, LOG);
try {
http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
index ff85a4b..762278b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.pullserver.retriever.FileChunk;
-import org.apache.tajo.rpc.RpcChannelFactory;
+import org.apache.tajo.rpc.NettyUtils;
import java.io.File;
import java.io.FileNotFoundException;
@@ -89,7 +89,7 @@ public class Fetcher {
if (!useLocalFile) {
bootstrap = new Bootstrap()
.group(
- RpcChannelFactory.getSharedClientEventloopGroup(RpcChannelFactory.ClientChannelId.FETCHER,
+ NettyUtils.getSharedEventLoopGroup(NettyUtils.GROUP.FETCHER,
conf.getIntVar(TajoConf.ConfVars.SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM)))
.channel(NioSocketChannel.class)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 65a9511..fbb8d54 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -43,7 +43,6 @@ import org.apache.tajo.plan.function.python.PythonScriptEngine;
import org.apache.tajo.pullserver.TajoPullServerService;
import org.apache.tajo.querymaster.QueryMaster;
import org.apache.tajo.querymaster.QueryMasterManagerService;
-import org.apache.tajo.rpc.RpcChannelFactory;
import org.apache.tajo.rpc.RpcClientManager;
import org.apache.tajo.rpc.RpcConstants;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
@@ -78,6 +77,7 @@ public class TajoWorker extends CompositeService {
public static final PrimitiveProtos.BoolProto TRUE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(true).build();
public static final PrimitiveProtos.BoolProto FALSE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(false).build();
public static final PrimitiveProtos.NullProto NULL_PROTO = PrimitiveProtos.NullProto.newBuilder().build();
+ public static final int SHUTDOWN_HOOK_PRIORITY = 30;
private static final Log LOG = LogFactory.getLog(TajoWorker.class);
@@ -147,7 +147,7 @@ public class TajoWorker extends CompositeService {
@Override
public void serviceInit(Configuration conf) throws Exception {
- Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook()));
+ ShutdownHookManager.get().addShutdownHook(new ShutdownHook(), SHUTDOWN_HOOK_PRIORITY);
this.systemConf = TUtil.checkTypeAndGet(conf, TajoConf.class);
RackResolver.init(systemConf);
@@ -571,7 +571,7 @@ public class TajoWorker extends CompositeService {
LOG.info("TajoWorker received SIGINT Signal");
LOG.info("============================================");
stop();
- RpcChannelFactory.shutdownGracefully();
+ RpcClientManager.shutdown();
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index 29cf719..59a758f 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -58,7 +58,7 @@ import org.apache.tajo.catalog.Schema;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.pullserver.retriever.FileChunk;
-import org.apache.tajo.rpc.RpcChannelFactory;
+import org.apache.tajo.rpc.NettyUtils;
import org.apache.tajo.storage.HashShuffleAppenderManager;
import org.apache.tajo.storage.RowStoreUtil;
import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
@@ -190,7 +190,7 @@ public class TajoPullServerService extends AbstractService {
int workerNum = conf.getInt("tajo.shuffle.rpc.server.worker-thread-num",
Runtime.getRuntime().availableProcessors() * 2);
- selector = RpcChannelFactory.createServerChannelFactory("TajoPullServerService", workerNum)
+ selector = NettyUtils.createServerBootstrap("TajoPullServerService", workerNum)
.option(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.TCP_NODELAY, true);
http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
index ad443d7..2c154bf 100644
--- a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
+++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
@@ -79,7 +79,7 @@ public class NettyServerBase {
listener.onBeforeInit(this);
}
- bootstrap = RpcChannelFactory.createServerChannelFactory(serviceName, workerNum);
+ bootstrap = NettyUtils.createServerBootstrap(serviceName, workerNum);
this.initializer = initializer;
bootstrap
http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyUtils.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyUtils.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyUtils.java
new file mode 100644
index 0000000..01fd48b
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyUtils.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public final class NettyUtils {
+ private static final Log LOG = LogFactory.getLog(NettyUtils.class);
+
+ private static final int DEFAULT_THREAD_NUM = Runtime.getRuntime().availableProcessors() * 2;
+
+ private static final Object lockObjectForLoopGroup = new Object();
+ private static AtomicInteger serverCount = new AtomicInteger(0);
+
+ public enum GROUP {
+ DEFAULT,
+ FETCHER
+ }
+
+ private static final Map<GROUP, EventLoopGroup> eventLoopGroupMap =
+ new ConcurrentHashMap<GROUP, EventLoopGroup>();
+
+ private NettyUtils(){
+ }
+
+ /**
+ * Get default EventLoopGroup of netty’s. servers and clients can shared it.
+ */
+ public static EventLoopGroup getDefaultEventLoopGroup() {
+ return getSharedEventLoopGroup(GROUP.DEFAULT, DEFAULT_THREAD_NUM);
+ }
+
+ /**
+ * Get EventLoopGroup of netty’s.
+ *
+ * @param clientId
+ * @param threads
+ * @return A EventLoopGroup by key
+ */
+ public static EventLoopGroup getSharedEventLoopGroup(GROUP clientId, int threads) {
+ EventLoopGroup returnEventLoopGroup;
+
+ synchronized (lockObjectForLoopGroup) {
+ if (!eventLoopGroupMap.containsKey(clientId)) {
+ eventLoopGroupMap.put(clientId, createEventLoopGroup(clientId.name(), threads));
+ }
+
+ returnEventLoopGroup = eventLoopGroupMap.get(clientId);
+ if (isEventLoopGroupShuttingDown(returnEventLoopGroup)) {
+ returnEventLoopGroup = createEventLoopGroup(clientId.name(), threads);
+ eventLoopGroupMap.put(clientId, returnEventLoopGroup);
+ }
+ }
+
+ return returnEventLoopGroup;
+ }
+
+ public static EventLoopGroup createEventLoopGroup(String name, int threads) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Create " + name + " EventLoopGroup. threads:" + threads);
+ }
+
+ ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+ ThreadFactory clientFactory = builder.setNameFormat(name + " #%d").build();
+
+ return createEventLoopGroup(threads, clientFactory);
+ }
+
+ protected static boolean isEventLoopGroupShuttingDown(EventLoopGroup eventLoopGroup) {
+ return ((eventLoopGroup == null) || eventLoopGroup.isShuttingDown());
+ }
+
+ private static EventLoopGroup createEventLoopGroup(int threads, ThreadFactory factory) {
+ return new NioEventLoopGroup(threads, factory);
+ }
+
+ /**
+ * Server must release the external resources
+ */
+ public static ServerBootstrap createServerBootstrap(String name, int threads) {
+ name = name + "-" + serverCount.incrementAndGet();
+
+ EventLoopGroup eventLoopGroup = createEventLoopGroup(name, threads);
+ return new ServerBootstrap().group(eventLoopGroup, eventLoopGroup);
+ }
+
+ public static void shutdownGracefully() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Shutdown Shared RPC Pool");
+ }
+ synchronized (lockObjectForLoopGroup) {
+ for (EventLoopGroup eventLoopGroup : eventLoopGroupMap.values()) {
+ try {
+ shutdown(eventLoopGroup).sync();
+ } catch (InterruptedException e) {
+ //ignore
+ }
+ }
+ eventLoopGroupMap.clear();
+ }
+ }
+
+ public static io.netty.util.concurrent.Future shutdown(EventLoopGroup eventLoopGroup) {
+ if (eventLoopGroup != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Shutdown EventLoopGroup :" + eventLoopGroup.toString());
+ }
+
+ return eventLoopGroup.shutdownGracefully();
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
deleted file mode 100644
index eb34ca2..0000000
--- a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public final class RpcChannelFactory {
- private static final Log LOG = LogFactory.getLog(RpcChannelFactory.class);
-
- private static final int DEFAULT_WORKER_NUM = Runtime.getRuntime().availableProcessors() * 2;
-
- private static final Object lockObjectForLoopGroup = new Object();
- private static AtomicInteger serverCount = new AtomicInteger(0);
-
- public enum ClientChannelId {
- CLIENT_DEFAULT,
- FETCHER
- }
-
- private static final Map<ClientChannelId, Integer> defaultMaxKeyPoolCount =
- new ConcurrentHashMap<ClientChannelId, Integer>();
- private static final Map<ClientChannelId, Queue<EventLoopGroup>> eventLoopGroupPool =
- new ConcurrentHashMap<ClientChannelId, Queue<EventLoopGroup>>();
-
- private RpcChannelFactory(){
- }
-
- static {
- Runtime.getRuntime().addShutdownHook(new CleanUpHandler());
-
- defaultMaxKeyPoolCount.put(ClientChannelId.CLIENT_DEFAULT, 1);
- defaultMaxKeyPoolCount.put(ClientChannelId.FETCHER, 1);
- }
-
- /**
- * make this factory static thus all clients can share its thread pool.
- * NioClientSocketChannelFactory has only one method newChannel() visible for user, which is thread-safe
- */
- public static EventLoopGroup getSharedClientEventloopGroup() {
- return getSharedClientEventloopGroup(DEFAULT_WORKER_NUM);
- }
-
- /**
- * make this factory static thus all clients can share its thread pool.
- * NioClientSocketChannelFactory has only one method newChannel() visible for user, which is thread-safe
- *
- * @param workerNum The number of workers
- */
- public static EventLoopGroup getSharedClientEventloopGroup(int workerNum){
- //shared woker and boss pool
- return getSharedClientEventloopGroup(ClientChannelId.CLIENT_DEFAULT, workerNum);
- }
-
- /**
- * This function return eventloopgroup by key. Fetcher client will have one or more eventloopgroup for its throughput.
- *
- * @param clientId
- * @param workerNum
- * @return
- */
- public static EventLoopGroup getSharedClientEventloopGroup(ClientChannelId clientId, int workerNum) {
- Queue<EventLoopGroup> eventLoopGroupQueue;
- EventLoopGroup returnEventLoopGroup;
-
- synchronized (lockObjectForLoopGroup) {
- eventLoopGroupQueue = eventLoopGroupPool.get(clientId);
- if (eventLoopGroupQueue == null) {
- eventLoopGroupQueue = createClientEventloopGroups(clientId, workerNum);
- }
-
- returnEventLoopGroup = eventLoopGroupQueue.poll();
- if (isEventLoopGroupShuttingDown(returnEventLoopGroup)) {
- returnEventLoopGroup = createClientEventloopGroup(clientId.name(), workerNum);
- }
- eventLoopGroupQueue.add(returnEventLoopGroup);
- }
-
- return returnEventLoopGroup;
- }
-
- protected static boolean isEventLoopGroupShuttingDown(EventLoopGroup eventLoopGroup) {
- return ((eventLoopGroup == null) || eventLoopGroup.isShuttingDown());
- }
-
- // Client must release the external resources
- protected static Queue<EventLoopGroup> createClientEventloopGroups(ClientChannelId clientId, int workerNum) {
- int defaultMaxObjectCount = defaultMaxKeyPoolCount.get(clientId);
- Queue<EventLoopGroup> loopGroupQueue = new ConcurrentLinkedQueue<EventLoopGroup>();
- eventLoopGroupPool.put(clientId, loopGroupQueue);
-
- for (int objectIdx = 0; objectIdx < defaultMaxObjectCount; objectIdx++) {
- loopGroupQueue.add(createClientEventloopGroup(clientId.name(), workerNum));
- }
-
- return loopGroupQueue;
- }
-
- protected static EventLoopGroup createClientEventloopGroup(String name, int workerNum) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Create " + name + " ClientEventLoopGroup. Worker:" + workerNum);
- }
-
- ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
- ThreadFactory clientFactory = builder.setNameFormat(name + " Client #%d").build();
-
- return new NioEventLoopGroup(workerNum, clientFactory);
- }
-
- // Client must release the external resources
- public static ServerBootstrap createServerChannelFactory(String name, int workerNum) {
- name = name + "-" + serverCount.incrementAndGet();
- if(LOG.isInfoEnabled()){
- LOG.info("Create " + name + " ServerSocketChannelFactory. Worker:" + workerNum);
- }
- ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
- ThreadFactory bossFactory = builder.setNameFormat(name + " Server Boss #%d").build();
- ThreadFactory workerFactory = builder.setNameFormat(name + " Server Worker #%d").build();
-
- EventLoopGroup bossGroup =
- new NioEventLoopGroup(1, bossFactory);
- EventLoopGroup workerGroup =
- new NioEventLoopGroup(workerNum, workerFactory);
-
- return new ServerBootstrap().group(bossGroup, workerGroup);
- }
-
- public static void shutdownGracefully(){
- if(LOG.isDebugEnabled()) {
- LOG.debug("Shutdown Shared RPC Pool");
- }
-
- synchronized(lockObjectForLoopGroup) {
- for (Queue<EventLoopGroup> eventLoopGroupQueue: eventLoopGroupPool.values()) {
- for (EventLoopGroup eventLoopGroup: eventLoopGroupQueue) {
- eventLoopGroup.shutdownGracefully();
- }
-
- eventLoopGroupQueue.clear();
- }
- eventLoopGroupPool.clear();
- }
- }
-
- static class CleanUpHandler extends Thread {
-
- @Override
- public void run() {
- RpcChannelFactory.shutdownGracefully();
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
index dd7d495..6fb62d4 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
@@ -18,9 +18,11 @@
package org.apache.tajo.rpc;
+import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.*;
import io.netty.channel.ChannelHandler;
+import io.netty.channel.EventLoopGroup;
import org.apache.tajo.rpc.RpcClientManager.RpcConnectionKey;
import org.apache.tajo.rpc.RpcProtos.RpcResponse;
@@ -34,9 +36,10 @@ public class AsyncRpcClient extends NettyClientBase<AsyncRpcClient.ResponseCallb
private final ProxyRpcChannel rpcChannel;
private final NettyChannelInboundHandler handler;
+ @VisibleForTesting
AsyncRpcClient(RpcConnectionKey rpcConnectionKey, int retries)
throws ClassNotFoundException, NoSuchMethodException {
- this(rpcConnectionKey, retries, 0, TimeUnit.NANOSECONDS, false);
+ this(rpcConnectionKey, retries, 0, TimeUnit.NANOSECONDS, false, NettyUtils.getDefaultEventLoopGroup());
}
/**
@@ -49,10 +52,12 @@ public class AsyncRpcClient extends NettyClientBase<AsyncRpcClient.ResponseCallb
* otherwise it is request timeout on active-state
* @param timeUnit TimeUnit
* @param enablePing enable to detect remote peer hangs
+ * @param eventLoopGroup Netty event loop group (thread pool) used by this client's bootstrap
* @throws ClassNotFoundException
* @throws NoSuchMethodException
*/
- AsyncRpcClient(RpcConnectionKey rpcConnectionKey, int retries, long timeout, TimeUnit timeUnit, boolean enablePing)
+ AsyncRpcClient(RpcConnectionKey rpcConnectionKey, int retries, long timeout, TimeUnit timeUnit, boolean enablePing,
+ EventLoopGroup eventLoopGroup)
throws ClassNotFoundException, NoSuchMethodException {
super(rpcConnectionKey, retries);
@@ -62,7 +67,7 @@ public class AsyncRpcClient extends NettyClientBase<AsyncRpcClient.ResponseCallb
init(new ProtoClientChannelInitializer(handler,
RpcResponse.getDefaultInstance(),
timeUnit.toNanos(timeout),
- enablePing));
+ enablePing), eventLoopGroup);
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
index 88ffaf6..0e12c53 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
@@ -42,7 +42,7 @@ public class AsyncRpcServer extends NettyServerBase {
public AsyncRpcServer(final Class<?> protocol,
final Object instance,
final InetSocketAddress bindAddress,
- final int workerNum)
+ final int threads)
throws Exception {
super(protocol.getSimpleName(), bindAddress);
@@ -54,7 +54,7 @@ public class AsyncRpcServer extends NettyServerBase {
this.service = (Service) method.invoke(null, instance);
this.initializer = new ProtoServerChannelInitializer(new ServerHandler(), RpcRequest.getDefaultInstance());
- super.init(this.initializer, workerNum);
+ super.init(this.initializer, threads);
}
@ChannelHandler.Sharable
http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
index 349a0a0..4327003 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
@@ -18,9 +18,11 @@
package org.apache.tajo.rpc;
+import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.*;
import com.google.protobuf.Descriptors.MethodDescriptor;
import io.netty.channel.ChannelHandler;
+import io.netty.channel.EventLoopGroup;
import org.apache.tajo.rpc.RpcClientManager.RpcConnectionKey;
import org.apache.tajo.rpc.RpcProtos.RpcResponse;
@@ -35,9 +37,10 @@ public class BlockingRpcClient extends NettyClientBase<BlockingRpcClient.ProtoCa
private final ProxyRpcChannel rpcChannel;
private final NettyChannelInboundHandler handler;
+ @VisibleForTesting
BlockingRpcClient(RpcConnectionKey rpcConnectionKey, int retries)
throws NoSuchMethodException, ClassNotFoundException {
- this(rpcConnectionKey, retries, 0, TimeUnit.NANOSECONDS, false);
+ this(rpcConnectionKey, retries, 0, TimeUnit.NANOSECONDS, false, NettyUtils.getDefaultEventLoopGroup());
}
/**
@@ -50,11 +53,12 @@ public class BlockingRpcClient extends NettyClientBase<BlockingRpcClient.ProtoCa
* otherwise it is request timeout on active-state
* @param timeUnit TimeUnit
* @param enablePing enable to detect remote peer hangs
+ * @param eventLoopGroup Netty event loop group (thread pool) used by this client's bootstrap
* @throws ClassNotFoundException
* @throws NoSuchMethodException
*/
- BlockingRpcClient(RpcConnectionKey rpcConnectionKey, int retries, long timeout, TimeUnit timeUnit,
- boolean enablePing) throws ClassNotFoundException, NoSuchMethodException {
+ BlockingRpcClient(RpcConnectionKey rpcConnectionKey, int retries, long timeout, TimeUnit timeUnit, boolean enablePing,
+ EventLoopGroup eventLoopGroup) throws ClassNotFoundException, NoSuchMethodException {
super(rpcConnectionKey, retries);
this.stubMethod = getServiceClass().getMethod("newBlockingStub", BlockingRpcChannel.class);
@@ -63,7 +67,7 @@ public class BlockingRpcClient extends NettyClientBase<BlockingRpcClient.ProtoCa
init(new ProtoClientChannelInitializer(handler,
RpcResponse.getDefaultInstance(),
timeUnit.toNanos(timeout),
- enablePing));
+ enablePing), eventLoopGroup);
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
index 007ada5..3f538bb 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
@@ -39,7 +39,7 @@ public class BlockingRpcServer extends NettyServerBase {
public BlockingRpcServer(final Class<?> protocol,
final Object instance,
final InetSocketAddress bindAddress,
- final int workerNum)
+ final int threads)
throws Exception {
super(protocol.getSimpleName(), bindAddress);
@@ -55,7 +55,7 @@ public class BlockingRpcServer extends NettyServerBase {
this.service = (BlockingService) method.invoke(null, instance);
this.initializer = new ProtoServerChannelInitializer(new ServerHandler(), RpcRequest.getDefaultInstance());
- super.init(this.initializer, workerNum);
+ super.init(this.initializer, threads);
}
@ChannelHandler.Sharable
http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
index 5f76bfc..c6d90ed 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
@@ -39,7 +39,6 @@ import java.lang.reflect.Method;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.net.UnknownHostException;
import java.nio.channels.UnresolvedAddressException;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
@@ -66,10 +65,10 @@ public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable
}
// should be called from sub class
- protected void init(ChannelInitializer<Channel> initializer) {
+ protected void init(ChannelInitializer<Channel> initializer, EventLoopGroup eventLoopGroup) {
this.bootstrap = new Bootstrap();
this.bootstrap
- .group(RpcChannelFactory.getSharedClientEventloopGroup())
+ .group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(initializer)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
@@ -117,6 +116,11 @@ public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable
*/
protected void invoke(final RpcProtos.RpcRequest rpcRequest, final T callback, final int retry) {
+ if(getChannel().eventLoop().isShuttingDown()) {
+ LOG.warn("RPC is shutting down");
+ return;
+ }
+
ChannelPromise promise = getChannel().newPromise();
promise.addListener(new GenericFutureListener<ChannelFuture>() {
@@ -197,6 +201,11 @@ public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable
if (maxRetries > retries) {
retries++;
+ if(getChannel().eventLoop().isShuttingDown()) {
+ LOG.warn("RPC is shutting down");
+ return;
+ }
+
LOG.warn(getErrorMessage(ExceptionUtils.getMessage(future.cause())) + "\nTry to reconnect : " + getKey().addr);
try {
Thread.sleep(RpcConstants.DEFAULT_PAUSE);
http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
index 111754e..aa7ba67 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
@@ -19,6 +19,7 @@
package org.apache.tajo.rpc;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.internal.logging.CommonsLoggerFactory;
@@ -64,12 +65,22 @@ public class RpcClientManager {
long timeout,
TimeUnit timeUnit,
boolean enablePing)
+ throws NoSuchMethodException, ConnectException, ClassNotFoundException {
+ return makeClient(rpcConnectionKey, retries, timeout, timeUnit, enablePing, NettyUtils.getDefaultEventLoopGroup());
+ }
+
+ private <T extends NettyClientBase> T makeClient(RpcConnectionKey rpcConnectionKey,
+ int retries,
+ long timeout,
+ TimeUnit timeUnit,
+ boolean enablePing,
+ EventLoopGroup eventLoopGroup)
throws NoSuchMethodException, ClassNotFoundException, ConnectException {
NettyClientBase client;
if (rpcConnectionKey.asyncMode) {
- client = new AsyncRpcClient(rpcConnectionKey, retries, timeout, timeUnit, enablePing);
+ client = new AsyncRpcClient(rpcConnectionKey, retries, timeout, timeUnit, enablePing, eventLoopGroup);
} else {
- client = new BlockingRpcClient(rpcConnectionKey, retries, timeout, timeUnit, enablePing);
+ client = new BlockingRpcClient(rpcConnectionKey, retries, timeout, timeUnit, enablePing, eventLoopGroup);
}
return (T) client;
}
@@ -152,6 +163,19 @@ public class RpcClientManager {
return client;
}
+ public synchronized <T extends NettyClientBase> T newBlockingClient(InetSocketAddress addr,
+ Class<?> protocolClass,
+ int retries,
+ EventLoopGroup eventLoopGroup)
+ throws NoSuchMethodException, ClassNotFoundException, ConnectException {
+
+ T client = makeClient(new RpcConnectionKey(addr, protocolClass, false),
+ retries, 0, TimeUnit.SECONDS, false, eventLoopGroup);
+ client.connect();
+ assert client.isConnected();
+ return client;
+ }
+
/**
* Request to close this clients
* After it is closed, it is removed from clients map.
@@ -174,7 +198,7 @@ public class RpcClientManager {
*/
public static void shutdown() {
close();
- RpcChannelFactory.shutdownGracefully();
+ NettyUtils.shutdownGracefully();
}
protected static boolean contains(RpcConnectionKey key) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
index 18c7d80..4f17476 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
@@ -139,7 +139,7 @@ public class TestAsyncRpc {
@AfterClass
public static void tearDownClass() throws Exception {
- RpcChannelFactory.shutdownGracefully();
+ RpcClientManager.shutdown();
}
public void tearDownRpcServer() throws Exception {
http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
index 9f95f58..0fae7ee 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
@@ -130,7 +130,7 @@ public class TestBlockingRpc {
@AfterClass
public static void tearDownClass() throws Exception {
- RpcChannelFactory.shutdownGracefully();
+ RpcClientManager.shutdown();
}
public void tearDownRpcServer() throws Exception {