You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2016/04/28 11:38:59 UTC
tajo git commit: TAJO-2116: Simplify rpc address in default
configuration.
Repository: tajo
Updated Branches:
refs/heads/master aad78a4e7 -> 16cbf69fe
TAJO-2116: Simplify rpc address in default configuration.
Closes #997
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/16cbf69f
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/16cbf69f
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/16cbf69f
Branch: refs/heads/master
Commit: 16cbf69fe363de82e17bbb26f39ce7efa23d6546
Parents: aad78a4
Author: Jinho Kim <jh...@apache.org>
Authored: Thu Apr 28 18:37:58 2016 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Thu Apr 28 18:37:58 2016 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../org/apache/tajo/catalog/CatalogClient.java | 14 ++----
.../org/apache/tajo/catalog/CatalogServer.java | 15 +++---
.../org/apache/tajo/cli/tools/TajoAdmin.java | 21 +++-----
.../org/apache/tajo/cli/tools/TajoDump.java | 17 ++++---
.../org/apache/tajo/cli/tools/TajoGetConf.java | 20 ++++----
.../org/apache/tajo/cli/tools/TajoHAAdmin.java | 20 ++++----
.../java/org/apache/tajo/cli/tsql/TajoCli.java | 24 ++++-----
.../org/apache/tajo/client/TajoClientImpl.java | 3 --
.../org/apache/tajo/TajoTestingCluster.java | 6 +--
.../java/org/apache/tajo/conf/TajoConf.java | 52 +++++++++++++++-----
.../apache/tajo/rule/base/BaseRuleProvider.java | 12 ++---
.../base/CheckHadoopRuntimeVersionRule.java | 18 +++----
.../apache/tajo/service/BaseServiceTracker.java | 17 ++++---
.../java/org/apache/tajo/util/NetUtils.java | 14 +++++-
.../org/apache/tajo/cli/tsql/TestTajoCli.java | 8 +--
.../java/org/apache/tajo/benchmark/TPCH.java | 3 --
.../tajo/engine/function/builtin/Date.java | 4 +-
.../org/apache/tajo/ha/HdfsServiceTracker.java | 10 ++--
.../tajo/master/QueryCoordinatorService.java | 3 +-
.../java/org/apache/tajo/master/TajoMaster.java | 3 +-
.../tajo/master/TajoMasterClientService.java | 5 +-
.../NonForwardQueryResultSystemScanner.java | 2 +-
.../tajo/master/rm/TajoResourceTracker.java | 7 ++-
.../tajo/master/rule/MasterRuleProvider.java | 12 ++---
.../querymaster/QueryMasterManagerService.java | 20 +++-----
.../main/java/org/apache/tajo/util/JSPUtil.java | 10 ++--
.../apache/tajo/webapp/StaticHttpServer.java | 7 +--
.../java/org/apache/tajo/worker/TajoWorker.java | 37 +++++---------
.../tajo/worker/TajoWorkerClientService.java | 37 +++++---------
.../tajo/worker/TajoWorkerManagerService.java | 11 ++---
.../tajo/worker/rule/WorkerRuleProvider.java | 12 ++---
.../org/apache/tajo/ws/rs/TajoRestService.java | 2 +-
tajo-dist/src/main/conf/tajo-site.xml.template | 4 +-
.../main/sphinx/configuration/cluster_setup.rst | 15 ++----
.../configuration/service_config_defaults.rst | 32 ++++++------
.../rewrite/rules/ProjectionPushDownRule.java | 3 +-
.../retriever/AdvancedDataRetriever.java | 2 +-
.../org/apache/tajo/storage/FileTablespace.java | 2 +-
.../storage/thirdparty/orc/OrcRecordReader.java | 2 +-
.../org/apache/tajo/HttpFileServerHandler.java | 2 +-
41 files changed, 240 insertions(+), 270 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 8351ee4..60d844f 100644
--- a/CHANGES
+++ b/CHANGES
@@ -12,6 +12,8 @@ Release 0.12.0 - unreleased
IMPROVEMENT
+ TAJO-2116: Simplify rpc address in default configuration. (jinho)
+
TAJO-2126: Allow parallel execution of non-leaf sibling ExecutionBlocks.
(jihooon)
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java
index 2b24a6b..287688b 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java
@@ -22,10 +22,11 @@ import com.google.protobuf.ServiceException;
import org.apache.tajo.catalog.CatalogProtocol.CatalogProtocolService.BlockingInterface;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.rpc.*;
+import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.RpcClientManager;
+import org.apache.tajo.rpc.RpcConstants;
import org.apache.tajo.service.ServiceTracker;
import org.apache.tajo.service.ServiceTrackerFactory;
-import org.apache.tajo.util.NetUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -37,14 +38,13 @@ import java.util.Properties;
public class CatalogClient extends AbstractCatalogClient {
protected NettyClientBase client;
protected ServiceTracker serviceTracker;
- protected InetSocketAddress catalogServerAddr;
+
/**
* @throws java.io.IOException
*
*/
public CatalogClient(final TajoConf conf) throws IOException {
super(conf);
- this.catalogServerAddr = NetUtils.createSocketAddr(conf.getVar(ConfVars.CATALOG_ADDRESS));
this.serviceTracker = ServiceTrackerFactory.get(conf);
}
@@ -55,11 +55,7 @@ public class CatalogClient extends AbstractCatalogClient {
}
private InetSocketAddress getCatalogServerAddr() {
- if (!conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
- return catalogServerAddr;
- } else {
- return serviceTracker.getCatalogAddress();
- }
+ return serviceTracker.getCatalogAddress();
}
public synchronized NettyClientBase getCatalogConnection() throws ServiceException {
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
index d99daee..df40b9b 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
@@ -92,7 +92,6 @@ public class CatalogServer extends AbstractService {
// RPC variables
private BlockingRpcServer rpcServer;
private InetSocketAddress bindAddress;
- private String bindAddressStr;
final CatalogProtocolHandler handler;
private Collection<FunctionDesc> builtinFuncs;
@@ -158,28 +157,26 @@ public class CatalogServer extends AbstractService {
@Override
public void serviceStart() throws Exception {
- String serverAddr = conf.getVar(ConfVars.CATALOG_ADDRESS);
- InetSocketAddress initIsa = NetUtils.createSocketAddr(serverAddr);
+ InetSocketAddress initIsa = conf.getSocketAddrVar(ConfVars.CATALOG_ADDRESS);
int workerNum = conf.getIntVar(ConfVars.CATALOG_RPC_SERVER_WORKER_THREAD_NUM);
try {
this.rpcServer = new BlockingRpcServer(CatalogProtocol.class, handler, initIsa, workerNum);
this.rpcServer.start();
this.bindAddress = NetUtils.getConnectAddress(this.rpcServer.getListenAddress());
- this.bindAddressStr = NetUtils.normalizeInetSocketAddress(bindAddress);
- conf.setVar(ConfVars.CATALOG_ADDRESS, bindAddressStr);
+ conf.setVar(ConfVars.CATALOG_ADDRESS, NetUtils.getHostPortString(bindAddress));
} catch (Exception e) {
LOG.error("CatalogServer startup failed", e);
throw new TajoInternalError(e);
}
- LOG.info("Catalog Server startup (" + bindAddressStr + ")");
+ LOG.info("Catalog Server startup (" + bindAddress + ")");
super.serviceStart();
}
@Override
public void serviceStop() throws Exception {
- LOG.info("Catalog Server (" + bindAddressStr + ") shutdown");
+ LOG.info("Catalog Server (" + bindAddress + ") shutdown");
// If CatalogServer shutdowns before it started, rpcServer and store may be NULL.
// So, we should check Nullity of them.
@@ -686,7 +683,7 @@ public class CatalogServer extends AbstractService {
try {
store.createTable(request);
LOG.info(String.format("relation \"%s\" is added to the catalog (%s)",
- CatalogUtil.getCanonicalTableName(dbName, tbName), bindAddressStr));
+ CatalogUtil.getCanonicalTableName(dbName, tbName), bindAddress));
return OK;
} catch (Throwable t) {
@@ -716,7 +713,7 @@ public class CatalogServer extends AbstractService {
try {
store.dropTable(dbName, tbName);
LOG.info(String.format("relation \"%s\" is deleted from the catalog (%s)",
- CatalogUtil.getCanonicalTableName(dbName, tbName), bindAddressStr));
+ CatalogUtil.getCanonicalTableName(dbName, tbName), bindAddress));
return OK;
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java
----------------------------------------------------------------------
diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java
index 84e4ef5..b5bbdf0 100644
--- a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java
+++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java
@@ -124,19 +124,15 @@ public class TajoAdmin {
}
// if there is no "-h" option,
+ InetSocketAddress address = tajoConf.getSocketAddrVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS,
+ TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
+
if(hostName == null) {
- if (tajoConf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS) != null) {
- // it checks if the client service address is given in configuration and distributed mode.
- // if so, it sets entryAddr.
- hostName = tajoConf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS).split(":")[0];
- }
+ hostName = address.getHostName();
}
+
if (port == null) {
- if (tajoConf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS) != null) {
- // it checks if the client service address is given in configuration and distributed mode.
- // if so, it sets entryAddr.
- port = Integer.parseInt(tajoConf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS).split(":")[1]);
- }
+ port = address.getPort();
}
if (cmdType == 0) {
@@ -149,7 +145,7 @@ public class TajoAdmin {
System.err.println("ERROR: cannot find valid Tajo server address");
return;
} else if (hostName != null && port != null) {
- tajoConf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, hostName + ":" + port);
+ tajoConf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, NetUtils.getHostPortString(hostName, port));
tajoClient = new TajoClientImpl(ServiceTrackerFactory.get(tajoConf));
} else if (hostName == null && port == null) {
tajoClient = new TajoClientImpl(ServiceTrackerFactory.get(tajoConf));
@@ -432,8 +428,7 @@ public class TajoAdmin {
}
writer.write("\n");
} else {
- String confMasterServiceAddr = tajoConf.getVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
- InetSocketAddress masterAddress = NetUtils.createSocketAddr(confMasterServiceAddr);
+ InetSocketAddress masterAddress = tajoConf.getSocketAddrVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
writer.write(masterAddress.getHostName());
writer.write("\n");
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoDump.java
----------------------------------------------------------------------
diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoDump.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoDump.java
index c9fa2b4..f2df0cd 100644
--- a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoDump.java
+++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoDump.java
@@ -27,10 +27,12 @@ import org.apache.tajo.client.TajoClient;
import org.apache.tajo.client.TajoClientImpl;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.service.ServiceTrackerFactory;
+import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.Pair;
import java.io.IOException;
import java.io.PrintWriter;
+import java.net.InetSocketAddress;
import java.sql.SQLException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
@@ -64,16 +66,17 @@ public class TajoDump {
port = Integer.parseInt(cmd.getOptionValue("p"));
}
+ InetSocketAddress address = conf.getSocketAddrVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS,
+ TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
+
if(hostName == null) {
- if (conf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS) != null) {
- hostName = conf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS).split(":")[0];
- }
+ hostName = address.getHostName();
}
+
if (port == null) {
- if (conf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS) != null) {
- port = Integer.parseInt(conf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS).split(":")[1]);
- }
+ port = address.getPort();
}
+
return new Pair<>(hostName, port);
}
@@ -104,7 +107,7 @@ public class TajoDump {
System.err.println("ERROR: cannot find any TajoMaster rpc address in arguments and tajo-site.xml.");
System.exit(-1);
} else if (hostName != null && port != null) {
- conf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, hostName+":"+port);
+ conf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, NetUtils.getHostPortString(hostName, port));
client = new TajoClientImpl(ServiceTrackerFactory.get(conf));
} else {
client = new TajoClientImpl(ServiceTrackerFactory.get(conf));
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoGetConf.java
----------------------------------------------------------------------
diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoGetConf.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoGetConf.java
index 5136775..de66e20 100644
--- a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoGetConf.java
+++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoGetConf.java
@@ -22,10 +22,12 @@ import com.google.protobuf.ServiceException;
import org.apache.commons.cli.*;
import org.apache.tajo.client.TajoClient;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.util.NetUtils;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Writer;
+import java.net.InetSocketAddress;
import java.sql.SQLException;
public class TajoGetConf {
@@ -92,25 +94,21 @@ public class TajoGetConf {
}
// if there is no "-h" option,
+ InetSocketAddress address = tajoConf.getSocketAddrVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS,
+ TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
+
if(hostName == null) {
- if (tajoConf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS) != null) {
- // it checks if the client service address is given in configuration and distributed mode.
- // if so, it sets entryAddr.
- hostName = tajoConf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS).split(":")[0];
- }
+ hostName = address.getHostName();
}
+
if (port == null) {
- if (tajoConf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS) != null) {
- // it checks if the client service address is given in configuration and distributed mode.
- // if so, it sets entryAddr.
- port = Integer.parseInt(tajoConf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS).split(":")[1]);
- }
+ port = address.getPort();
}
if ((hostName == null) ^ (port == null)) {
return;
} else if (hostName != null && port != null) {
- tajoConf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, hostName + ":" + port);
+ tajoConf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, NetUtils.getHostPortString(hostName, port));
}
processConfKey(writer, param);
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoHAAdmin.java
----------------------------------------------------------------------
diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoHAAdmin.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoHAAdmin.java
index 5ada997..e28f679 100644
--- a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoHAAdmin.java
+++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoHAAdmin.java
@@ -24,10 +24,12 @@ import org.apache.tajo.client.TajoClient;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.service.ServiceTracker;
import org.apache.tajo.service.ServiceTrackerFactory;
+import org.apache.tajo.util.NetUtils;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Writer;
+import java.net.InetSocketAddress;
public class TajoHAAdmin {
private static final Options options;
@@ -100,19 +102,15 @@ public class TajoHAAdmin {
}
// if there is no "-h" option,
+ InetSocketAddress address = tajoConf.getSocketAddrVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS,
+ TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
+
if(hostName == null) {
- if (tajoConf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS) != null) {
- // it checks if the client service address is given in configuration and distributed mode.
- // if so, it sets entryAddr.
- hostName = tajoConf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS).split(":")[0];
- }
+ hostName = address.getHostName();
}
+
if (port == null) {
- if (tajoConf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS) != null) {
- // it checks if the client service address is given in configuration and distributed mode.
- // if so, it sets entryAddr.
- port = Integer.parseInt(tajoConf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS).split(":")[1]);
- }
+ port = address.getPort();
}
if (cmdType == 0) {
@@ -125,7 +123,7 @@ public class TajoHAAdmin {
System.err.println("ERROR: cannot find valid Tajo server address");
return;
} else if (hostName != null && port != null) {
- tajoConf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, hostName + ":" + port);
+ tajoConf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, NetUtils.getHostPortString(hostName, port));
}
if (!tajoConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
index 489cd3d..e6e8410 100644
--- a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
+++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
@@ -45,13 +45,11 @@ import org.apache.tajo.exception.TajoException;
import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.parser.sql.SQLLexer;
import org.apache.tajo.service.ServiceTrackerFactory;
-import org.apache.tajo.util.FileUtil;
-import org.apache.tajo.util.KeyValueSet;
-import org.apache.tajo.util.ShutdownHookManager;
-import org.apache.tajo.util.VersionInfo;
+import org.apache.tajo.util.*;
import java.io.*;
import java.lang.reflect.Constructor;
+import java.net.InetSocketAddress;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
@@ -239,19 +237,15 @@ public class TajoCli implements Closeable {
this.reconnect = cmd.hasOption("reconnect");
// if there is no "-h" option,
+ InetSocketAddress address = conf.getSocketAddrVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS,
+ TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
+
if(hostName == null) {
- if (conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS) != null) {
- // it checks if the client service address is given in configuration and distributed mode.
- // if so, it sets entryAddr.
- hostName = conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS).split(":")[0];
- }
+ hostName = address.getHostName();
}
+
if (port == null) {
- if (conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS) != null) {
- // it checks if the client service address is given in configuration and distributed mode.
- // if so, it sets entryAddr.
- port = Integer.parseInt(conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS).split(":")[1]);
- }
+ port = address.getPort();
}
// Get connection parameters
@@ -262,7 +256,7 @@ public class TajoCli implements Closeable {
System.err.println(ERROR_PREFIX + "cannot find valid Tajo server address");
throw new RuntimeException("cannot find valid Tajo server address");
} else if (hostName != null && port != null) {
- conf.setVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, hostName+":"+port);
+ conf.setVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, NetUtils.getHostPortString(hostName, port));
client = new TajoClientImpl(ServiceTrackerFactory.get(conf), baseDatabase, actualConnParams);
} else if (hostName == null && port == null) {
client = new TajoClientImpl(ServiceTrackerFactory.get(conf), baseDatabase, actualConnParams);
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
index 3561c87..932db0e 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
@@ -18,8 +18,6 @@
package org.apache.tajo.client;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.tajo.QueryId;
import org.apache.tajo.annotation.Nullable;
import org.apache.tajo.annotation.ThreadSafe;
@@ -46,7 +44,6 @@ import java.util.concurrent.Future;
@ThreadSafe
public class TajoClientImpl extends SessionConnection implements TajoClient, QueryClient, CatalogAdminClient {
- private final Log LOG = LogFactory.getLog(TajoClientImpl.class);
QueryClient queryClient;
CatalogAdminClient catalogClient;
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 5413905..a83884a 100644
--- a/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -49,6 +49,7 @@ import org.apache.tajo.service.ServiceTrackerFactory;
import org.apache.tajo.storage.FileTablespace;
import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.Pair;
import org.apache.tajo.util.history.QueryHistory;
import org.apache.tajo.worker.TajoWorker;
@@ -382,14 +383,13 @@ public class TajoTestingCluster {
InetSocketAddress tajoMasterAddress = tajoMaster.getContext().getTajoMasterService().getBindAddress();
- this.conf.setVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS,
- tajoMasterAddress.getHostName() + ":" + tajoMasterAddress.getPort());
+ this.conf.setVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS, NetUtils.getHostPortString(tajoMasterAddress));
this.conf.setVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, c.getVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS));
this.conf.setVar(ConfVars.CATALOG_ADDRESS, c.getVar(ConfVars.CATALOG_ADDRESS));
InetSocketAddress tajoRestAddress = tajoMaster.getContext().getRestServer().getBindAddress();
- this.conf.setVar(ConfVars.REST_SERVICE_ADDRESS, tajoRestAddress.getHostName() + ":" + tajoRestAddress.getPort());
+ this.conf.setVar(ConfVars.REST_SERVICE_ADDRESS, NetUtils.getHostPortString(tajoRestAddress));
startTajoWorkers(numSlaves);
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index e619b24..28853e3 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -28,6 +28,7 @@ import org.apache.tajo.ConfigKey;
import org.apache.tajo.QueryId;
import org.apache.tajo.SessionVars;
import org.apache.tajo.TajoConstants;
+import org.apache.tajo.exception.TajoInternalError;
import org.apache.tajo.service.BaseServiceTracker;
import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.NetUtils;
@@ -133,13 +134,19 @@ public class TajoConf extends Configuration {
// Tajo Master Service Addresses
TAJO_MASTER_UMBILICAL_RPC_ADDRESS("tajo.master.umbilical-rpc.address", "localhost:26001",
Validators.networkAddr()),
- TAJO_MASTER_CLIENT_RPC_ADDRESS("tajo.master.client-rpc.address", "localhost:26002",
- Validators.networkAddr()),
+ TAJO_MASTER_CLIENT_RPC_ADDRESS("tajo.master.client-rpc.address", "localhost:26002", Validators.networkAddr()),
TAJO_MASTER_INFO_ADDRESS("tajo.master.info-http.address", "0.0.0.0:26080", Validators.networkAddr()),
+ // Resource tracker service
+ RESOURCE_TRACKER_RPC_ADDRESS("tajo.resource-tracker.rpc.address", "0.0.0.0:26003", Validators.networkAddr()),
+ RESOURCE_TRACKER_HEARTBEAT_TIMEOUT("tajo.resource-tracker.heartbeat.timeout-secs", 120), // seconds
+
// Tajo Rest Service
REST_SERVICE_ADDRESS("tajo.rest.service.address", "0.0.0.0:26880", Validators.networkAddr()),
+ // Catalog
+ CATALOG_ADDRESS("tajo.catalog.client-rpc.address", "0.0.0.0:26005", Validators.networkAddr()),
+
// High availability configurations
TAJO_MASTER_HA_ENABLE("tajo.master.ha.enable", false, Validators.bool()),
TAJO_MASTER_HA_MONITOR_INTERVAL("tajo.master.ha.monitor.interval", 5 * 1000), // 5 sec
@@ -151,17 +158,11 @@ public class TajoConf extends Configuration {
HA_SERVICE_TRACKER_CLASS("tajo.discovery.ha-service-tracker.class", "org.apache.tajo.ha.HdfsServiceTracker"),
// Async IO Task Service
-
/** The number of threads for async tasks */
MASTER_ASYNC_TASK_THREAD_NUM("tajo.master.async-task.thread-num", 4),
/** How long it will wait for termination */
MASTER_ASYNC_TASK_TERMINATION_WAIT_TIME("tajo.master.async-task.wait-time-sec", 60), // 1 min
- // Resource tracker service
- RESOURCE_TRACKER_RPC_ADDRESS("tajo.resource-tracker.rpc.address", "localhost:26003",
- Validators.networkAddr()),
- RESOURCE_TRACKER_HEARTBEAT_TIMEOUT("tajo.resource-tracker.heartbeat.timeout-secs", 120), // seconds
-
// QueryMaster resource
QUERYMASTER_MINIMUM_MEMORY("tajo.qm.resource.min.memory-mb", 500, Validators.min("64")),
@@ -181,7 +182,7 @@ public class TajoConf extends Configuration {
// Tajo Worker Resources
WORKER_RESOURCE_AVAILABLE_CPU_CORES("tajo.worker.resource.cpu-cores",
- Runtime.getRuntime().availableProcessors(), Validators.min("2")), // 1qm + 1task
+ Math.max(Runtime.getRuntime().availableProcessors(), 2), Validators.min("2")), // 1qm + 1task container
WORKER_RESOURCE_AVAILABLE_MEMORY_MB("tajo.worker.resource.memory-mb", 1500, Validators.min("64")),
WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM("tajo.worker.resource.disk.parallel-execution.num", 2,
@@ -199,9 +200,6 @@ public class TajoConf extends Configuration {
QUERYMASTER_TASK_SCHEDULER_REQUEST_MAX_NUM("tajo.qm.task-scheduler.request.max-num", 50),
- // Catalog
- CATALOG_ADDRESS("tajo.catalog.client-rpc.address", "localhost:26005", Validators.networkAddr()),
-
// Query Configuration
QUERY_SESSION_TIMEOUT("tajo.query.session.timeout-sec", 60, Validators.min("0")),
QUERY_SESSION_QUERY_CACHE_SIZE("tajo.query.session.query-cache-size-kb", 0, Validators.min("0")),
@@ -741,6 +739,26 @@ public class TajoConf extends Configuration {
return NetUtils.createSocketAddr(address);
}
+ /**
+ * Returns InetSocketAddress that a client can use to connect to the server.
+ * If the configured address is any local address(”0.0.0.0”), finds default host in defaultVar.
+ *
+ * @param var
+ * @param defaultVar
+ * @return InetSocketAddress
+ */
+ public InetSocketAddress getSocketAddrVar(ConfVars var, ConfVars defaultVar) {
+
+ InetSocketAddress addr = NetUtils.createSocketAddr(getVar(var));
+
+ if (addr.getAddress().isAnyLocalAddress()) {
+ InetSocketAddress defaultAddr = NetUtils.createSocketAddr(getVar(defaultVar));
+ addr = NetUtils.createSocketAddr(defaultAddr.getHostName(), addr.getPort());
+ }
+
+ return addr;
+ }
+
/////////////////////////////////////////////////////////////////////////////
// Tajo System Specific Methods
/////////////////////////////////////////////////////////////////////////////
@@ -749,7 +767,15 @@ public class TajoConf extends Configuration {
String rootPath = conf.getVar(ConfVars.ROOT_DIR);
Preconditions.checkNotNull(rootPath,
ConfVars.ROOT_DIR.varname + " must be set before a Tajo Cluster starts up");
- return new Path(rootPath);
+
+ FileSystem fs;
+
+ try {
+ fs = FileSystem.get(conf);
+ } catch (IOException e) {
+ throw new TajoInternalError(e);
+ }
+ return fs.makeQualified(new Path(rootPath));
}
public static Path getWarehouseDir(TajoConf conf) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-common/src/main/java/org/apache/tajo/rule/base/BaseRuleProvider.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/rule/base/BaseRuleProvider.java b/tajo-common/src/main/java/org/apache/tajo/rule/base/BaseRuleProvider.java
index 7be2eeb..c9f3bbb 100644
--- a/tajo-common/src/main/java/org/apache/tajo/rule/base/BaseRuleProvider.java
+++ b/tajo-common/src/main/java/org/apache/tajo/rule/base/BaseRuleProvider.java
@@ -18,19 +18,19 @@
package org.apache.tajo.rule.base;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.rule.SelfDiagnosisRuleProvider;
import org.apache.tajo.rule.SelfDiagnosisRule;
+import org.apache.tajo.rule.SelfDiagnosisRuleProvider;
import org.apache.tajo.util.ClassUtil;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
public class BaseRuleProvider implements SelfDiagnosisRuleProvider {
- private Log LOG = LogFactory.getLog(getClass());
+ private static final Log LOG = LogFactory.getLog(BaseRuleProvider.class);
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-common/src/main/java/org/apache/tajo/rule/base/CheckHadoopRuntimeVersionRule.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/rule/base/CheckHadoopRuntimeVersionRule.java b/tajo-common/src/main/java/org/apache/tajo/rule/base/CheckHadoopRuntimeVersionRule.java
index bfd9707..93faa7f 100644
--- a/tajo-common/src/main/java/org/apache/tajo/rule/base/CheckHadoopRuntimeVersionRule.java
+++ b/tajo-common/src/main/java/org/apache/tajo/rule/base/CheckHadoopRuntimeVersionRule.java
@@ -18,26 +18,22 @@
package org.apache.tajo.rule.base;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Properties;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IOUtils;
-import org.apache.tajo.rule.EvaluationContext;
-import org.apache.tajo.rule.EvaluationResult;
-import org.apache.tajo.rule.SelfDiagnosisRuleDefinition;
-import org.apache.tajo.rule.SelfDiagnosisRuleVisibility;
-import org.apache.tajo.rule.SelfDiagnosisRule;
+import org.apache.tajo.rule.*;
import org.apache.tajo.rule.EvaluationResult.EvaluationResultCode;
import org.apache.tajo.validation.Validators;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
@SelfDiagnosisRuleDefinition(category="base", name = "CheckHadoopRuntimeVersionRule", priority = 0)
@SelfDiagnosisRuleVisibility.Public
public class CheckHadoopRuntimeVersionRule implements SelfDiagnosisRule {
-
- private Log LOG = LogFactory.getLog(getClass());
+
+ private static final Log LOG = LogFactory.getLog(CheckHadoopRuntimeVersionRule.class);
private final Properties versionInfo;
public CheckHadoopRuntimeVersionRule() {
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-common/src/main/java/org/apache/tajo/service/BaseServiceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/service/BaseServiceTracker.java b/tajo-common/src/main/java/org/apache/tajo/service/BaseServiceTracker.java
index 43f8c32..4d61fa4 100644
--- a/tajo-common/src/main/java/org/apache/tajo/service/BaseServiceTracker.java
+++ b/tajo-common/src/main/java/org/apache/tajo/service/BaseServiceTracker.java
@@ -19,7 +19,7 @@
package org.apache.tajo.service;
import org.apache.tajo.conf.TajoConf;
-
+import org.apache.tajo.conf.TajoConf.ConfVars;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@@ -38,11 +38,16 @@ public class BaseServiceTracker implements ServiceTracker {
tajoMasterInfo = new TajoMasterInfo();
tajoMasterInfo.setActive(true);
tajoMasterInfo.setAvailable(true);
- tajoMasterInfo.setTajoMasterAddress(conf.getSocketAddrVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS));
- tajoMasterInfo.setTajoClientAddress(conf.getSocketAddrVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS));
- tajoMasterInfo.setWorkerResourceTrackerAddr(conf.getSocketAddrVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS));
- tajoMasterInfo.setCatalogAddress(conf.getSocketAddrVar(TajoConf.ConfVars.CATALOG_ADDRESS));
- tajoMasterInfo.setWebServerAddress(conf.getSocketAddrVar(TajoConf.ConfVars.TAJO_MASTER_INFO_ADDRESS));
+ tajoMasterInfo.setTajoMasterAddress(conf.getSocketAddrVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS));
+
+ tajoMasterInfo.setTajoClientAddress(
+ conf.getSocketAddrVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS));
+ tajoMasterInfo.setWorkerResourceTrackerAddr(
+ conf.getSocketAddrVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS));
+ tajoMasterInfo.setCatalogAddress(
+ conf.getSocketAddrVar(ConfVars.CATALOG_ADDRESS, ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS));
+ tajoMasterInfo.setWebServerAddress(
+ conf.getSocketAddrVar(ConfVars.TAJO_MASTER_INFO_ADDRESS, ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS));
tajoMasterInfos = Arrays.asList(tajoMasterInfo);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-common/src/main/java/org/apache/tajo/util/NetUtils.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/NetUtils.java b/tajo-common/src/main/java/org/apache/tajo/util/NetUtils.java
index 829829f..c821d11 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/NetUtils.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/NetUtils.java
@@ -21,8 +21,20 @@ package org.apache.tajo.util;
import java.net.*;
public class NetUtils {
+
+ /**
+ * Compose a "host:port" string from the address.
+ */
+ public static String getHostPortString(InetSocketAddress addr) {
+ return getHostPortString(addr.getHostName(), addr.getPort());
+ }
+
+ public static String getHostPortString(String host, int port) {
+ return host + ":" + port;
+ }
+
public static String normalizeInetSocketAddress(InetSocketAddress addr) {
- return addr.getAddress().getHostAddress() + ":" + addr.getPort();
+ return getHostPortString(addr.getAddress().getHostAddress(), addr.getPort());
}
public static InetSocketAddress createSocketAddr(String addr) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
index c148298..4e75aa0 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
@@ -47,6 +47,7 @@ import org.junit.Test;
import org.junit.rules.TestName;
import java.io.*;
+import java.net.InetSocketAddress;
import java.net.URL;
import java.util.Map;
import java.util.Properties;
@@ -387,10 +388,11 @@ public class TestTajoCli {
String consoleResult = new String(out.toByteArray());
- String masterAddress = tajoCli.getContext().getConf().getVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
- String host = masterAddress.split(":")[0];
+ InetSocketAddress masterAddress =
+ tajoCli.getContext().getConf().getSocketAddrVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
+
tajoCli.close();
- assertEquals(consoleResult, host + "\n");
+ assertEquals(consoleResult, masterAddress.getHostName() + "\n");
}
@Test
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java b/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java
index 87e1b54..d7ff0d4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java
+++ b/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java
@@ -19,8 +19,6 @@
package org.apache.tajo.benchmark;
import com.google.common.collect.Maps;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.BuiltinStorages;
import org.apache.tajo.catalog.CatalogUtil;
@@ -38,7 +36,6 @@ import java.io.IOException;
import java.util.Map;
public class TPCH extends BenchmarkSet {
- private final Log LOG = LogFactory.getLog(TPCH.class);
private final String BENCHMARK_DIR = "benchmark/tpch";
public static final String LINEITEM = "lineitem";
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/Date.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/Date.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/Date.java
index 05928d6..28f9b41 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/Date.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/Date.java
@@ -42,7 +42,7 @@ import static org.apache.tajo.common.TajoDataTypes.Type.TEXT;
paramTypes = {@ParamTypes(paramTypes = {TajoDataTypes.Type.INT4})}
)
public class Date extends GeneralFunction {
- private final Log LOG = LogFactory.getLog(Date.class);
+ private static final Log LOG = LogFactory.getLog(Date.class);
private final static String dateFormat = "dd/MM/yyyy HH:mm:ss";
public Date() {
@@ -55,7 +55,7 @@ public class Date extends GeneralFunction {
return DatumFactory.createInt8(new SimpleDateFormat(dateFormat)
.parse(params.getText(0)).getTime());
} catch (ParseException e) {
- LOG.error(e);
+ LOG.warn(e);
return null;
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java b/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java
index 120d61c..b0fc4e6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java
@@ -256,16 +256,18 @@ public class HdfsServiceTracker extends HAServiceTracker {
address = conf.getSocketAddrVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
break;
case HAConstants.MASTER_CLIENT_RPC_ADDRESS:
- address = conf.getSocketAddrVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS);
+ address = conf.getSocketAddrVar(
+ ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
break;
case HAConstants.RESOURCE_TRACKER_RPC_ADDRESS:
- address = conf.getSocketAddrVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS);
+ address = conf.getSocketAddrVar(
+ ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
break;
case HAConstants.CATALOG_ADDRESS:
- address = conf.getSocketAddrVar(ConfVars.CATALOG_ADDRESS);
+ address = conf.getSocketAddrVar(ConfVars.CATALOG_ADDRESS, ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
break;
case HAConstants.MASTER_INFO_ADDRESS:
- address = conf.getSocketAddrVar(ConfVars.TAJO_MASTER_INFO_ADDRESS);
+ address = conf.getSocketAddrVar(ConfVars.TAJO_MASTER_INFO_ADDRESS, ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
break;
default:
break;
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java
index e209538..30c46d5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java
@@ -58,8 +58,7 @@ public class QueryCoordinatorService extends AbstractService {
@Override
public void serviceStart() throws Exception {
- String confMasterServiceAddr = conf.getVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
- InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterServiceAddr);
+ InetSocketAddress initIsa = conf.getSocketAddrVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
int workerNum = conf.getIntVar(TajoConf.ConfVars.MASTER_RPC_SERVER_WORKER_THREAD_NUM);
server = new AsyncRpcServer(QueryCoordinatorProtocol.class, masterHandler, initIsa, workerNum);
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index 91fba51..65fa377 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -315,8 +315,7 @@ public class TajoMaster extends CompositeService {
initSystemMetrics();
// Setting the system global configs
- systemConf.setSocketAddr(ConfVars.CATALOG_ADDRESS.varname,
- NetUtils.getConnectAddress(catalogServer.getBindAddress()));
+ systemConf.setVar(ConfVars.CATALOG_ADDRESS, NetUtils.getHostPortString(catalogServer.getBindAddress()));
try {
writeSystemConf();
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 205329b..22024e4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -92,14 +92,13 @@ public class TajoMasterClientService extends AbstractService {
public void serviceStart() throws Exception {
// start the rpc server
- String confClientServiceAddr = conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS);
- InetSocketAddress initIsa = NetUtils.createSocketAddr(confClientServiceAddr);
+ InetSocketAddress initIsa = conf.getSocketAddrVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS);
int workerNum = conf.getIntVar(ConfVars.MASTER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM);
server = new BlockingRpcServer(TajoMasterClientProtocol.class, clientHandler, initIsa, workerNum);
server.start();
bindAddress = NetUtils.getConnectAddress(server.getListenAddress());
- this.conf.setVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, NetUtils.normalizeInetSocketAddress(bindAddress));
+ this.conf.setVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, NetUtils.getHostPortString(bindAddress));
super.serviceStart();
LOG.info("Instantiated TajoMasterClientService at " + this.bindAddress);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
index c7b9754..c3280fa 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
@@ -71,7 +71,7 @@ import java.util.Stack;
public class NonForwardQueryResultSystemScanner implements NonForwardQueryResultScanner {
- private final Log LOG = LogFactory.getLog(getClass());
+ private static final Log LOG = LogFactory.getLog(NonForwardQueryResultSystemScanner.class);
private MasterContext masterContext;
private LogicalPlan logicalPlan;
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
index 3e3e3b4..9ac54a7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
@@ -56,7 +56,7 @@ import static org.apache.tajo.ResourceProtos.*;
*/
public class TajoResourceTracker extends AbstractService implements TajoResourceTrackerProtocolService.Interface {
/** Class logger */
- private Log LOG = LogFactory.getLog(TajoResourceTracker.class);
+ private static final Log LOG = LogFactory.getLog(TajoResourceTracker.class);
private final TajoResourceManager manager;
/** the context of TajoResourceManager */
@@ -85,8 +85,7 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
TajoConf systemConf = TUtil.checkTypeAndGet(conf, TajoConf.class);
activeInterval = systemConf.getIntVar(TajoConf.ConfVars.WORKER_HEARTBEAT_ACTIVE_INTERVAL);
- String confMasterServiceAddr = systemConf.getVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS);
- InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterServiceAddr);
+ InetSocketAddress initIsa = systemConf.getSocketAddrVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS);
int workerNum = systemConf.getIntVar(TajoConf.ConfVars.MASTER_RPC_SERVER_WORKER_THREAD_NUM);
server = new AsyncRpcServer(TajoResourceTrackerProtocol.class, this, initIsa, workerNum);
@@ -94,7 +93,7 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
server.start();
bindAddress = NetUtils.getConnectAddress(server.getListenAddress());
// Set actual bind address to the systemConf
- systemConf.setVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, NetUtils.normalizeInetSocketAddress(bindAddress));
+ systemConf.setVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, NetUtils.getHostPortString(bindAddress));
LOG.info("TajoResourceTracker starts up (" + this.bindAddress + ")");
super.serviceInit(conf);
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-core/src/main/java/org/apache/tajo/master/rule/MasterRuleProvider.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rule/MasterRuleProvider.java b/tajo-core/src/main/java/org/apache/tajo/master/rule/MasterRuleProvider.java
index 1a609cf..671cd47 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rule/MasterRuleProvider.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rule/MasterRuleProvider.java
@@ -18,19 +18,19 @@
package org.apache.tajo.master.rule;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.rule.SelfDiagnosisRuleProvider;
import org.apache.tajo.rule.SelfDiagnosisRule;
+import org.apache.tajo.rule.SelfDiagnosisRuleProvider;
import org.apache.tajo.util.ClassUtil;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
public class MasterRuleProvider implements SelfDiagnosisRuleProvider {
- private Log LOG = LogFactory.getLog(getClass());
+ private static final Log LOG = LogFactory.getLog(MasterRuleProvider.class);
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
index c911fbc..36ae43f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
@@ -52,17 +52,14 @@ public class QueryMasterManagerService extends CompositeService
private AsyncRpcServer rpcServer;
private InetSocketAddress bindAddr;
- private String addr;
- private int port;
private QueryMaster queryMaster;
private TajoWorker.WorkerContext workerContext;
- public QueryMasterManagerService(TajoWorker.WorkerContext workerContext, int port) {
+ public QueryMasterManagerService(TajoWorker.WorkerContext workerContext) {
super(QueryMasterManagerService.class.getName());
this.workerContext = workerContext;
- this.port = port;
}
public QueryMaster getQueryMaster() {
@@ -74,8 +71,7 @@ public class QueryMasterManagerService extends CompositeService
TajoConf tajoConf = TUtil.checkTypeAndGet(conf, TajoConf.class);
// Setup RPC server
- InetSocketAddress initIsa =
- new InetSocketAddress("0.0.0.0", port);
+ InetSocketAddress initIsa = tajoConf.getSocketAddrVar(TajoConf.ConfVars.WORKER_QM_RPC_ADDRESS);
if (initIsa.getAddress() == null) {
throw new IllegalArgumentException("Failed resolve of " + initIsa);
}
@@ -85,17 +81,13 @@ public class QueryMasterManagerService extends CompositeService
this.rpcServer.start();
this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());
- this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
- this.port = bindAddr.getPort();
-
- queryMaster = new QueryMaster(workerContext);
+ this.queryMaster = new QueryMaster(workerContext);
addService(queryMaster);
// Get the master address
- LOG.info("QueryMasterManagerService is bind to " + addr);
- tajoConf.setVar(TajoConf.ConfVars.WORKER_QM_RPC_ADDRESS, addr);
-
- super.serviceInit(conf);
+ LOG.info("QueryMasterManagerService is bind to " + bindAddr);
+ tajoConf.setVar(TajoConf.ConfVars.WORKER_QM_RPC_ADDRESS, NetUtils.getHostPortString(bindAddr));
+ super.serviceInit(tajoConf);
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
index f7928ea..671a365 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
@@ -72,13 +72,11 @@ public class JSPUtil {
}
public static String getTajoMasterHttpAddr(Configuration config) {
- if (!(config instanceof TajoConf)) {
- throw new IllegalArgumentException("config should be a TajoConf type.");
- }
+
try {
- TajoConf conf = (TajoConf) config;
- String [] masterAddr = conf.getVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS).split(":");
- return masterAddr[0] + ":" + conf.getVar(ConfVars.TAJO_MASTER_INFO_ADDRESS).split(":")[1];
+ TajoConf conf = TUtil.checkTypeAndGet(config, TajoConf.class);
+ return NetUtils.getHostPortString(conf.getSocketAddrVar(
+ ConfVars.TAJO_MASTER_INFO_ADDRESS, ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS));
} catch (Exception e) {
e.printStackTrace();
return e.getMessage();
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-core/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java b/tajo-core/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java
index 6008aae..c20727e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java
+++ b/tajo-core/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java
@@ -26,7 +26,6 @@ import org.apache.tajo.worker.TajoWorker;
import org.mortbay.jetty.Connector;
import java.io.IOException;
-import java.net.Inet4Address;
public class StaticHttpServer extends HttpServer {
private static StaticHttpServer instance = null;
@@ -49,9 +48,11 @@ public class StaticHttpServer extends HttpServer {
if(instance == null) {
if(bindAddress == null || bindAddress.compareTo("") == 0) {
if (containerObject instanceof TajoMaster) {
- addr = conf.getVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS).split(":")[0];
+ addr = conf.getSocketAddrVar(
+ ConfVars.TAJO_MASTER_INFO_ADDRESS).getHostName();
} else if (containerObject instanceof TajoWorker) {
- addr = Inet4Address.getLocalHost().getHostName();
+ addr = conf.getSocketAddrVar(
+ ConfVars.WORKER_INFO_ADDRESS).getHostName();
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 6296bb0..faab446 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -49,7 +49,6 @@ import org.apache.tajo.rule.SelfDiagnosisRuleEngine;
import org.apache.tajo.rule.SelfDiagnosisRuleSession;
import org.apache.tajo.service.ServiceTracker;
import org.apache.tajo.service.ServiceTrackerFactory;
-import org.apache.tajo.service.TajoMasterInfo;
import org.apache.tajo.storage.HashShuffleAppenderManager;
import org.apache.tajo.storage.OldStorageManager;
import org.apache.tajo.util.*;
@@ -64,6 +63,7 @@ import java.io.Writer;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
+import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -127,21 +127,17 @@ public class TajoWorker extends CompositeService {
this.workerContext = new TajoWorkerContext();
this.lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
- int clientPort = systemConf.getSocketAddrVar(ConfVars.WORKER_CLIENT_RPC_ADDRESS).getPort();
- int peerRpcPort = systemConf.getSocketAddrVar(ConfVars.WORKER_PEER_RPC_ADDRESS).getPort();
- int qmManagerPort = systemConf.getSocketAddrVar(ConfVars.WORKER_QM_RPC_ADDRESS).getPort();
-
dispatcher = new AsyncDispatcher();
addIfService(dispatcher);
- tajoWorkerManagerService = new TajoWorkerManagerService(workerContext, peerRpcPort);
+ tajoWorkerManagerService = new TajoWorkerManagerService(workerContext);
addIfService(tajoWorkerManagerService);
// querymaster worker
- tajoWorkerClientService = new TajoWorkerClientService(workerContext, clientPort);
+ tajoWorkerClientService = new TajoWorkerClientService(workerContext);
addIfService(tajoWorkerClientService);
- queryMasterManagerService = new QueryMasterManagerService(workerContext, qmManagerPort);
+ queryMasterManagerService = new QueryMasterManagerService(workerContext);
addIfService(queryMasterManagerService);
if (!PullServerUtil.useExternalPullServerService(systemConf)) {
@@ -220,17 +216,20 @@ public class TajoWorker extends CompositeService {
}
private int initWebServer() {
- int httpPort = systemConf.getSocketAddrVar(ConfVars.WORKER_INFO_ADDRESS).getPort();
+ InetSocketAddress address = systemConf.getSocketAddrVar(ConfVars.WORKER_INFO_ADDRESS);
try {
- webServer = StaticHttpServer.getInstance(this, "worker", null, httpPort,
+ webServer = StaticHttpServer.getInstance(this, "worker", address.getHostName(), address.getPort(),
true, null, systemConf, null);
webServer.start();
- httpPort = webServer.getPort();
- LOG.info("Worker info server started:" + httpPort);
+
+ systemConf.setVar(TajoConf.ConfVars.WORKER_INFO_ADDRESS, NetUtils.getHostPortString(
+ NetUtils.getConnectAddress(new InetSocketAddress(address.getHostName(), webServer.getPort()))));
+
+ LOG.info("Worker info server started:" + webServer.getPort());
} catch (IOException e) {
LOG.error(e.getMessage(), e);
}
- return httpPort;
+ return webServer.getPort();
}
private void initCleanupService() throws IOException {
@@ -263,18 +262,6 @@ public class TajoWorker extends CompositeService {
@Override
public void serviceStart() throws Exception {
startJvmPauseMonitor();
-
- TajoMasterInfo tajoMasterInfo = new TajoMasterInfo();
-
- if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
- tajoMasterInfo.setTajoMasterAddress(serviceTracker.getUmbilicalAddress());
- tajoMasterInfo.setWorkerResourceTrackerAddr(serviceTracker.getResourceTrackerAddress());
- } else {
- tajoMasterInfo.setTajoMasterAddress(NetUtils.createSocketAddr(systemConf.getVar(ConfVars
- .TAJO_MASTER_UMBILICAL_RPC_ADDRESS)));
- tajoMasterInfo.setWorkerResourceTrackerAddr(NetUtils.createSocketAddr(systemConf.getVar(ConfVars
- .RESOURCE_TRACKER_RPC_ADDRESS)));
- }
connectToCatalog();
if (!systemConf.getBoolVar(ConfVars.$TEST_MODE)) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
index 26b0d08..635fce0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@ -18,7 +18,6 @@
package org.apache.tajo.worker;
-import com.google.common.base.Preconditions;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
@@ -32,75 +31,63 @@ import org.apache.tajo.ipc.ClientProtos.GetQueryHistoryResponse;
import org.apache.tajo.ipc.ClientProtos.QueryIdRequest;
import org.apache.tajo.ipc.QueryMasterClientProtocol;
import org.apache.tajo.rpc.BlockingRpcServer;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.TUtil;
import org.apache.tajo.util.history.QueryHistory;
import java.net.InetSocketAddress;
+@Deprecated
public class TajoWorkerClientService extends AbstractService {
private static final Log LOG = LogFactory.getLog(TajoWorkerClientService.class);
- private final PrimitiveProtos.BoolProto BOOL_TRUE =
- PrimitiveProtos.BoolProto.newBuilder().setValue(true).build();
- private final PrimitiveProtos.BoolProto BOOL_FALSE =
- PrimitiveProtos.BoolProto.newBuilder().setValue(false).build();
private BlockingRpcServer rpcServer;
private InetSocketAddress bindAddr;
- private int port;
- private TajoConf conf;
private TajoWorker.WorkerContext workerContext;
private TajoWorkerClientProtocolServiceHandler serviceHandler;
- public TajoWorkerClientService(TajoWorker.WorkerContext workerContext, int port) {
+ public TajoWorkerClientService(TajoWorker.WorkerContext workerContext) {
super(TajoWorkerClientService.class.getName());
- this.port = port;
this.workerContext = workerContext;
}
@Override
- public void init(Configuration conf) {
- Preconditions.checkArgument(conf instanceof TajoConf);
- this.conf = (TajoConf) conf;
+ public void serviceInit(Configuration conf) throws Exception {
+ TajoConf tajoConf = TUtil.checkTypeAndGet(conf, TajoConf.class);
+
this.serviceHandler = new TajoWorkerClientProtocolServiceHandler();
// init RPC Server in constructor cause Heartbeat Thread use bindAddr
try {
- InetSocketAddress initIsa = new InetSocketAddress("0.0.0.0", port);
+ InetSocketAddress initIsa = tajoConf.getSocketAddrVar(TajoConf.ConfVars.WORKER_CLIENT_RPC_ADDRESS);
if (initIsa.getAddress() == null) {
throw new IllegalArgumentException("Failed resolve of " + initIsa);
}
- int workerNum = this.conf.getIntVar(TajoConf.ConfVars.WORKER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM);
+ int workerNum = tajoConf.getIntVar(TajoConf.ConfVars.WORKER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM);
this.rpcServer = new BlockingRpcServer(QueryMasterClientProtocol.class, serviceHandler, initIsa, workerNum);
this.rpcServer.start();
this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());
- this.port = bindAddr.getPort();
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
// Get the master address
LOG.info(TajoWorkerClientService.class.getSimpleName() + " is bind to " + bindAddr);
-
- super.init(conf);
- }
-
- @Override
- public void start() {
- super.start();
+ tajoConf.setVar(TajoConf.ConfVars.WORKER_CLIENT_RPC_ADDRESS, NetUtils.getHostPortString(bindAddr));
+ super.serviceInit(tajoConf);
}
@Override
- public void stop() {
+ public void serviceStop() throws Exception {
LOG.info("TajoWorkerClientService stopping");
if(rpcServer != null) {
rpcServer.shutdown();
}
LOG.info("TajoWorkerClientService stopped");
- super.stop();
+ super.serviceStop();
}
public InetSocketAddress getBindAddr() {
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index 4595e5d..f622ad3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -48,14 +48,12 @@ public class TajoWorkerManagerService extends CompositeService
private AsyncRpcServer rpcServer;
private InetSocketAddress bindAddr;
- private int port;
private TajoWorker.WorkerContext workerContext;
- public TajoWorkerManagerService(TajoWorker.WorkerContext workerContext, int port) {
+ public TajoWorkerManagerService(TajoWorker.WorkerContext workerContext) {
super(TajoWorkerManagerService.class.getName());
this.workerContext = workerContext;
- this.port = port;
}
@Override
@@ -64,8 +62,8 @@ public class TajoWorkerManagerService extends CompositeService
TajoConf tajoConf = TUtil.checkTypeAndGet(conf, TajoConf.class);
try {
// Setup RPC server
- InetSocketAddress initIsa =
- new InetSocketAddress("0.0.0.0", port);
+ InetSocketAddress initIsa = tajoConf.getSocketAddrVar(TajoConf.ConfVars.WORKER_PEER_RPC_ADDRESS);
+
if (initIsa.getAddress() == null) {
throw new IllegalArgumentException("Failed resolve of " + initIsa);
}
@@ -75,13 +73,12 @@ public class TajoWorkerManagerService extends CompositeService
this.rpcServer.start();
this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());
- this.port = bindAddr.getPort();
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
// Get the master address
LOG.info("TajoWorkerManagerService is bind to " + bindAddr);
- tajoConf.setVar(TajoConf.ConfVars.WORKER_PEER_RPC_ADDRESS, NetUtils.normalizeInetSocketAddress(bindAddr));
+ tajoConf.setVar(TajoConf.ConfVars.WORKER_PEER_RPC_ADDRESS, NetUtils.getHostPortString(bindAddr));
super.serviceInit(tajoConf);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-core/src/main/java/org/apache/tajo/worker/rule/WorkerRuleProvider.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/rule/WorkerRuleProvider.java b/tajo-core/src/main/java/org/apache/tajo/worker/rule/WorkerRuleProvider.java
index 1b58c32..b8b02ba 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/rule/WorkerRuleProvider.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/rule/WorkerRuleProvider.java
@@ -18,19 +18,19 @@
package org.apache.tajo.worker.rule;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.rule.SelfDiagnosisRuleProvider;
import org.apache.tajo.rule.SelfDiagnosisRule;
+import org.apache.tajo.rule.SelfDiagnosisRuleProvider;
import org.apache.tajo.util.ClassUtil;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
public class WorkerRuleProvider implements SelfDiagnosisRuleProvider {
- private Log LOG = LogFactory.getLog(getClass());
+ private static final Log LOG = LogFactory.getLog(WorkerRuleProvider.class);
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-core/src/main/java/org/apache/tajo/ws/rs/TajoRestService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/TajoRestService.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/TajoRestService.java
index ba106d2..bef7a1c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/ws/rs/TajoRestService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/TajoRestService.java
@@ -37,7 +37,7 @@ import java.net.URI;
public class TajoRestService extends CompositeService {
- private final Log LOG = LogFactory.getLog(getClass());
+ private static final Log LOG = LogFactory.getLog(TajoRestService.class);
private MasterContext masterContext;
private NettyRestServer restServer;
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-dist/src/main/conf/tajo-site.xml.template
----------------------------------------------------------------------
diff --git a/tajo-dist/src/main/conf/tajo-site.xml.template b/tajo-dist/src/main/conf/tajo-site.xml.template
index 026e6ff..ad15fec 100644
--- a/tajo-dist/src/main/conf/tajo-site.xml.template
+++ b/tajo-dist/src/main/conf/tajo-site.xml.template
@@ -26,7 +26,7 @@
<property>
<name>tajo.rootdir</name>
- <value>hdfs://namenode_hostname:port/path</value>
+ <value>hdfs://nameservice/path</value>
<description>Base directory including system directories.</description>
</property>
@@ -39,7 +39,7 @@
<property>
<name>tajo.master.client-rpc.address</name>
<value>hostname:26002</value>
- <description>TajoMaster binding address between master and clients.</description>
+ <description>TajoMaster binding address between master and remote clients.</description>
</property>
<property>
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-docs/src/main/sphinx/configuration/cluster_setup.rst
----------------------------------------------------------------------
diff --git a/tajo-docs/src/main/sphinx/configuration/cluster_setup.rst b/tajo-docs/src/main/sphinx/configuration/cluster_setup.rst
index 3835512..1bd1881 100644
--- a/tajo-docs/src/main/sphinx/configuration/cluster_setup.rst
+++ b/tajo-docs/src/main/sphinx/configuration/cluster_setup.rst
@@ -19,28 +19,21 @@ Please add the following configs to tajo-site.xml file:
<property>
<name>tajo.rootdir</name>
- <value>hdfs://hostname:port/tajo</value>
+ <value>hdfs://nameservice/tajo</value>
</property>
<property>
<name>tajo.master.umbilical-rpc.address</name>
<value>hostname:26001</value>
+ <description>TajoMaster binding address between master and workers.</description>
</property>
<property>
<name>tajo.master.client-rpc.address</name>
<value>hostname:26002</value>
+ <description>TajoMaster binding address between master and remote clients.</description>
</property>
- <property>
- <name>tajo.resource-tracker.rpc.address</name>
- <value>hostname:26003</value>
- </property>
-
- <property>
- <name>tajo.catalog.client-rpc.address</name>
- <value>hostname:26005</value>
- </property>
Workers
-------
@@ -82,5 +75,5 @@ Then, execute ``start-tajo.sh`` ::
.. note::
- By default, TajoMaster listens on 127.0.0.1 for clients. To allow remote clients to access TajoMaster, please set tajo.master.client-rpc.address config to tajo-site.xml. In order to know how to change the listen port, please refer :doc:`/configuration/service_config_defaults`.
+ By default, TajoMaster listens on localhost/127.0.0.1 for clients. To allow remote clients to access TajoMaster, please set tajo.master.client-rpc.address config to tajo-site.xml. In order to know how to change the listen port, please refer :doc:`/configuration/service_config_defaults`.
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-docs/src/main/sphinx/configuration/service_config_defaults.rst
----------------------------------------------------------------------
diff --git a/tajo-docs/src/main/sphinx/configuration/service_config_defaults.rst b/tajo-docs/src/main/sphinx/configuration/service_config_defaults.rst
index aaaf34e..31ecb57 100644
--- a/tajo-docs/src/main/sphinx/configuration/service_config_defaults.rst
+++ b/tajo-docs/src/main/sphinx/configuration/service_config_defaults.rst
@@ -5,24 +5,24 @@ Cluster Service Configuration Defaults
Tajo Master Configuration Defaults
====================================
-============================ ============================================================== =========== ===============
- Service Name Config Property Name Description default address
-============================ ============================================================== =========== ===============
-Tajo Master Umbilical Rpc tajo.master.umbilical-rpc.address localhost:26001
-Tajo Master Client Rpc tajo.master.client-rpc.address localhost:26002
-Tajo Master Info Http tajo.master.info-http.address 0.0.0.0:26080
-Tajo Resource Tracker Rpc tajo.resource-tracker.rpc.address localhost:26003
-Tajo Catalog Client Rpc tajo.catalog.client-rpc.address localhost:26005
-============================ ============================================================== =========== ===============
+============================ ==================================== =============== =================================================================
+ Service Name Config Property Name default address Description
+============================ ==================================== =============== =================================================================
+Tajo Master Umbilical Rpc tajo.master.umbilical-rpc.address localhost:26001 TajoMaster binding address between master and workers.
+Tajo Master Client Rpc tajo.master.client-rpc.address localhost:26002 TajoMaster binding address between master and remote clients.
+Tajo Master Info Http tajo.master.info-http.address 0.0.0.0:26080 TajoMaster binding address between master and web browser.
+Tajo Resource Tracker Rpc tajo.resource-tracker.rpc.address 0.0.0.0:26003 TajoMaster binding address between master and workers.
+Tajo Catalog Client Rpc tajo.catalog.client-rpc.address 0.0.0.0:26005 CatalogServer binding address between catalog server and workers.
+============================ ==================================== =============== =================================================================
====================================
Tajo Worker Configuration Defaults
====================================
-============================ ============================================================== =========== ===============
- Service Name Config Property Name Description default address
-============================ ============================================================== =========== ===============
-Tajo Worker Peer Rpc tajo.worker.peer-rpc.address 0.0.0.0:28091
-Tajo Worker Client Rpc tajo.worker.client-rpc.address 0.0.0.0:28092
-Tajo Worker Info Http tajo.worker.info-http.address 0.0.0.0:28080
-============================ ============================================================== =========== ===============
\ No newline at end of file
+============================ ==================================== =============== =================================================================
+ Service Name Config Property Name default address Description
+============================ ==================================== =============== =================================================================
+Tajo Worker Peer Rpc tajo.worker.peer-rpc.address 0.0.0.0:28091 TajoWorker binding address between worker and master.
+Tajo Worker Client Rpc tajo.worker.client-rpc.address 0.0.0.0:28092 TajoWorker binding address between worker and remote clients.
+Tajo Worker Info Http tajo.worker.info-http.address 0.0.0.0:28080 TajoWorker binding address between master and web browser.
+============================ ==================================== =============== =================================================================
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java
index ce083f1..fa0fb4f 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java
@@ -51,8 +51,7 @@ import java.util.*;
*/
public class ProjectionPushDownRule extends
BasicLogicalPlanVisitor<ProjectionPushDownRule.Context, LogicalNode> implements LogicalPlanRewriteRule {
- /** Class Logger */
- private final Log LOG = LogFactory.getLog(ProjectionPushDownRule.class);
+
private static final String name = "ProjectionPushDown";
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java
index 59e8904..5d5c0a0 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java
@@ -42,7 +42,7 @@ import java.util.List;
import java.util.Map;
public class AdvancedDataRetriever implements DataRetriever {
- private final Log LOG = LogFactory.getLog(AdvancedDataRetriever.class);
+ private static final Log LOG = LogFactory.getLog(AdvancedDataRetriever.class);
private final Map<String, RetrieverHandler> handlerMap = Maps.newConcurrentMap();
public AdvancedDataRetriever() {
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
index 3d12a40..ad45749 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
@@ -61,7 +61,7 @@ public class FileTablespace extends Tablespace {
return !name.startsWith("_") && !name.startsWith(".");
}
};
- private final Log LOG = LogFactory.getLog(FileTablespace.class);
+ private static final Log LOG = LogFactory.getLog(FileTablespace.class);
static final String OUTPUT_FILE_PREFIX="part-";
static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_STAGE =
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcRecordReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcRecordReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcRecordReader.java
index 4cfd19d..58fea2b 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcRecordReader.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcRecordReader.java
@@ -41,7 +41,7 @@ import java.util.*;
public class OrcRecordReader implements Closeable {
- private final Log LOG = LogFactory.getLog(OrcRecordReader.class);
+ private static final Log LOG = LogFactory.getLog(OrcRecordReader.class);
private final Path path;
private final long firstRow;
http://git-wip-us.apache.org/repos/asf/tajo/blob/16cbf69f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerHandler.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerHandler.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerHandler.java
index a58cb83..30197c9 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerHandler.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerHandler.java
@@ -36,7 +36,7 @@ import java.net.URLDecoder;
public class HttpFileServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
- private final Log LOG = LogFactory.getLog(HttpFileServerHandler.class);
+ private static final Log LOG = LogFactory.getLog(HttpFileServerHandler.class);
@Override
public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {