You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2015/01/10 08:13:43 UTC
tajo git commit: TAJO-1281: Remove hadoop-common dependency from
tajo-rpc.
Repository: tajo
Updated Branches:
refs/heads/master 807868bd4 -> 9eac34fe3
TAJO-1281: Remove hadoop-common dependency from tajo-rpc.
Closes #343
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/9eac34fe
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/9eac34fe
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/9eac34fe
Branch: refs/heads/master
Commit: 9eac34fe3121b8bc6bbe1b81ca76852ddedb2603
Parents: 807868b
Author: Hyunsik Choi <hy...@apache.org>
Authored: Sat Jan 10 16:12:08 2015 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Sat Jan 10 16:12:08 2015 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../tajo/catalog/AbstractCatalogClient.java | 11 +---
.../apache/tajo/client/SessionConnection.java | 2 +-
.../org/apache/tajo/master/QueryInProgress.java | 4 +-
.../apache/tajo/master/TajoContainerProxy.java | 14 ++--
.../apache/tajo/querymaster/QueryMaster.java | 2 +-
.../tajo/worker/ExecutionBlockContext.java | 2 +-
.../tajo/worker/TajoResourceAllocator.java | 6 +-
.../tajo/worker/WorkerHeartbeatService.java | 2 +-
.../ConnectivityCheckerRuleForTajoWorker.java | 2 +-
tajo-rpc/pom.xml | 27 ++++----
.../org/apache/tajo/rpc/AsyncRpcClient.java | 3 +-
.../org/apache/tajo/rpc/BlockingRpcClient.java | 5 +-
.../org/apache/tajo/rpc/NettyClientBase.java | 3 +-
.../org/apache/tajo/rpc/NettyServerBase.java | 4 +-
.../org/apache/tajo/rpc/RpcChannelFactory.java | 18 ++++--
.../org/apache/tajo/rpc/RpcConnectionPool.java | 13 ++--
.../main/java/org/apache/tajo/rpc/RpcUtils.java | 68 ++++++++++++++++++++
.../org/apache/tajo/rpc/ServerCallable.java | 5 +-
.../java/org/apache/tajo/rpc/TestAsyncRpc.java | 7 +-
.../org/apache/tajo/rpc/TestBlockingRpc.java | 12 ++--
21 files changed, 137 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 89488da..b5578e7 100644
--- a/CHANGES
+++ b/CHANGES
@@ -27,6 +27,8 @@ Release 0.9.1 - unreleased
IMPROVEMENT
+ TAJO-1281: Remove hadoop-common dependency from tajo-rpc. (hyunsik)
+
TAJO-1291: Rename TajoMasterProtocol to QueryCoordinatorProtocol.
(hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
index 8ef1c9a..1a2fd44 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
@@ -19,7 +19,6 @@
package org.apache.tajo.catalog;
import com.google.protobuf.ServiceException;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.annotation.Nullable;
@@ -27,21 +26,15 @@ import org.apache.tajo.catalog.CatalogProtocol.CatalogProtocolService;
import org.apache.tajo.catalog.exception.NoSuchFunctionException;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.TableDescriptorProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.TableOptionProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.TablespaceProto;
import org.apache.tajo.catalog.proto.CatalogProtos.*;
import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ha.HAServiceUtil;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.rpc.ServerCallable;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto;
-import org.apache.tajo.ha.HAServiceUtil;
import org.apache.tajo.util.ProtoUtil;
import java.net.InetSocketAddress;
@@ -62,7 +55,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
abstract CatalogProtocolService.BlockingInterface getStub(NettyClientBase client);
public AbstractCatalogClient(TajoConf conf, InetSocketAddress catalogServerAddr) {
- this.pool = RpcConnectionPool.getPool(conf);
+ this.pool = RpcConnectionPool.getPool();
this.catalogServerAddr = catalogServerAddr;
this.conf = conf;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
index 1bc8050..5490be4 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
@@ -105,7 +105,7 @@ public class SessionConnection implements Closeable {
this.tajoMasterAddr = addr;
int workerNum = conf.getIntVar(TajoConf.ConfVars.RPC_CLIENT_WORKER_THREAD_NUM);
// Don't share connection pool per client
- connPool = RpcConnectionPool.newPool(conf, getClass().getSimpleName(), workerNum);
+ connPool = RpcConnectionPool.newPool(getClass().getSimpleName(), workerNum);
userInfo = UserRoleInfo.getCurrentUser();
this.baseDatabase = baseDatabase != null ? baseDatabase : null;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
index 73d8cb2..7587543 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
@@ -96,7 +96,7 @@ public class QueryInProgress {
masterContext.getResourceManager().releaseQueryMaster(queryId);
if(queryMasterRpc != null) {
- RpcConnectionPool.getPool(masterContext.getConf()).closeConnection(queryMasterRpc);
+ RpcConnectionPool.getPool().closeConnection(queryMasterRpc);
}
masterContext.getHistoryWriter().appendHistory(queryInfo);
@@ -130,7 +130,7 @@ public class QueryInProgress {
InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort());
LOG.info("Connect to QueryMaster:" + addr);
queryMasterRpc =
- RpcConnectionPool.getPool(masterContext.getConf()).getConnection(addr, QueryMasterProtocol.class, true);
+ RpcConnectionPool.getPool().getConnection(addr, QueryMasterProtocol.class, true);
queryMasterRpcClient = queryMasterRpc.getStub();
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index 2ffd7ca..588b7ee 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -25,18 +25,18 @@ import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ha.HAServiceUtil;
import org.apache.tajo.ipc.ContainerProtocol;
import org.apache.tajo.ipc.QueryCoordinatorProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.querymaster.QueryMasterTask;
import org.apache.tajo.master.container.TajoContainer;
import org.apache.tajo.master.container.TajoContainerId;
import org.apache.tajo.master.rm.TajoWorkerContainer;
import org.apache.tajo.master.rm.TajoWorkerContainerId;
+import org.apache.tajo.querymaster.QueryMasterTask;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.rpc.RpcConnectionPool;
-import org.apache.tajo.ha.HAServiceUtil;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@@ -79,13 +79,13 @@ public class TajoContainerProxy extends ContainerProxy {
NettyClientBase tajoWorkerRpc = null;
try {
InetSocketAddress addr = new InetSocketAddress(container.getNodeId().getHost(), container.getNodeId().getPort());
- tajoWorkerRpc = RpcConnectionPool.getPool(context.getConf()).getConnection(addr, TajoWorkerProtocol.class, true);
+ tajoWorkerRpc = RpcConnectionPool.getPool().getConnection(addr, TajoWorkerProtocol.class, true);
TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub();
tajoWorkerRpcClient.killTaskAttempt(null, taskAttemptId.getProto(), NullCallback.get());
} catch (Exception e) {
LOG.error(e.getMessage(), e);
} finally {
- RpcConnectionPool.getPool(context.getConf()).releaseConnection(tajoWorkerRpc);
+ RpcConnectionPool.getPool().releaseConnection(tajoWorkerRpc);
}
}
@@ -96,7 +96,7 @@ public class TajoContainerProxy extends ContainerProxy {
.getQueryMasterManagerService().getBindAddr();
InetSocketAddress addr = new InetSocketAddress(container.getNodeId().getHost(), container.getNodeId().getPort());
- tajoWorkerRpc = RpcConnectionPool.getPool(context.getConf()).getConnection(addr, TajoWorkerProtocol.class, true);
+ tajoWorkerRpc = RpcConnectionPool.getPool().getConnection(addr, TajoWorkerProtocol.class, true);
TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub();
TajoWorkerProtocol.RunExecutionBlockRequestProto request =
@@ -114,7 +114,7 @@ public class TajoContainerProxy extends ContainerProxy {
} catch (Exception e) {
LOG.error(e.getMessage(), e);
} finally {
- RpcConnectionPool.getPool(context.getConf()).releaseConnection(tajoWorkerRpc);
+ RpcConnectionPool.getPool().releaseConnection(tajoWorkerRpc);
}
}
@@ -166,7 +166,7 @@ public class TajoContainerProxy extends ContainerProxy {
containerIdProtos.add(TajoWorkerContainerId.getContainerIdProto(eachContainerId));
}
- RpcConnectionPool connPool = RpcConnectionPool.getPool(context.getConf());
+ RpcConnectionPool connPool = RpcConnectionPool.getPool();
NettyClientBase tmClient = null;
try {
// In TajoMaster HA mode, if backup master be active status,
http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
index 53390a1..be78fc3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
@@ -101,7 +101,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
LOG.info("QueryMaster init");
try {
this.systemConf = (TajoConf)conf;
- this.connPool = RpcConnectionPool.getPool(systemConf);
+ this.connPool = RpcConnectionPool.getPool();
querySessionTimeout = systemConf.getIntVar(TajoConf.ConfVars.QUERY_SESSION_TIMEOUT);
queryMasterContext = new QueryMasterContext(systemConf);
http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index dd3ee68..b120d5b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -100,7 +100,7 @@ public class ExecutionBlockContext {
throws Throwable {
this.manager = manager;
this.executionBlockId = event.getExecutionBlockId();
- this.connPool = RpcConnectionPool.getPool(manager.getTajoConf());
+ this.connPool = RpcConnectionPool.getPool();
this.queryMaster = queryMaster;
this.systemConf = manager.getTajoConf();
this.reporter = new Reporter();
http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 04b65d2..7278317 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -186,14 +186,14 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
NettyClientBase tajoWorkerRpc = null;
try {
InetSocketAddress addr = new InetSocketAddress(worker.getHost(), worker.getPort());
- tajoWorkerRpc = RpcConnectionPool.getPool(tajoConf).getConnection(addr, TajoWorkerProtocol.class, true);
+ tajoWorkerRpc = RpcConnectionPool.getPool().getConnection(addr, TajoWorkerProtocol.class, true);
TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub();
tajoWorkerRpcClient.stopExecutionBlock(null, executionBlockId.getProto(), NullCallback.get());
} catch (Throwable e) {
LOG.error(e.getMessage(), e);
} finally {
- RpcConnectionPool.getPool(tajoConf).releaseConnection(tajoWorkerRpc);
+ RpcConnectionPool.getPool().releaseConnection(tajoWorkerRpc);
}
}
@@ -271,7 +271,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
.setQueryId(event.getExecutionBlockId().getQueryId().getProto())
.build();
- RpcConnectionPool connPool = RpcConnectionPool.getPool(queryTaskContext.getConf());
+ RpcConnectionPool connPool = RpcConnectionPool.getPool();
NettyClientBase tmClient = null;
try {
http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
index b92c4cd..676c72b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
@@ -74,7 +74,7 @@ public class WorkerHeartbeatService extends AbstractService {
Preconditions.checkArgument(conf instanceof TajoConf, "Configuration must be a TajoConf instance.");
this.systemConf = (TajoConf) conf;
- connectionPool = RpcConnectionPool.getPool(systemConf);
+ connectionPool = RpcConnectionPool.getPool();
super.serviceInit(conf);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
index 68890e3..9d9f39c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
@@ -40,7 +40,7 @@ import java.net.InetSocketAddress;
public class ConnectivityCheckerRuleForTajoWorker implements SelfDiagnosisRule {
private void checkTajoMasterConnectivity(TajoConf tajoConf) throws Exception {
- RpcConnectionPool pool = RpcConnectionPool.getPool(tajoConf);
+ RpcConnectionPool pool = RpcConnectionPool.getPool();
NettyClientBase masterClient = null;
InetSocketAddress masterAddress = null;
http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-rpc/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-rpc/pom.xml b/tajo-rpc/pom.xml
index 1e00e70..d0037ca 100644
--- a/tajo-rpc/pom.xml
+++ b/tajo-rpc/pom.xml
@@ -137,23 +137,24 @@
<dependencies>
<dependency>
- <groupId>org.apache.tajo</groupId>
- <artifactId>tajo-common</artifactId>
- <exclusions>
- <exclusion>
- <groupId>com.google.code.gson</groupId>
- <artifactId>gson</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <scope>provided</scope>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
index 7a416a8..4b1842e 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
@@ -24,7 +24,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.rpc.RpcProtos.RpcRequest;
import org.apache.tajo.rpc.RpcProtos.RpcResponse;
-import org.apache.tajo.util.NetUtils;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
@@ -180,7 +179,7 @@ public class AsyncRpcClient extends NettyClientBase {
private String getErrorMessage(String message) {
return "Exception [" + protocol.getCanonicalName() +
- "(" + NetUtils.normalizeInetSocketAddress((InetSocketAddress)
+ "(" + RpcUtils.normalizeInetSocketAddress((InetSocketAddress)
getChannel().getRemoteAddress()) + ")]: " + message;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
index 03d5d3e..869919c 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
@@ -27,7 +27,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.rpc.RpcProtos.RpcRequest;
import org.apache.tajo.rpc.RpcProtos.RpcResponse;
-import org.apache.tajo.util.NetUtils;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
@@ -159,7 +158,7 @@ public class BlockingRpcClient extends NettyClientBase {
private String getErrorMessage(String message) {
if(protocol != null && getChannel() != null) {
return protocol.getName() +
- "(" + NetUtils.normalizeInetSocketAddress((InetSocketAddress)
+ "(" + RpcUtils.normalizeInetSocketAddress((InetSocketAddress)
getChannel().getRemoteAddress()) + "): " + message;
} else {
return "Exception " + message;
@@ -169,7 +168,7 @@ public class BlockingRpcClient extends NettyClientBase {
private TajoServiceException makeTajoServiceException(RpcResponse response, Throwable cause) {
if(protocol != null && getChannel() != null) {
return new TajoServiceException(response.getErrorMessage(), cause, protocol.getName(),
- NetUtils.normalizeInetSocketAddress((InetSocketAddress)getChannel().getRemoteAddress()));
+ RpcUtils.normalizeInetSocketAddress((InetSocketAddress)getChannel().getRemoteAddress()));
} else {
return new TajoServiceException(response.getErrorMessage());
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
index d0002de..bc0c567 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
@@ -21,7 +21,6 @@ package org.apache.tajo.rpc;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.util.NetUtils;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
@@ -110,7 +109,7 @@ public abstract class NettyClientBase implements Closeable {
public void connect(InetSocketAddress addr) throws ConnectTimeoutException {
if(addr.isUnresolved()){
- addr = NetUtils.createSocketAddr(addr.getHostName(), addr.getPort());
+ addr = RpcUtils.createSocketAddr(addr.getHostName(), addr.getPort());
}
handleConnectionInternally(addr);
http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
index e75418d..ef090ff 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
@@ -20,12 +20,10 @@ package org.apache.tajo.rpc;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.util.NetUtils;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.DefaultChannelFuture;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
@@ -126,7 +124,7 @@ public class NettyServerBase {
if (bindAddress != null) {
LOG.info("Rpc (" + serviceName + ") listened on "
- + NetUtils.normalizeInetSocketAddress(bindAddress)+ ") shutdown");
+ + RpcUtils.normalizeInetSocketAddress(bindAddress)+ ") shutdown");
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
index 6274eff..0727f71 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
@@ -21,7 +21,6 @@ package org.apache.tajo.rpc;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.conf.TajoConf;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.channel.socket.ServerSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.*;
@@ -34,6 +33,9 @@ import java.util.concurrent.atomic.AtomicInteger;
public final class RpcChannelFactory {
private static final Log LOG = LogFactory.getLog(RpcChannelFactory.class);
+
+ private static final int DEFAULT_WORKER_NUM = Runtime.getRuntime().availableProcessors() * 2;
+
private static ClientSocketChannelFactory factory;
private static AtomicInteger clientCount = new AtomicInteger(0);
private static AtomicInteger serverCount = new AtomicInteger(0);
@@ -45,11 +47,19 @@ public final class RpcChannelFactory {
* make this factory static thus all clients can share its thread pool.
* NioClientSocketChannelFactory has only one method newChannel() visible for user, which is thread-safe
*/
- public static synchronized ClientSocketChannelFactory getSharedClientChannelFactory(){
+ public static synchronized ClientSocketChannelFactory getSharedClientChannelFactory() {
+ return getSharedClientChannelFactory(DEFAULT_WORKER_NUM);
+ }
+
+ /**
+ * make this factory static thus all clients can share its thread pool.
+ * NioClientSocketChannelFactory has only one method newChannel() visible for user, which is thread-safe
+ *
+ * @param workerNum The number of workers
+ */
+ public static synchronized ClientSocketChannelFactory getSharedClientChannelFactory(int workerNum){
//shared woker and boss pool
if(factory == null){
- TajoConf conf = new TajoConf();
- int workerNum = conf.getIntVar(TajoConf.ConfVars.INTERNAL_RPC_CLIENT_WORKER_THREAD_NUM);
factory = createClientChannelFactory("Internal-Client", workerNum);
}
return factory;
http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
index 2f3d433..c8e622b 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
@@ -21,7 +21,6 @@ package org.apache.tajo.rpc;
import com.google.common.base.Objects;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.conf.TajoConf;
import org.jboss.netty.channel.ConnectTimeoutException;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
@@ -43,25 +42,23 @@ public class RpcConnectionPool {
private static RpcConnectionPool instance;
private final ClientSocketChannelFactory channelFactory;
- private final TajoConf conf;
public final static int RPC_RETRIES = 3;
- private RpcConnectionPool(TajoConf conf, ClientSocketChannelFactory channelFactory) {
- this.conf = conf;
+ private RpcConnectionPool(ClientSocketChannelFactory channelFactory) {
this.channelFactory = channelFactory;
}
- public synchronized static RpcConnectionPool getPool(TajoConf conf) {
+ public synchronized static RpcConnectionPool getPool() {
if(instance == null) {
InternalLoggerFactory.setDefaultFactory(new CommonsLoggerFactory());
- instance = new RpcConnectionPool(conf, RpcChannelFactory.getSharedClientChannelFactory());
+ instance = new RpcConnectionPool(RpcChannelFactory.getSharedClientChannelFactory());
}
return instance;
}
- public synchronized static RpcConnectionPool newPool(TajoConf conf, String poolName, int workerNum) {
- return new RpcConnectionPool(conf, RpcChannelFactory.createClientChannelFactory(poolName, workerNum));
+ public synchronized static RpcConnectionPool newPool(String poolName, int workerNum) {
+ return new RpcConnectionPool(RpcChannelFactory.createClientChannelFactory(poolName, workerNum));
}
private NettyClientBase makeConnection(RpcConnectionKey rpcConnectionKey)
http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java
new file mode 100644
index 0000000..b6be05f
--- /dev/null
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+
+public class RpcUtils {
+
+ public static String normalizeInetSocketAddress(InetSocketAddress addr) {
+ return addr.getAddress().getHostAddress() + ":" + addr.getPort();
+ }
+
+ /**
+ * Util method to build socket addr from either:
+ * <host>
+ * <host>:<port>
+ * <fs>://<host>:<port>/<path>
+ */
+ public static InetSocketAddress createSocketAddr(String host, int port) {
+ return new InetSocketAddress(host, port);
+ }
+
+ /**
+ * Returns InetSocketAddress that a client can use to
+ * connect to the server. NettyServerBase.getListenerAddress() is not correct when
+ * the server binds to "0.0.0.0". This returns "hostname:port" of the server,
+ * or "127.0.0.1:port" when the getListenerAddress() returns "0.0.0.0:port".
+ *
+ * @param addr of a listener
+ * @return socket address that a client can use to connect to the server.
+ */
+ public static InetSocketAddress getConnectAddress(InetSocketAddress addr) {
+ if (!addr.isUnresolved() && addr.getAddress().isAnyLocalAddress()) {
+ try {
+ addr = new InetSocketAddress(InetAddress.getLocalHost(), addr.getPort());
+ } catch (UnknownHostException uhe) {
+ // shouldn't get here unless the host doesn't have a loopback iface
+ addr = new InetSocketAddress("127.0.0.1", addr.getPort());
+ }
+ }
+ InetSocketAddress canonicalAddress =
+ new InetSocketAddress(addr.getAddress().getCanonicalHostName(), addr.getPort());
+ return canonicalAddress;
+ }
+
+ public static InetSocketAddress createUnresolved(String addr) {
+ String [] splitted = addr.split(":");
+ return InetSocketAddress.createUnresolved(splitted[0], Integer.parseInt(splitted[1]));
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java
index b4e5f9a..140f781 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java
@@ -18,15 +18,14 @@
package org.apache.tajo.rpc;
+import com.google.protobuf.ServiceException;
+
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
-import com.google.protobuf.ServiceException;
-import org.apache.tajo.conf.TajoConf;
-
public abstract class ServerCallable<T> {
protected InetSocketAddress addr;
protected long startTime;
http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
index 7c8246a..61a92bc 100644
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
+++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
@@ -27,7 +27,6 @@ import org.apache.tajo.rpc.test.TestProtos.EchoMessage;
import org.apache.tajo.rpc.test.TestProtos.SumRequest;
import org.apache.tajo.rpc.test.TestProtos.SumResponse;
import org.apache.tajo.rpc.test.impl.DummyProtocolAsyncImpl;
-import org.apache.tajo.util.NetUtils;
import org.jboss.netty.channel.ConnectTimeoutException;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.junit.After;
@@ -65,7 +64,7 @@ public class TestAsyncRpc {
service, new InetSocketAddress("127.0.0.1", 0), 2);
server.start();
client = new AsyncRpcClient(DummyProtocol.class,
- NetUtils.getConnectAddress(server.getListenAddress()), clientChannelFactory, retries);
+ RpcUtils.getConnectAddress(server.getListenAddress()), clientChannelFactory, retries);
stub = client.getStub();
}
@@ -256,9 +255,9 @@ public class TestAsyncRpc {
client.close();
client = null;
- String hostAndPort = NetUtils.normalizeInetSocketAddress(server.getListenAddress());
+ String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress());
client = new AsyncRpcClient(DummyProtocol.class,
- NetUtils.createUnresolved(hostAndPort), clientChannelFactory, retries);
+ RpcUtils.createUnresolved(hostAndPort), clientChannelFactory, retries);
Interface stub = client.getStub();
EchoMessage echoMessage = EchoMessage.newBuilder()
.setMessage(MESSAGE).build();
http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
index 28a3fad..746bfcb 100644
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
+++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
@@ -18,14 +18,12 @@
package org.apache.tajo.rpc;
-import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.rpc.test.DummyProtocol;
import org.apache.tajo.rpc.test.DummyProtocol.DummyProtocolService.BlockingInterface;
import org.apache.tajo.rpc.test.TestProtos.EchoMessage;
import org.apache.tajo.rpc.test.TestProtos.SumRequest;
import org.apache.tajo.rpc.test.TestProtos.SumResponse;
import org.apache.tajo.rpc.test.impl.DummyProtocolBlockingImpl;
-import org.apache.tajo.util.NetUtils;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.junit.After;
import org.junit.Before;
@@ -59,7 +57,7 @@ public class TestBlockingRpc {
new InetSocketAddress("127.0.0.1", 0), 2);
server.start();
client = new BlockingRpcClient(DummyProtocol.class,
- NetUtils.getConnectAddress(server.getListenAddress()), clientChannelFactory, retries);
+ RpcUtils.getConnectAddress(server.getListenAddress()), clientChannelFactory, retries);
stub = client.getStub();
}
@@ -96,7 +94,7 @@ public class TestBlockingRpc {
@Test
public void testRpcWithServiceCallable() throws Exception {
- RpcConnectionPool pool = RpcConnectionPool.newPool(new TajoConf(), getClass().getSimpleName(), 2);
+ RpcConnectionPool pool = RpcConnectionPool.newPool(getClass().getSimpleName(), 2);
final SumRequest request = SumRequest.newBuilder()
.setX1(1)
.setX2(2)
@@ -187,7 +185,7 @@ public class TestBlockingRpc {
try {
int port = server.getListenAddress().getPort() + 1;
new BlockingRpcClient(DummyProtocol.class,
- NetUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", port)), clientChannelFactory, retries);
+ RpcUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", port)), clientChannelFactory, retries);
fail("Connection should be failed.");
} catch (ConnectException ce) {
expected = true;
@@ -260,9 +258,9 @@ public class TestBlockingRpc {
client.close();
client = null;
- String hostAndPort = NetUtils.normalizeInetSocketAddress(server.getListenAddress());
+ String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress());
client = new BlockingRpcClient(DummyProtocol.class,
- NetUtils.createUnresolved(hostAndPort), clientChannelFactory, retries);
+ RpcUtils.createUnresolved(hostAndPort), clientChannelFactory, retries);
BlockingInterface stub = client.getStub();
EchoMessage message = EchoMessage.newBuilder()