You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/08/16 12:49:59 UTC
git commit: TAJO-129: Enable the constructor of NettyServerBase to
take a service name. (hyunsik)
Updated Branches:
refs/heads/master f36dbe863 -> 6899815fc
TAJO-129: Enable the constructor of NettyServerBase to take a service name. (hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/6899815f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/6899815f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/6899815f
Branch: refs/heads/master
Commit: 6899815fc066793318787c43de960cc52017fcc4
Parents: f36dbe8
Author: Hyunsik Choi <hy...@apache.org>
Authored: Thu Aug 15 15:23:32 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Thu Aug 15 15:24:30 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../org/apache/tajo/catalog/CatalogServer.java | 8 +--
.../org/apache/tajo/catalog/TestCatalog.java | 1 -
.../java/org/apache/tajo/conf/TajoConf.java | 1 +
.../java/org/apache/tajo/client/TajoClient.java | 27 ++++---
.../tajo/master/TajoMasterClientService.java | 10 +--
.../tajo/master/querymaster/QueryMaster.java | 11 ++-
.../querymaster/QueryMasterClientService.java | 9 +--
.../querymaster/QueryMasterManagerService.java | 14 ++--
.../org/apache/tajo/TajoTestingCluster.java | 7 +-
.../org/apache/tajo/rpc/NettyServerBase.java | 75 +++++++++++++++-----
.../apache/tajo/rpc/ProtoAsyncRpcClient.java | 2 +-
.../apache/tajo/rpc/ProtoAsyncRpcServer.java | 2 +-
.../apache/tajo/rpc/ProtoBlockingRpcClient.java | 2 +-
.../apache/tajo/rpc/ProtoBlockingRpcServer.java | 2 +-
.../java/org/apache/tajo/util/NetUtils.java | 40 ++++++++++-
.../org/apache/tajo/rpc/TestProtoAsyncRpc.java | 52 +++++---------
.../apache/tajo/rpc/TestProtoBlockingRpc.java | 39 +++-------
18 files changed, 174 insertions(+), 131 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6899815f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0e84b29..45d1f17 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,6 +15,9 @@ Release 0.2.0 - unreleased
IMPROVEMENTS
+ TAJO-129: Enable the constructor of NettyServerBase to take a service
+ name. (hyunsik)
+
TAJO-91: Launch QueryMaster on NodeManager per query.
(hyoungjunkim via hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6899815f/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
index e82afc8..dd36c3e 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
@@ -128,13 +128,11 @@ public class CatalogServer extends AbstractService {
// Creation of a HSA will force a resolve.
InetSocketAddress initIsa = NetUtils.createSocketAddr(serverAddr);
try {
- this.rpcServer = new ProtoBlockingRpcServer(
- CatalogProtocol.class,
- handler, initIsa);
+ this.rpcServer = new ProtoBlockingRpcServer(CatalogProtocol.class, handler, initIsa);
this.rpcServer.start();
- this.bindAddress = this.rpcServer.getBindAddress();
- this.serverName = NetUtils.getIpPortString(bindAddress);
+ this.bindAddress = NetUtils.getConnectAddress(this.rpcServer.getListenAddress());
+ this.serverName = NetUtils.normalizeInetSocketAddress(bindAddress);
conf.setVar(ConfVars.CATALOG_ADDRESS, serverName);
} catch (Exception e) {
LOG.error("Cannot start RPC Server of CatalogServer", e);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6899815f/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
index 1ff7e61..6d08521 100644
--- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
+++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
@@ -78,7 +78,6 @@ public class TestCatalog {
assertTrue(catalog.existsTable("getTable"));
TableDesc meta2 = catalog.getTableDesc("getTable");
- System.out.println(meta2);
catalog.deleteTable("getTable");
assertFalse(catalog.existsTable("getTable"));
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6899815f/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index fb7c268..f4b20ff 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -67,6 +67,7 @@ public class TajoConf extends YarnConfiguration {
TASKRUNNER_LISTENER_ADDRESS("tajo.master.taskrunnerlistener.addr", "0.0.0.0:0"), // used internally
CLIENT_SERVICE_ADDRESS("tajo.master.clientservice.addr", "127.0.0.1:9004"),
QUERY_MASTER_MANAGER_SERVICE_ADDRESS("tajo.master.querymastermanager.addr", "127.0.0.1:9005"),
+ QUERY_MASTER_CLIENT_SERVICE_ADDRESS("tajo.qmm.client.addr", "0.0.0.0:0"),
//////////////////////////////////
// Catalog Configuration
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6899815f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
index 5b4b064..cd8706e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
@@ -22,7 +22,6 @@ import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.net.NetUtils;
import org.apache.tajo.QueryId;
import org.apache.tajo.TajoProtos.QueryState;
import org.apache.tajo.catalog.CatalogUtil;
@@ -31,14 +30,15 @@ import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.engine.query.ResultSetImpl;
-import org.apache.tajo.ipc.TajoMasterClientProtocol;
-import org.apache.tajo.ipc.TajoMasterClientProtocol.*;
-import org.apache.tajo.ipc.QueryMasterClientProtocol;
-import org.apache.tajo.ipc.QueryMasterClientProtocol.*;
import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.ipc.ClientProtos.*;
+import org.apache.tajo.ipc.QueryMasterClientProtocol;
+import org.apache.tajo.ipc.QueryMasterClientProtocol.QueryMasterClientProtocolService;
+import org.apache.tajo.ipc.TajoMasterClientProtocol;
+import org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService;
import org.apache.tajo.rpc.ProtoBlockingRpcClient;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
+import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.TajoIdUtils;
import java.io.IOException;
@@ -90,7 +90,7 @@ public class TajoClient {
}
LOG.info("connected to tajo cluster (" +
- org.apache.tajo.util.NetUtils.getIpPortString(addr) + ")");
+ org.apache.tajo.util.NetUtils.normalizeInetSocketAddress(addr) + ")");
}
public void close() {
@@ -108,10 +108,11 @@ public class TajoClient {
try {
queryMasterConnectionMap.get(queryId).killQuery(null, queryId.getProto());
} catch (Exception e) {
- LOG.warn("Fail to close query:" + queryId + "," + e.getMessage(), e);
+ LOG.warn("Fail to close a QueryMaster connection (qid=" + queryId + ", msg=" + e.getMessage() + ")", e);
}
queryMasterClientMap.get(queryId).close();
- LOG.info("Closed QueryMaster connection(" + queryId + "," + queryMasterClientMap.get(queryId).getRemoteAddress() + ")");
+ LOG.info("Closed a QueryMaster connection (qid=" + queryId + ", addr="
+ + queryMasterClientMap.get(queryId).getRemoteAddress() + ")");
queryMasterClientMap.remove(queryId);
queryMasterConnectionMap.remove(queryId);
}
@@ -169,7 +170,6 @@ public class TajoClient {
String queryMasterHost = res.getQueryMasterHost();
if(queryMasterHost != null && !queryMasterHost.isEmpty()) {
- LOG.info("=========> connect to querymaster:" + queryMasterHost);
connectionToQueryMaster(queryId, queryMasterHost, res.getQueryMasterPort());
}
}
@@ -185,8 +185,8 @@ public class TajoClient {
queryMasterConnectionMap.put(queryId, service);
queryMasterClientMap.put(queryId, client);
- LOG.debug("connected to Query Master (" +
- org.apache.tajo.util.NetUtils.getIpPortString(addr) + ")");
+ LOG.info("Connected to Query Master (qid=" + queryId + ", addr=" +
+ NetUtils.normalizeInetSocketAddress(addr) + ")");
} catch (Exception e) {
LOG.error(e.getMessage());
throw new RuntimeException(e);
@@ -221,8 +221,7 @@ public class TajoClient {
while(status != null && isQueryRunnning(status.getState())) {
try {
-// Thread.sleep(500);
- Thread.sleep(2000);
+ Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
@@ -238,7 +237,7 @@ public class TajoClient {
}
} else {
- LOG.warn("=====>Query failed:" + status.getState());
+ LOG.warn("Query " + status.getQueryId() + ") failed: " + status.getState());
//TODO throw SQLException(?)
return createNullResultSet(queryId);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6899815f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 273a3c1..b433718 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -26,7 +26,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.tajo.QueryId;
@@ -48,6 +47,7 @@ import org.apache.tajo.rpc.ProtoBlockingRpcServer;
import org.apache.tajo.rpc.RemoteException;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
+import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.TajoIdUtils;
import java.io.IOException;
@@ -88,10 +88,9 @@ public class TajoMasterClientService extends AbstractService {
LOG.error(e);
}
server.start();
- bindAddress = server.getBindAddress();
- this.conf.setVar(ConfVars.CLIENT_SERVICE_ADDRESS,
- org.apache.tajo.util.NetUtils.getIpPortString(bindAddress));
- LOG.info("Instantiated TajoMasterClientService at " + this.bindAddress);
+ bindAddress = NetUtils.getConnectAddress(server.getListenAddress());
+ this.conf.setVar(ConfVars.CLIENT_SERVICE_ADDRESS, NetUtils.normalizeInetSocketAddress(bindAddress));
+ LOG.info("TajoMasterClientService startup");
super.start();
}
@@ -100,6 +99,7 @@ public class TajoMasterClientService extends AbstractService {
if (server != null) {
server.shutdown();
}
+ LOG.info("TajoMasterClientService shutdown");
super.stop();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6899815f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index 871ba77..4b15c61 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.Clock;
@@ -68,6 +67,7 @@ import org.apache.tajo.rpc.ProtoBlockingRpcClient;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.storage.StorageManager;
import org.apache.tajo.storage.StorageUtil;
+import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.TajoIdUtils;
import java.io.IOException;
@@ -270,14 +270,13 @@ public class QueryMaster extends CompositeService implements EventHandler {
this.rpcServer = new ProtoAsyncRpcServer(QueryMasterProtocol.class, this, initIsa);
this.rpcServer.start();
- this.bindAddr = rpcServer.getBindAddress();
- this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
+ this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());
+ this.addr = NetUtils.normalizeInetSocketAddress(this.bindAddr);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
- // Get the master address
- LOG.info(QueryMasterService.class.getSimpleName() + " is bind to " + addr);
queryConf.setVar(TajoConf.ConfVars.TASKRUNNER_LISTENER_ADDRESS, addr);
+ LOG.info("QueryMasterService startup");
}
@Override
@@ -311,8 +310,8 @@ public class QueryMaster extends CompositeService implements EventHandler {
if(clientSessionTimeoutCheckThread != null) {
clientSessionTimeoutCheckThread.interrupt();
}
- LOG.info("QueryMasterService stopped");
super.stop();
+ LOG.info("QueryMasterService stopped");
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6899815f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterClientService.java
index 1a326fe..74298e5 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterClientService.java
@@ -32,6 +32,7 @@ import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.ipc.QueryMasterClientProtocol;
import org.apache.tajo.rpc.ProtoBlockingRpcServer;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.TajoIdUtils;
import java.net.InetAddress;
@@ -67,14 +68,14 @@ public class QueryMasterClientService extends AbstractService {
this.rpcServer = new ProtoBlockingRpcServer(QueryMasterClientProtocol.class, serviceHandler, initIsa);
this.rpcServer.start();
- this.bindAddr = rpcServer.getBindAddress();
- this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
+ this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());
+ this.addr = NetUtils.normalizeInetSocketAddress(bindAddr);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
// Get the master address
- LOG.info(QueryMasterClientService.class.getSimpleName() + " is bind to " + addr);
- //queryConf.setVar(TajoConf.ConfVars.TASKRUNNER_LISTENER_ADDRESS, addr);
+ LOG.info(QueryMasterClientService.class.getSimpleName() + " (" + queryContext.getQueryId() + ") listens on "
+ + addr);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6899815f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
index a3c7b75..65f237c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -22,7 +22,6 @@ import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.tajo.QueryId;
import org.apache.tajo.conf.TajoConf;
@@ -32,9 +31,12 @@ import org.apache.tajo.ipc.QueryMasterManagerProtocol.QueryHeartbeatResponse;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.rpc.ProtoBlockingRpcServer;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
+import org.apache.tajo.util.NetUtils;
import java.net.InetSocketAddress;
+import static org.apache.tajo.conf.TajoConf.ConfVars;
+
public class QueryMasterManagerService extends AbstractService {
private final static Log LOG = LogFactory.getLog(QueryMasterManagerService.class);
@@ -57,7 +59,7 @@ public class QueryMasterManagerService extends AbstractService {
@Override
public void start() {
// TODO resolve hostname
- String confMasterServiceAddr = conf.getVar(TajoConf.ConfVars.QUERY_MASTER_MANAGER_SERVICE_ADDRESS);
+ String confMasterServiceAddr = conf.getVar(ConfVars.QUERY_MASTER_MANAGER_SERVICE_ADDRESS);
InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterServiceAddr);
try {
server = new ProtoBlockingRpcServer(QueryMasterManagerProtocol.class, masterHandler, initIsa);
@@ -65,10 +67,9 @@ public class QueryMasterManagerService extends AbstractService {
LOG.error(e);
}
server.start();
- bindAddress = server.getBindAddress();
- this.conf.setVar(TajoConf.ConfVars.QUERY_MASTER_MANAGER_SERVICE_ADDRESS,
- org.apache.tajo.util.NetUtils.getIpPortString(bindAddress));
- LOG.info("Instantiated QueryMasterManagerService at " + this.bindAddress);
+ bindAddress = NetUtils.getConnectAddress(server.getListenAddress());
+ this.conf.setVar(ConfVars.QUERY_MASTER_MANAGER_SERVICE_ADDRESS, NetUtils.normalizeInetSocketAddress(bindAddress));
+ LOG.info("QueryMasterManagerService startup");
super.start();
}
@@ -78,6 +79,7 @@ public class QueryMasterManagerService extends AbstractService {
server.shutdown();
server = null;
}
+ LOG.info("QueryMasterManagerService shutdown");
super.stop();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6899815f/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 695fcd6..30770cb 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -39,7 +39,6 @@ import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.util.NetUtils;
import java.io.*;
-import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URL;
import java.sql.ResultSet;
@@ -209,7 +208,7 @@ public class TajoTestingCluster {
catalogServer = new MiniCatalogServer(conf);
CatalogServer catServer = catalogServer.getCatalogServer();
InetSocketAddress sockAddr = catServer.getBindAddress();
- c.setVar(ConfVars.CATALOG_ADDRESS, NetUtils.getIpPortString(sockAddr));
+ c.setVar(ConfVars.CATALOG_ADDRESS, NetUtils.normalizeInetSocketAddress(sockAddr));
return this.catalogServer;
}
@@ -352,10 +351,10 @@ public class TajoTestingCluster {
yarnCluster.start();
conf.set(YarnConfiguration.RM_ADDRESS,
- NetUtils.getIpPortString(yarnCluster.getResourceManager().
+ NetUtils.normalizeInetSocketAddress(yarnCluster.getResourceManager().
getClientRMService().getBindAddress()));
conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
- NetUtils.getIpPortString(yarnCluster.getResourceManager().
+ NetUtils.normalizeInetSocketAddress(yarnCluster.getResourceManager().
getApplicationMasterService().getBindAddress()));
URL url = Thread.currentThread().getContextClassLoader().getResource("yarn-site.xml");
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6899815f/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
index 9d63317..b520b3e 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
@@ -20,6 +20,7 @@ package org.apache.tajo.rpc;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.util.NetUtils;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
@@ -32,10 +33,14 @@ import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.Random;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
public class NettyServerBase {
private static final Log LOG = LogFactory.getLog(NettyServerBase.class);
+ private static final String DEFAULT_PREFIX = "RpcServer_";
+ private static final AtomicInteger sequenceId = new AtomicInteger(0);
+ protected String serviceName;
protected InetSocketAddress serverAddr;
protected InetSocketAddress bindAddress;
protected ChannelFactory factory;
@@ -43,17 +48,19 @@ public class NettyServerBase {
protected ServerBootstrap bootstrap;
protected Channel channel;
- public NettyServerBase(InetSocketAddress addr) {
- if (addr.getPort() == 0) {
- try {
- int port = getUnusedPort();
- serverAddr = new InetSocketAddress(addr.getHostName(), port);
- } catch (IOException e) {
- LOG.error(e);
- }
- } else {
- serverAddr = addr;
- }
+ private InetSocketAddress initIsa;
+
+ public NettyServerBase(InetSocketAddress address) {
+ this.initIsa = address;
+ }
+
+ public NettyServerBase(String serviceName, InetSocketAddress addr) {
+ this.serviceName = serviceName;
+ this.initIsa = addr;
+ }
+
+ public void setName(String name) {
+ this.serviceName = name;
}
public void init(ChannelPipelineFactory pipeline) {
@@ -73,15 +80,30 @@ public class NettyServerBase {
bootstrap.setOption("child.receiveBufferSize", 1048576 * 2);
}
- public InetSocketAddress getBindAddress() {
+ public InetSocketAddress getListenAddress() {
return this.bindAddress;
}
public void start() {
+ if (serviceName == null) {
+ this.serviceName = getNextDefaultServiceName();
+ }
+
+ if (initIsa.getPort() == 0) {
+ try {
+ int port = getUnusedPort();
+ serverAddr = new InetSocketAddress(initIsa.getHostName(), port);
+ } catch (IOException e) {
+ LOG.error(e);
+ }
+ } else {
+ serverAddr = initIsa;
+ }
+
this.channel = bootstrap.bind(serverAddr);
this.bindAddress = (InetSocketAddress) channel.getLocalAddress();
- LOG.info("RpcServer on " + this.bindAddress);
+ LOG.info("Rpc (" + serviceName + ") listens on " + this.bindAddress);
}
public Channel getChannel() {
@@ -95,16 +117,33 @@ public class NettyServerBase {
if(factory != null) {
factory.releaseExternalResources();
}
- LOG.info("RpcServer (" + org.apache.tajo.util.NetUtils.getIpPortString(bindAddress)
- + ") shutdown");
+ LOG.info("Rpc (" + serviceName + ") listened on "
+ + NetUtils.normalizeInetSocketAddress(bindAddress)+ ") shutdown");
+ }
+
+ private static String getNextDefaultServiceName() {
+ return DEFAULT_PREFIX + sequenceId.getAndIncrement();
}
- private static final Random randomPort = new Random(System.currentTimeMillis());
+ private static final int startPortRange = 10000;
+ private static final int endPortRange = 50000;
+ private static final Random rnd = new Random(System.currentTimeMillis());
+ // each system has a different starting port number within the given range.
+ private static final AtomicInteger nextPortNum =
+ new AtomicInteger(startPortRange+ rnd.nextInt(endPortRange - startPortRange));
+
+
private synchronized static int getUnusedPort() throws IOException {
while (true) {
- int port = randomPort.nextInt(10000) + 50000;
+ int port = nextPortNum.getAndIncrement();
+ if (port >= endPortRange) {
+ synchronized (nextPortNum) {
+ nextPortNum.set(startPortRange);
+ port = nextPortNum.getAndIncrement();
+ }
+ }
if (available(port)) {
- LOG.info("Found unused port:" + port);
+ LOG.info("Detect an unused port:" + port);
return port;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6899815f/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoAsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoAsyncRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoAsyncRpcClient.java
index 6afbdf5..c58db58 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoAsyncRpcClient.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoAsyncRpcClient.java
@@ -165,7 +165,7 @@ public class ProtoAsyncRpcClient extends NettyClientBase {
private String getErrorMessage(String message) {
return "Exception [" + protocol.getCanonicalName() +
- "(" + NetUtils.getIpPortString((InetSocketAddress)
+ "(" + NetUtils.normalizeInetSocketAddress((InetSocketAddress)
getChannel().getRemoteAddress()) + ")]: " + message;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6899815f/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoAsyncRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoAsyncRpcServer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoAsyncRpcServer.java
index 46ad761..56faaba 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoAsyncRpcServer.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoAsyncRpcServer.java
@@ -42,7 +42,7 @@ public class ProtoAsyncRpcServer extends NettyServerBase {
final Object instance,
final InetSocketAddress bindAddress)
throws Exception {
- super(bindAddress);
+ super(protocol.getSimpleName(), bindAddress);
String serviceClassName = protocol.getName() + "$" +
protocol.getSimpleName() + "Service";
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6899815f/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoBlockingRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoBlockingRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoBlockingRpcClient.java
index 8b61ce4..2018f6d 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoBlockingRpcClient.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoBlockingRpcClient.java
@@ -130,7 +130,7 @@ public class ProtoBlockingRpcClient extends NettyClientBase {
private String getErrorMessage(String message) {
return "Exception [" + protocol.getCanonicalName() +
- "(" + NetUtils.getIpPortString((InetSocketAddress)
+ "(" + NetUtils.normalizeInetSocketAddress((InetSocketAddress)
getChannel().getRemoteAddress()) + ")]: " + message;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6899815f/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoBlockingRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoBlockingRpcServer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoBlockingRpcServer.java
index d4ea8d4..cc47b7b 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoBlockingRpcServer.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoBlockingRpcServer.java
@@ -41,7 +41,7 @@ public class ProtoBlockingRpcServer extends NettyServerBase {
final InetSocketAddress bindAddress)
throws Exception {
- super(bindAddress);
+ super(protocol.getSimpleName(), bindAddress);
String serviceClassName = protocol.getName() + "$" +
protocol.getSimpleName() + "Service";
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6899815f/tajo-rpc/src/main/java/org/apache/tajo/util/NetUtils.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/util/NetUtils.java b/tajo-rpc/src/main/java/org/apache/tajo/util/NetUtils.java
index 1c2b13c..f64fa59 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/util/NetUtils.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/util/NetUtils.java
@@ -18,10 +18,12 @@
package org.apache.tajo.util;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
public class NetUtils {
- public static String getIpPortString(InetSocketAddress addr) {
+ public static String normalizeInetSocketAddress(InetSocketAddress addr) {
return addr.getAddress().getHostAddress() + ":" + addr.getPort();
}
@@ -29,4 +31,40 @@ public class NetUtils {
String [] splitted = addr.split(":");
return new InetSocketAddress(splitted[0], Integer.parseInt(splitted[1]));
}
+
+ /**
+ * Util method to build socket addr from either:
+ * <host>
+ * <host>:<port>
+ * <fs>://<host>:<port>/<path>
+ */
+ public static InetSocketAddress createSocketAddr(String host, int port) {
+ return new InetSocketAddress(host, port);
+ }
+
+ public static InetSocketAddress createUnresolved(String addr) {
+ String [] splitted = addr.split(":");
+ return InetSocketAddress.createUnresolved(splitted[0], Integer.parseInt(splitted[1]));
+ }
+
+ /**
+ * Returns InetSocketAddress that a client can use to
+ * connect to the server. NettyServerBase.getListenerAddress() is not correct when
+ * the server binds to "0.0.0.0". This returns "hostname:port" of the server,
+ * or "127.0.0.1:port" when the getListenerAddress() returns "0.0.0.0:port".
+ *
+ * @param addr of a listener
+ * @return socket address that a client can use to connect to the server.
+ */
+ public static InetSocketAddress getConnectAddress(InetSocketAddress addr) {
+ if (!addr.isUnresolved() && addr.getAddress().isAnyLocalAddress()) {
+ try {
+ addr = new InetSocketAddress(InetAddress.getLocalHost(), addr.getPort());
+ } catch (UnknownHostException uhe) {
+ // shouldn't get here unless the host doesn't have a loopback iface
+ addr = new InetSocketAddress("127.0.0.1", addr.getPort());
+ }
+ }
+ return addr;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6899815f/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestProtoAsyncRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestProtoAsyncRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestProtoAsyncRpc.java
index cc371c4..69d68b8 100644
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestProtoAsyncRpc.java
+++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestProtoAsyncRpc.java
@@ -19,18 +19,17 @@
package org.apache.tajo.rpc;
import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
import org.apache.tajo.rpc.test.DummyProtocol;
import org.apache.tajo.rpc.test.DummyProtocol.DummyProtocolService.Interface;
import org.apache.tajo.rpc.test.TestProtos.EchoMessage;
import org.apache.tajo.rpc.test.TestProtos.SumRequest;
import org.apache.tajo.rpc.test.TestProtos.SumResponse;
import org.apache.tajo.rpc.test.impl.DummyProtocolAsyncImpl;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
@@ -57,16 +56,21 @@ public class TestProtoAsyncRpc {
server = new ProtoAsyncRpcServer(DummyProtocol.class,
service, new InetSocketAddress(0));
server.start();
- client = new ProtoAsyncRpcClient(DummyProtocol.class, server.getBindAddress());
+ client = new ProtoAsyncRpcClient(DummyProtocol.class, server.getListenAddress());
stub = client.getStub();
}
@After
public void tearDown() throws Exception {
- client.close();
- server.shutdown();
+ if(client != null) {
+ client.close();
+ }
+ if(server != null) {
+ server.shutdown();
+ }
}
+ boolean calledMarker = false;
@Test
public void testRpc() throws Exception {
@@ -87,14 +91,17 @@ public class TestProtoAsyncRpc {
EchoMessage echoMessage = EchoMessage.newBuilder()
.setMessage(MESSAGE).build();
-
- stub.echo(null, echoMessage, new RpcCallback<EchoMessage>() {
+ RpcCallback<EchoMessage> callback = new RpcCallback<EchoMessage>() {
@Override
public void run(EchoMessage parameter) {
echo = parameter.getMessage();
assertEquals(MESSAGE, echo);
+ calledMarker = true;
}
- });
+ };
+ stub.echo(null, echoMessage, callback);
+ Thread.sleep(1000);
+ assertTrue(calledMarker);
}
private CountDownLatch testNullLatch;
@@ -114,31 +121,6 @@ public class TestProtoAsyncRpc {
assertTrue(service.getNullCalled);
}
- private CountDownLatch testGetErrorLatch;
-
- //@Test
- // TODO - to be fixed
- public void testGetError() throws Exception {
- EchoMessage echoMessage2 = EchoMessage.newBuilder()
- .setMessage("[Don't Worry! It's an exception message for unit test]").
- build();
-
- testGetErrorLatch = new CountDownLatch(1);
- RpcController controller = new NettyRpcController();
- stub.getError(controller, echoMessage2, new RpcCallback<EchoMessage>() {
- @Override
- public void run(EchoMessage parameter) {
- assertNull(parameter);
- LOG.info("testGetError retrieved");
- testGetErrorLatch.countDown();
- }
- });
- testGetErrorLatch.await(1000, TimeUnit.MILLISECONDS);
- assertTrue(service.getErrorCalled);
- assertTrue(controller.failed());
- assertEquals(echoMessage2.getMessage(), controller.errorText());
- }
-
@Test
public void testCallFuture() throws Exception {
EchoMessage echoMessage = EchoMessage.newBuilder()
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6899815f/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestProtoBlockingRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestProtoBlockingRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestProtoBlockingRpc.java
index eeecc3d..2defc96 100644
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestProtoBlockingRpc.java
+++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestProtoBlockingRpc.java
@@ -19,9 +19,7 @@
package org.apache.tajo.rpc;
import com.google.protobuf.RpcController;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.*;
import org.apache.tajo.rpc.test.DummyProtocol;
import org.apache.tajo.rpc.test.DummyProtocol.DummyProtocolService.BlockingInterface;
import org.apache.tajo.rpc.test.TestProtos.EchoMessage;
@@ -38,13 +36,13 @@ import static org.junit.Assert.*;
public class TestProtoBlockingRpc {
public static String MESSAGE = "TestProtoBlockingRpc";
- private static ProtoBlockingRpcServer server;
- private static ProtoBlockingRpcClient client;
- private static BlockingInterface stub;
- private static DummyProtocolBlockingImpl service;
+ private ProtoBlockingRpcServer server;
+ private ProtoBlockingRpcClient client;
+ private BlockingInterface stub;
+ private DummyProtocolBlockingImpl service;
- @BeforeClass
- public static void setUp() throws Exception {
+ @Before
+ public void setUp() throws Exception {
service = new DummyProtocolBlockingImpl();
server = new ProtoBlockingRpcServer(DummyProtocol.class, service,
new InetSocketAddress(10000));
@@ -55,8 +53,8 @@ public class TestProtoBlockingRpc {
stub = client.getStub();
}
- @AfterClass
- public static void tearDown() throws Exception {
+ @After
+ public void tearDown() throws Exception {
if(client != null) {
client.close();
}
@@ -97,8 +95,6 @@ public class TestProtoBlockingRpc {
.setMessage(MESSAGE)
.build();
stub.deley(null, message);
-// client.close();
-// client = null;
} catch (Exception e) {
e.printStackTrace();
error.append(e.getMessage());
@@ -129,29 +125,16 @@ public class TestProtoBlockingRpc {
};
shutdownThread.start();
- latch.await(10 * 1000, TimeUnit.MILLISECONDS);
+ latch.await(5 * 1000, TimeUnit.MILLISECONDS);
assertTrue(latch.getCount() == 0);
synchronized(error) {
- error.wait(10 * 1000);
+ error.wait(5 * 1000);
}
if(!error.toString().isEmpty()) {
fail(error.toString());
}
}
-
- //@Test
- public void testGetError() throws Exception {
- EchoMessage echoMessage2 = EchoMessage.newBuilder()
- .setMessage("[Don't Worry! It's an exception message for unit test]").
- build();
-
- RpcController controller = new NettyRpcController();
- assertNull(stub.getError(controller, echoMessage2));
- assertTrue(service.getErrorCalled);
- assertTrue(controller.failed());
- assertEquals(echoMessage2.getMessage(), controller.errorText());
- }
}
\ No newline at end of file