You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2015/09/23 04:38:56 UTC
[2/2] tajo git commit: TAJO-1860: Refactor Rpc clients to take
Connection Parameters.
TAJO-1860: Refactor Rpc clients to take Connection Parameters.
Closes #763
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/1eb10045
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/1eb10045
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/1eb10045
Branch: refs/heads/master
Commit: 1eb10045908a8c5c6304e10379fa08a68bd61dad
Parents: 5a15586
Author: Hyunsik Choi <hy...@apache.org>
Authored: Tue Sep 22 19:20:18 2015 -0700
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Tue Sep 22 19:33:32 2015 -0700
----------------------------------------------------------------------
CHANGES | 2 +
.../org/apache/tajo/catalog/CatalogClient.java | 14 +-
.../tajo/cli/tsql/CliClientParamsFactory.java | 50 ++++++
.../java/org/apache/tajo/cli/tsql/TajoCli.java | 22 ++-
.../tajo/client/ClientParameterHelper.java | 161 +++++++++++++++++++
.../apache/tajo/client/ClientParameters.java | 32 ++++
.../org/apache/tajo/client/QueryClientImpl.java | 14 +-
.../apache/tajo/client/SessionConnection.java | 18 +--
.../tajo/client/v2/ClientDelegateFactory.java | 10 +-
.../tajo/client/v2/LegacyClientDelegate.java | 13 +-
.../org/apache/tajo/client/v2/TajoClient.java | 27 ++--
.../java/org/apache/tajo/conf/TajoConf.java | 4 +
.../apache/tajo/exception/SQLExceptionUtil.java | 4 +-
tajo-common/src/main/proto/errors.proto | 4 +-
.../cli/tsql/TestDefaultCliOutputFormatter.java | 2 +-
.../org/apache/tajo/cli/tsql/TestTajoCli.java | 19 ++-
.../tajo/cli/tsql/TestTajoCliNegatives.java | 2 +-
.../commands/TestExecExternalShellCommand.java | 2 +-
.../tajo/cli/tsql/commands/TestHdfsCommand.java | 2 +-
.../apache/tajo/util/TestRpcParamFactory.java | 58 +++++++
.../org/apache/tajo/master/QueryInProgress.java | 12 +-
.../java/org/apache/tajo/master/TajoMaster.java | 7 -
.../tajo/querymaster/DefaultTaskScheduler.java | 21 ++-
.../apache/tajo/querymaster/QueryMaster.java | 20 +--
.../tajo/querymaster/QueryMasterTask.java | 11 +-
.../java/org/apache/tajo/querymaster/Stage.java | 7 +-
.../apache/tajo/util/RpcParameterFactory.java | 51 ++++++
.../tajo/worker/ExecutionBlockContext.java | 6 +-
.../apache/tajo/worker/NodeStatusUpdater.java | 21 +--
.../java/org/apache/tajo/worker/TajoWorker.java | 5 -
.../org/apache/tajo/worker/TaskManager.java | 12 +-
.../ConnectivityCheckerRuleForTajoWorker.java | 4 +-
tajo-docs/src/main/sphinx/jdbc_driver.rst | 43 +++++
.../sphinx/table_management/tablespaces.rst | 2 +-
.../java/org/apache/tajo/jdbc/TestTajoJdbc.java | 15 +-
.../apache/tajo/jdbc/TestTajoJdbcNegative.java | 54 ++++++-
.../java/org/apache/tajo/rpc/RpcConstants.java | 32 +++-
.../org/apache/tajo/rpc/AsyncRpcClient.java | 43 +++--
.../org/apache/tajo/rpc/BlockingRpcClient.java | 45 +++---
.../org/apache/tajo/rpc/NettyClientBase.java | 68 +++++---
.../tajo/rpc/ProtoClientChannelInitializer.java | 25 +--
.../org/apache/tajo/rpc/RpcClientManager.java | 131 ++++-----------
.../org/apache/tajo/rpc/RpcConnectionKey.java | 56 +++++++
.../java/org/apache/tajo/rpc/TestAsyncRpc.java | 119 +++++++++-----
.../org/apache/tajo/rpc/TestBlockingRpc.java | 115 +++++++++----
.../apache/tajo/rpc/TestRpcClientManager.java | 22 +--
46 files changed, 1019 insertions(+), 388 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 63af5dd..cfad5fe 100644
--- a/CHANGES
+++ b/CHANGES
@@ -40,6 +40,8 @@ Release 0.11.0 - unreleased
IMPROVEMENT
+ TAJO-1860: Refactor Rpc clients to take Connection Parameters. (hyunsik)
+
TAJO-1868: Allow TablespaceManager::get to return a unregistered
tablespace. (hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java
index 80ded4a..2b24a6b 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java
@@ -22,16 +22,14 @@ import com.google.protobuf.ServiceException;
import org.apache.tajo.catalog.CatalogProtocol.CatalogProtocolService.BlockingInterface;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.RpcClientManager;
-import org.apache.tajo.rpc.RpcConstants;
+import org.apache.tajo.rpc.*;
import org.apache.tajo.service.ServiceTracker;
import org.apache.tajo.service.ServiceTrackerFactory;
import org.apache.tajo.util.NetUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.concurrent.TimeUnit;
+import java.util.Properties;
/**
* CatalogClient provides a client API to access the catalog server.
@@ -71,10 +69,12 @@ public class CatalogClient extends AbstractCatalogClient {
if (client != null && client.isConnected()) return client;
RpcClientManager.cleanup(client);
- int retry = conf.getInt(RpcConstants.RPC_CLIENT_RETRY_MAX, RpcConstants.DEFAULT_RPC_RETRIES);
+ final Properties clientParams = new Properties();
+ clientParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, conf.getVar(ConfVars.RPC_CLIENT_RETRY_NUM));
+
// Client do not closed on idle state for support high available
- this.client = RpcClientManager.getInstance().newClient(getCatalogServerAddr(), CatalogProtocol.class, false,
- retry, 0, TimeUnit.SECONDS, false);
+ this.client = RpcClientManager.getInstance().newClient(getCatalogServerAddr(), CatalogProtocol.class,
+ false, clientParams);
} catch (Exception e) {
throw new ServiceException(e);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/CliClientParamsFactory.java
----------------------------------------------------------------------
diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/CliClientParamsFactory.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/CliClientParamsFactory.java
new file mode 100644
index 0000000..5fabae8
--- /dev/null
+++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/CliClientParamsFactory.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.cli.tsql;
+
+import org.apache.tajo.annotation.Nullable;
+import org.apache.tajo.client.ClientParameters;
+
+import javax.validation.constraints.Null;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+class CliClientParamsFactory {
+ static Map<String, String> DEFAULT_PARAMS = new HashMap<>();
+
+ static {
+ // Keep lexicographic order of parameter names.
+ DEFAULT_PARAMS.put(ClientParameters.CONNECT_TIMEOUT, "0");
+ DEFAULT_PARAMS.put(ClientParameters.SOCKET_TIMEOUT, "0");
+ DEFAULT_PARAMS.put(ClientParameters.RETRY, "3");
+ DEFAULT_PARAMS.put(ClientParameters.ROW_FETCH_SIZE, "200");
+ DEFAULT_PARAMS.put(ClientParameters.USE_COMPRESSION, "false");
+ }
+
+ public static Properties get(@Nullable Properties connParam) {
+ Properties copy = connParam == null ? new Properties() : (Properties) connParam.clone();
+ for (Map.Entry<String, String> entry : DEFAULT_PARAMS.entrySet()) {
+ if (!copy.contains(entry.getKey())) {
+ copy.setProperty(entry.getKey(), entry.getValue());
+ }
+ }
+ return copy;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
index 83763e8..8ae7075 100644
--- a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
+++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
@@ -18,6 +18,7 @@
package org.apache.tajo.cli.tsql;
+import com.google.common.collect.Maps;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.protobuf.ServiceException;
@@ -26,6 +27,7 @@ import jline.console.ConsoleReader;
import org.apache.commons.cli.*;
import org.apache.tajo.*;
import org.apache.tajo.TajoProtos.QueryState;
+import org.apache.tajo.annotation.Nullable;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.cli.tsql.ParsedResult.StatementType;
import org.apache.tajo.cli.tsql.SimpleParser.ParsingState;
@@ -39,16 +41,14 @@ import org.apache.tajo.exception.TajoException;
import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.service.ServiceTrackerFactory;
import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.util.ShutdownHookManager;
import java.io.*;
import java.lang.reflect.Constructor;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
+import java.util.*;
public class TajoCli {
public static final int SHUTDOWN_HOOK_PRIORITY = 50;
@@ -176,7 +176,9 @@ public class TajoCli {
}
}
- public TajoCli(TajoConf c, String [] args, InputStream in, OutputStream out) throws Exception {
+ public TajoCli(TajoConf c, String [] args, @Nullable Properties clientParams, InputStream in, OutputStream out)
+ throws Exception {
+
CommandLineParser parser = new PosixParser();
CommandLine cmd = parser.parse(options, args);
@@ -234,14 +236,18 @@ public class TajoCli {
}
}
+ // Get connection parameters
+ Properties defaultConnParams = CliClientParamsFactory.get(clientParams);
+ final KeyValueSet actualConnParams = new KeyValueSet(Maps.fromProperties(defaultConnParams));
+
if ((hostName == null) ^ (port == null)) {
System.err.println(ERROR_PREFIX + "cannot find valid Tajo server address");
throw new RuntimeException("cannot find valid Tajo server address");
} else if (hostName != null && port != null) {
conf.setVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, hostName+":"+port);
- client = new TajoClientImpl(ServiceTrackerFactory.get(conf), baseDatabase);
+ client = new TajoClientImpl(ServiceTrackerFactory.get(conf), baseDatabase, actualConnParams);
} else if (hostName == null && port == null) {
- client = new TajoClientImpl(ServiceTrackerFactory.get(conf), baseDatabase);
+ client = new TajoClientImpl(ServiceTrackerFactory.get(conf), baseDatabase, actualConnParams);
}
try {
@@ -694,7 +700,7 @@ public class TajoCli {
public static void main(String [] args) throws Exception {
TajoConf conf = new TajoConf();
- TajoCli shell = new TajoCli(conf, args, System.in, System.out);
+ TajoCli shell = new TajoCli(conf, args, new Properties(), System.in, System.out);
System.out.println();
System.exit(shell.runShell());
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-client/src/main/java/org/apache/tajo/client/ClientParameterHelper.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/ClientParameterHelper.java b/tajo-client/src/main/java/org/apache/tajo/client/ClientParameterHelper.java
new file mode 100644
index 0000000..8549178
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/ClientParameterHelper.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client;
+
+import org.apache.tajo.SessionVars;
+import org.apache.tajo.rpc.RpcConstants;
+import org.apache.tajo.util.Pair;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.tajo.SessionVars.COMPRESSED_RESULT_TRANSFER;
+import static org.apache.tajo.SessionVars.FETCH_ROWNUM;
+import static org.apache.tajo.client.ClientParameterHelper.ActionType.CONNECTION_PARAM;
+import static org.apache.tajo.client.ClientParameterHelper.ActionType.SESSION_UPDATE;
+import static org.apache.tajo.rpc.RpcConstants.CLIENT_CONNECTION_TIMEOUT;
+import static org.apache.tajo.rpc.RpcConstants.CLIENT_SOCKET_TIMEOUT;
+
+/**
+ * <ul>
+ * <li><code>useCompression=bool</code> - Enable compressed transfer for ResultSet. </li>
+ * <li><code>defaultRowFetchSize=int</code> - Determine the number of rows fetched in ResultSet by
+ * one fetch with trip to the Server.</li>
+ * <li><code>connectTimeout=int</code> - The timeout value used for socket connect operations.
+ * If connecting to the server takes longer than this value,the connection is broken. The
+ * timeout is specified in seconds and a value of zero means that it is disabled.</li>
+ * <li><code>socketTimeout=int</code></li> - The timeout value used for socket read operations.
+ * If reading from the server takes longer than this value, the connection is closed.
+ * This can be used as both a brute force global query timeout and a method of detecting
+ * network problems. The timeout is specified in seconds and a value of zero means that
+ * it is disabled.</li>
+ * <li><code>retry=int</code>Number of retry operation. Tajo JDBC driver is resilient
+ * against some network or connection problems. It determines how many times the connection will retry.</li>
+ * </ul>
+ */
+class ClientParameterHelper {
+
+ public static Map<String, Action> PARAMETERS = new HashMap<>();
+
+ static {
+ PARAMETERS.put(ClientParameters.USE_COMPRESSION, new SimpleSessionAction(COMPRESSED_RESULT_TRANSFER));
+ PARAMETERS.put(ClientParameters.ROW_FETCH_SIZE, new SimpleSessionAction(FETCH_ROWNUM));
+ PARAMETERS.put(ClientParameters.CONNECT_TIMEOUT, new ConnectionParamAction() {
+ @Override
+ Pair<String, String> doAction(String param) {
+ int seconds = Integer.parseInt(param);
+ // convert seconds into mili seconds
+ return new Pair<>(CLIENT_CONNECTION_TIMEOUT, String.valueOf(TimeUnit.SECONDS.toMillis(seconds)));
+ }
+ });
+ PARAMETERS.put(ClientParameters.SOCKET_TIMEOUT, new ConnectionParamAction() {
+ @Override
+ Pair<String, String> doAction(String param) {
+ int seconds = Integer.parseInt(param);
+ return new Pair<>(CLIENT_SOCKET_TIMEOUT, String.valueOf(TimeUnit.SECONDS.toMillis(seconds)));
+ }
+ });
+ PARAMETERS.put(ClientParameters.RETRY, new SimpleConnectionParamAction(RpcConstants.CLIENT_RETRY_NUM));
+ }
+
+ enum ActionType {
+ SESSION_UPDATE,
+ CONNECTION_PARAM
+ }
+
+ interface Action {
+ ActionType type();
+ }
+
+ static class SimpleSessionAction extends SessionAction {
+ private final String sessionKey;
+
+ SimpleSessionAction(SessionVars sessionVar) {
+ this.sessionKey = sessionVar.name();
+ }
+
+ Pair<String, String> doAction(String param) {
+ return new Pair<>(sessionKey, param);
+ }
+ }
+
+ @SuppressWarnings("unused")
+ static abstract class SessionAction implements Action {
+
+ @Override
+ public ActionType type() {
+ return SESSION_UPDATE;
+ }
+
+ abstract Pair<String, String> doAction(String param);
+ }
+
+ static class SimpleConnectionParamAction extends ConnectionParamAction {
+ final String connParamKey;
+
+ SimpleConnectionParamAction(String connParamKey) {
+ this.connParamKey = connParamKey;
+ }
+
+ public Pair<String, String> doAction(String param) {
+ return new Pair<>(connParamKey, param);
+ }
+ }
+
+ @SuppressWarnings("unused")
+ static abstract class ConnectionParamAction implements Action {
+
+ @Override
+ public ActionType type() {
+ return ActionType.CONNECTION_PARAM;
+ }
+
+ abstract Pair<String, String> doAction(String param);
+ }
+
+ public static Properties getConnParams(Collection<Map.Entry<String, String>> properties) {
+ Properties connParams = new Properties();
+ for (Map.Entry<String, String> entry : properties) {
+ if(PARAMETERS.containsKey(entry.getKey()) && PARAMETERS.get(entry.getKey()).type() == CONNECTION_PARAM) {
+ Pair<String, String> keyValue =
+ ((ConnectionParamAction)PARAMETERS.get(entry.getKey())).doAction(entry.getValue());
+ connParams.put(keyValue.getFirst(), keyValue.getSecond());
+ }
+ }
+
+ return connParams;
+ }
+
+ public static Map<String, String> getSessionVars(Collection<Map.Entry<String, String>> properties) {
+ Map<String, String> sessionVars = new HashMap<>();
+
+ for (Map.Entry<String, String> entry : properties) {
+ if(PARAMETERS.containsKey(entry.getKey()) && PARAMETERS.get(entry.getKey()).type() == SESSION_UPDATE) {
+ Pair<String, String> keyValue =
+ ((SessionAction)PARAMETERS.get(entry.getKey())).doAction(entry.getValue());
+ sessionVars.put(keyValue.getFirst(), keyValue.getSecond());
+ }
+ }
+
+ return sessionVars;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-client/src/main/java/org/apache/tajo/client/ClientParameters.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/ClientParameters.java b/tajo-client/src/main/java/org/apache/tajo/client/ClientParameters.java
new file mode 100644
index 0000000..e3ee019
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/ClientParameters.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client;
+
+/**
+ * Client Parameters which can enable or disable some features of TajoClient.
+ * This class contains the parameter keys. In more detail,
+ * please refer to http://tajo.apache.org/docs/current/jdbc_driver.html##connection-parameters
+ */
+public interface ClientParameters {
+ String USE_COMPRESSION = "useCompression";
+ String ROW_FETCH_SIZE = "defaultRowFetchSize";
+ String CONNECT_TIMEOUT = "connectTimeout";
+ String SOCKET_TIMEOUT = "socketTimeout";
+ String RETRY = "retry";
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
index 144e3b6..364292e 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
@@ -50,6 +50,7 @@ import java.net.InetSocketAddress;
import java.sql.ResultSet;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -352,7 +353,7 @@ public class QueryClientImpl implements QueryClient {
protected TajoMemoryResultSet fetchNextQueryResult(final QueryId queryId, final int fetchRowNum)
throws TajoException {
- boolean compress = conn.getProperties().getBool(SessionVars.COMPRESSED_RESULT_TRANSFER);
+ final boolean compress = conn.getProperties().getBool(ClientParameters.USE_COMPRESSION);
final BlockingInterface stub = conn.getTMStub();
final GetQueryResultDataRequest.Builder request = GetQueryResultDataRequest.newBuilder();
@@ -544,16 +545,7 @@ public class QueryClientImpl implements QueryClient {
try {
- qmClient = manager.newClient(
- qmAddress,
- QueryMasterClientProtocol.class,
- false,
- manager.getRetries(),
- manager.getTimeoutSeconds(),
- TimeUnit.SECONDS,
- false
- );
-
+ qmClient = manager.newClient(qmAddress, QueryMasterClientProtocol.class, false, new Properties());
conn.checkSessionAndGet(conn.getTajoMasterConnection());
http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
index 3307ade..f63bb47 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
@@ -40,7 +40,6 @@ import org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolServ
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.NettyUtils;
import org.apache.tajo.rpc.RpcClientManager;
-import org.apache.tajo.rpc.RpcConstants;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueSetResponse;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringResponse;
@@ -51,10 +50,7 @@ import org.apache.tajo.util.ProtoUtil;
import java.io.Closeable;
import java.net.InetSocketAddress;
import java.sql.SQLException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.tajo.error.Errors.ResultCode.NO_SUCH_SESSION_VARIABLE;
@@ -85,6 +81,8 @@ public class SessionConnection implements Closeable {
private NettyClientBase client;
+ private Properties clientParams;
+
private final KeyValueSet properties;
/**
@@ -103,8 +101,9 @@ public class SessionConnection implements Closeable {
this.properties = properties;
this.manager = RpcClientManager.getInstance();
- this.manager.setRetries(properties.getInt(RpcConstants.RPC_CLIENT_RETRY_MAX, RpcConstants.DEFAULT_RPC_RETRIES));
this.userInfo = UserRoleInfo.getCurrentUser();
+ // update the connection parameters to RPC client from connection properties
+ this.clientParams = ClientParameterHelper.getConnParams(properties.getAllKeyValus().entrySet());
this.eventLoopGroup = NettyUtils.createEventLoopGroup(getClass().getSimpleName(), 4);
try {
@@ -113,6 +112,9 @@ public class SessionConnection implements Closeable {
NettyUtils.shutdown(eventLoopGroup);
throw e;
}
+
+ // update the session variables from connection parameters
+ updateSessionVariables(ClientParameterHelper.getSessionVars(properties.getAllKeyValus().entrySet()));
}
public Map<String, String> getClientSideSessionVars() {
@@ -130,7 +132,7 @@ public class SessionConnection implements Closeable {
// Client do not closed on idle state for support high available
this.client = manager.newBlockingClient(getTajoMasterAddr(), TajoMasterClientProtocol.class,
- manager.getRetries(), eventLoopGroup);
+ eventLoopGroup, clientParams);
} catch (Throwable t) {
throw new TajoRuntimeException(new ClientConnectionException(t));
}
@@ -217,7 +219,6 @@ public class SessionConnection implements Closeable {
ensureOk(response.getState());
updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
- properties.putAll(sessionVarsCache);
return Collections.unmodifiableMap(sessionVarsCache);
}
@@ -362,7 +363,6 @@ public class SessionConnection implements Closeable {
}
CreateSessionResponse response = null;
-
try {
response = tajoMasterService.createSession(null, builder.build());
} catch (ServiceException se) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientDelegateFactory.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientDelegateFactory.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientDelegateFactory.java
index 44721b3..feee37b 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientDelegateFactory.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientDelegateFactory.java
@@ -21,22 +21,22 @@ package org.apache.tajo.client.v2;
import org.apache.tajo.annotation.Nullable;
import org.apache.tajo.client.v2.exception.ClientUnableToConnectException;
-import java.util.Map;
+import java.util.Properties;
public class ClientDelegateFactory {
public static ClientDelegate newDefaultDelegate(String host,
int port,
- @Nullable Map<String, String> props)
+ @Nullable Properties clientParams)
throws ClientUnableToConnectException {
- return new LegacyClientDelegate(host, port, props);
+ return new LegacyClientDelegate(host, port, clientParams);
}
public static ClientDelegate newDefaultDelegate(ServiceDiscovery discovery,
- @Nullable Map<String, String> props)
+ @Nullable Properties clientParams)
throws ClientUnableToConnectException {
- return new LegacyClientDelegate(discovery, props);
+ return new LegacyClientDelegate(discovery, clientParams);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-client/src/main/java/org/apache/tajo/client/v2/LegacyClientDelegate.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/LegacyClientDelegate.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/LegacyClientDelegate.java
index 0a2c6de..889d974 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/v2/LegacyClientDelegate.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/LegacyClientDelegate.java
@@ -18,6 +18,7 @@
package org.apache.tajo.client.v2;
+import com.google.common.collect.Maps;
import com.google.common.util.concurrent.AbstractFuture;
import org.apache.tajo.QueryId;
import org.apache.tajo.TajoProtos;
@@ -40,8 +41,10 @@ import org.apache.tajo.util.NetUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.sql.ResultSet;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -55,13 +58,15 @@ public class LegacyClientDelegate extends SessionConnection implements ClientDel
private QueryClientImpl queryClient;
private final ExecutorService executor = Executors.newFixedThreadPool(8);
- public LegacyClientDelegate(String host, int port, Map<String, String> props) {
- super(new DummyServiceTracker(NetUtils.createSocketAddr(host, port)), null, new KeyValueSet(props));
+ public LegacyClientDelegate(String host, int port, Properties clientParams) {
+ super(new DummyServiceTracker(NetUtils.createSocketAddr(host, port)), null,
+ new KeyValueSet(clientParams == null ? new HashMap<String, String>() : Maps.fromProperties(clientParams)));
queryClient = new QueryClientImpl(this);
}
- public LegacyClientDelegate(ServiceDiscovery discovery, Map<String, String> props) {
- super(new DelegateServiceTracker(discovery), null, new KeyValueSet(props));
+ public LegacyClientDelegate(ServiceDiscovery discovery, Properties clientParams) {
+ super(new DelegateServiceTracker(discovery), null,
+ new KeyValueSet(clientParams == null ? new HashMap<String, String>() : Maps.fromProperties(clientParams)));
queryClient = new QueryClientImpl(this);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-client/src/main/java/org/apache/tajo/client/v2/TajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/TajoClient.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/TajoClient.java
index 2b4a150..dc81742 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/v2/TajoClient.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/TajoClient.java
@@ -29,6 +29,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.sql.ResultSet;
import java.util.Map;
+import java.util.Properties;
public class TajoClient implements Closeable {
private static Log LOG = LogFactory.getLog(TajoClient.class);
@@ -52,11 +53,11 @@ public class TajoClient implements Closeable {
/**
* Initialize TajoClient with a hostname and default port 26002.
*
- * @param host Hostname to connect
- * @param properties Connection properties
+ * @param host Hostname to connect
+ * @param clientParams Client connection parameters
*/
- public TajoClient(String host, Map<String, String> properties) throws ClientUnableToConnectException {
- delegate = ClientDelegateFactory.newDefaultDelegate(host, DEFAULT_PORT, properties);
+ public TajoClient(String host, Properties clientParams) throws ClientUnableToConnectException {
+ delegate = ClientDelegateFactory.newDefaultDelegate(host, DEFAULT_PORT, clientParams);
}
/**
@@ -72,12 +73,12 @@ public class TajoClient implements Closeable {
/**
* Initialize TajoClient with a hostname and port
*
- * @param host Hostname to connect
- * @param port Port number to connect
- * @param properties Connection properties
+ * @param host Hostname to connect
+ * @param port Port number to connect
+ * @param clientParams Client connection parameters
*/
- public TajoClient(String host, int port, Map<String, String> properties) throws ClientUnableToConnectException {
- delegate = ClientDelegateFactory.newDefaultDelegate(host, port, properties);
+ public TajoClient(String host, int port, Properties clientParams) throws ClientUnableToConnectException {
+ delegate = ClientDelegateFactory.newDefaultDelegate(host, port, clientParams);
}
/**
@@ -92,11 +93,11 @@ public class TajoClient implements Closeable {
/**
* Initialize TajoClient via service discovery protocol
*
- * @param discovery Service discovery
- * @param properties Connection properties
+ * @param discovery Service discovery
+ * @param clientParams Client connection parameters
*/
- public TajoClient(ServiceDiscovery discovery, Map<String, String> properties) throws ClientUnableToConnectException {
- delegate = ClientDelegateFactory.newDefaultDelegate(discovery, properties);
+ public TajoClient(ServiceDiscovery discovery, Properties clientParams) throws ClientUnableToConnectException {
+ delegate = ClientDelegateFactory.newDefaultDelegate(discovery, clientParams);
}
/**
http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 3210323..d7789f8 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -230,6 +230,10 @@ public class TajoConf extends Configuration {
// Internal RPC Client
INTERNAL_RPC_CLIENT_WORKER_THREAD_NUM("tajo.internal.rpc.client.worker-thread-num",
Runtime.getRuntime().availableProcessors() * 2),
+ RPC_CLIENT_RETRY_NUM("tajo.rpc.client.retry-num", 3, Validators.min("1")),
+ RPC_CLIENT_CONNECTION_TIMEOUT("tajo.rpc.client.connection-timeout-ms", (long)15 * 1000, Validators.min("0")),
+ RPC_CLIENT_SOCKET_TIMEOUT("tajo.rpc.client.socket-timeout-ms", (long)180 * 1000, Validators.min("0")),
+ RPC_CLIENT_HANG_DETECTION_ENABLED("tajo.rpc.client.hang-detection", true, Validators.bool()),
// Internal RPC Server
MASTER_RPC_SERVER_WORKER_THREAD_NUM("tajo.master.rpc.server.worker-thread-num",
http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-common/src/main/java/org/apache/tajo/exception/SQLExceptionUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/SQLExceptionUtil.java b/tajo-common/src/main/java/org/apache/tajo/exception/SQLExceptionUtil.java
index 4b4ec97..bc40072 100644
--- a/tajo-common/src/main/java/org/apache/tajo/exception/SQLExceptionUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/exception/SQLExceptionUtil.java
@@ -104,9 +104,9 @@ public class SQLExceptionUtil {
SQLSTATES.put(ResultCode.INDETERMINATE_DATATYPE, "42P18");
// Client Connection
- SQLSTATES.put(ResultCode.CLIENT_CONNECTION_EXCEPTION, "08001");
- SQLSTATES.put(ResultCode.CLIENT_UNABLE_TO_ESTABLISH_CONNECTION, "08002");
+ SQLSTATES.put(ResultCode.CLIENT_CONNECTION_EXCEPTION, "08000");
SQLSTATES.put(ResultCode.CLIENT_CONNECTION_DOES_NOT_EXIST, "08003");
+ SQLSTATES.put(ResultCode.CLIENT_UNABLE_TO_ESTABLISH_CONNECTION, "08006");
}
public static boolean isThisError(SQLException e, ResultCode code) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-common/src/main/proto/errors.proto
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/proto/errors.proto b/tajo-common/src/main/proto/errors.proto
index bb973f2..573fc7e 100644
--- a/tajo-common/src/main/proto/errors.proto
+++ b/tajo-common/src/main/proto/errors.proto
@@ -172,9 +172,9 @@ enum ResultCode {
// Client Connection
CLIENT_CONNECTION_EXCEPTION = 1101; // SQLState: 08000 - Client connection error
- CLIENT_UNABLE_TO_ESTABLISH_CONNECTION = 1102; // SQLState: 08001 -
CLIENT_CONNECTION_DOES_NOT_EXIST = 1103; // SQLState: 08003 - Client connection has been closed.
- CLIENT_PROTOCOL_PROTOCOL_VIOLATION = 1104; // SQLState: ?
+ CLIENT_UNABLE_TO_ESTABLISH_CONNECTION = 1102; // SQLState: 08006 - Client connection failure
+ CLIENT_PROTOCOL_PROTOCOL_VIOLATION = 1104; // SQLState: 08P01 - Protocol violation
// 53 - Invalid Operand or Inconsistent Specification
INSUFFICIENT_RESOURCE = 53000;
http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestDefaultCliOutputFormatter.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestDefaultCliOutputFormatter.java b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestDefaultCliOutputFormatter.java
index 3b53c60..cdcdb67 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestDefaultCliOutputFormatter.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestDefaultCliOutputFormatter.java
@@ -66,7 +66,7 @@ public class TestDefaultCliOutputFormatter {
public void setUp() throws Exception {
conf = cluster.getConfiguration();
ByteArrayOutputStream out = new ByteArrayOutputStream();
- tajoCli = new TajoCli(conf, new String[]{}, System.in, out);
+ tajoCli = new TajoCli(conf, new String[]{}, null, System.in, out);
cliContext = tajoCli.getContext();
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
index 8ddef09..da51aed 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
@@ -29,9 +29,11 @@ import org.apache.tajo.TajoTestingCluster;
import org.apache.tajo.TpchTestBase;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.client.ClientParameters;
import org.apache.tajo.client.QueryStatus;
import org.apache.tajo.client.TajoClient;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.rpc.RpcConstants;
import org.apache.tajo.storage.StorageUtil;
import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.util.FileUtil;
@@ -43,6 +45,7 @@ import org.junit.rules.TestName;
import java.io.*;
import java.net.URL;
+import java.util.Properties;
import static org.junit.Assert.*;
@@ -74,7 +77,9 @@ public class TestTajoCli {
@Before
public void setUp() throws Exception {
out = new ByteArrayOutputStream();
- tajoCli = new TajoCli(cluster.getConfiguration(), new String[]{}, System.in, out);
+ Properties connParams = new Properties();
+ connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, "3");
+ tajoCli = new TajoCli(cluster.getConfiguration(), new String[]{}, connParams, System.in, out);
}
@After
@@ -155,7 +160,7 @@ public class TestTajoCli {
assertEquals("tajo.executor.join.inner.in-memory-table-num=256", confValues[1]);
TajoConf tajoConf = TpchTestBase.getInstance().getTestingCluster().getConfiguration();
- TajoCli testCli = new TajoCli(tajoConf, args, System.in, System.out);
+ TajoCli testCli = new TajoCli(tajoConf, args, null, System.in, System.out);
try {
assertEquals("false", testCli.getContext().get(SessionVars.CLI_PAGING_ENABLED));
assertEquals("256", testCli.getContext().getConf().get("tajo.executor.join.inner.in-memory-table-num"));
@@ -321,7 +326,7 @@ public class TestTajoCli {
setVar(tajoCli, SessionVars.CLI_FORMATTER_CLASS, TajoCliOutputTestFormatter.class.getName());
ByteArrayOutputStream out = new ByteArrayOutputStream();
- TajoCli tajoCli = new TajoCli(tajoConf, new String[]{}, System.in, out);
+ TajoCli tajoCli = new TajoCli(tajoConf, new String[]{}, null, System.in, out);
try {
tajoCli.executeMetaCommand("\\getconf tajo.rootdir");
@@ -338,7 +343,7 @@ public class TestTajoCli {
setVar(tajoCli, SessionVars.CLI_FORMATTER_CLASS, TajoCliOutputTestFormatter.class.getName());
ByteArrayOutputStream out = new ByteArrayOutputStream();
- TajoCli tajoCli = new TajoCli(tajoConf, new String[]{}, System.in, out);
+ TajoCli tajoCli = new TajoCli(tajoConf, new String[]{}, null, System.in, out);
tajoCli.executeMetaCommand("\\admin -showmasters");
String consoleResult = new String(out.toByteArray());
@@ -372,7 +377,9 @@ public class TestTajoCli {
TajoConf tajoConf = new TajoConf();
setVar(tajoCli, SessionVars.CLI_FORMATTER_CLASS, TajoCliOutputTestFormatter.class.getName());
- TajoCli tc = new TajoCli(tajoConf, new String[]{}, is, out);
+ Properties connParams = new Properties();
+ connParams.setProperty(ClientParameters.RETRY, "3");
+ TajoCli tc = new TajoCli(tajoConf, new String[]{}, connParams, is, out);
tc.executeMetaCommand("\\set ON_ERROR_STOP false");
assertSessionVar(tc, SessionVars.ON_ERROR_STOP.keyname(), "false");
@@ -452,7 +459,7 @@ public class TestTajoCli {
assertEquals(0L, tableDesc.getStats().getNumRows().longValue());
InputStream testInput = new ByteArrayInputStream(new byte[]{(byte) DefaultTajoCliOutputFormatter.QUIT_COMMAND});
- cli = new TajoCli(cluster.getConfiguration(), new String[]{}, testInput, out);
+ cli = new TajoCli(cluster.getConfiguration(), new String[]{}, null, testInput, out);
setVar(cli, SessionVars.CLI_PAGE_ROWS, "2");
setVar(cli, SessionVars.CLI_FORMATTER_CLASS, TajoCliOutputTestFormatter.class.getName());
http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCliNegatives.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCliNegatives.java b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCliNegatives.java
index 3ebbafa..d1f1023 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCliNegatives.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCliNegatives.java
@@ -40,7 +40,7 @@ public class TestTajoCliNegatives extends QueryTestCaseBase {
@BeforeClass
public static void setUp() throws Exception {
out = new ByteArrayOutputStream();
- tajoCli = new TajoCli(testingCluster.getConfiguration(), new String[]{}, System.in, out);
+ tajoCli = new TajoCli(testingCluster.getConfiguration(), new String[]{}, null, System.in, out);
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/commands/TestExecExternalShellCommand.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/commands/TestExecExternalShellCommand.java b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/commands/TestExecExternalShellCommand.java
index ed7ee4a..95c3a8b 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/commands/TestExecExternalShellCommand.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/commands/TestExecExternalShellCommand.java
@@ -34,7 +34,7 @@ public class TestExecExternalShellCommand {
ByteArrayOutputStream out = new ByteArrayOutputStream();
- TajoCli cli = new TajoCli(tajoConf, new String[]{}, null, out);
+ TajoCli cli = new TajoCli(tajoConf, new String[]{}, null, null, out);
cli.executeMetaCommand("\\! echo \"this is test\"");
String consoleResult = new String(out.toByteArray());
http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/commands/TestHdfsCommand.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/commands/TestHdfsCommand.java b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/commands/TestHdfsCommand.java
index 496c7e3..d239c0a 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/commands/TestHdfsCommand.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/commands/TestHdfsCommand.java
@@ -37,7 +37,7 @@ public class TestHdfsCommand {
System.setOut(new PrintStream(out));
System.setErr(new PrintStream(out));
- TajoCli cli = new TajoCli(tajoConf, new String[]{}, null, out);
+ TajoCli cli = new TajoCli(tajoConf, new String[]{}, null, null, out);
cli.executeMetaCommand("\\dfs -test");
String consoleResult = new String(out.toByteArray());
http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-core-tests/src/test/java/org/apache/tajo/util/TestRpcParamFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/util/TestRpcParamFactory.java b/tajo-core-tests/src/test/java/org/apache/tajo/util/TestRpcParamFactory.java
new file mode 100644
index 0000000..1d63bba
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/util/TestRpcParamFactory.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.rpc.RpcConstants;
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.apache.tajo.rpc.RpcConstants.CLIENT_CONNECTION_TIMEOUT;
+import static org.apache.tajo.rpc.RpcConstants.CLIENT_RETRY_NUM;
+import static org.junit.Assert.*;
+
+public class TestRpcParamFactory {
+
+ @Test
+ public void testGetDefaults() throws Exception {
+ TajoConf conf = new TajoConf();
+ Properties defaultParams = RpcParameterFactory.get(conf);
+ assertEquals(
+ ConfVars.RPC_CLIENT_RETRY_NUM.defaultVal, defaultParams.getProperty(CLIENT_RETRY_NUM));
+ assertEquals(
+ ConfVars.RPC_CLIENT_CONNECTION_TIMEOUT.defaultVal, defaultParams.getProperty(CLIENT_CONNECTION_TIMEOUT));
+ assertEquals(
+ ConfVars.RPC_CLIENT_SOCKET_TIMEOUT.defaultVal, defaultParams.getProperty(RpcConstants.CLIENT_SOCKET_TIMEOUT));
+ }
+
+ @Test
+ public void testGet() throws Exception {
+ TajoConf conf = new TajoConf();
+ conf.setIntVar(ConfVars.RPC_CLIENT_RETRY_NUM, 100);
+ conf.setLongVar(ConfVars.RPC_CLIENT_CONNECTION_TIMEOUT, (long)(10 * 1000));
+ conf.setLongVar(ConfVars.RPC_CLIENT_SOCKET_TIMEOUT, (long)60 * 1000);
+
+ Properties defaultParams = RpcParameterFactory.get(conf);
+ assertEquals("100", defaultParams.getProperty(CLIENT_RETRY_NUM));
+ assertEquals(10 * 1000 + "", defaultParams.getProperty(CLIENT_CONNECTION_TIMEOUT));
+ assertEquals(60 * 1000 + "", defaultParams.getProperty(RpcConstants.CLIENT_SOCKET_TIMEOUT));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
index b848876..8e999c3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
@@ -39,9 +39,11 @@ import org.apache.tajo.rpc.RpcConstants;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.session.Session;
import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.RpcParameterFactory;
import java.net.ConnectException;
import java.net.InetSocketAddress;
+import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@@ -70,6 +72,8 @@ public class QueryInProgress {
private AllocationResourceProto allocationResource;
+ private final Properties rpcParams;
+
private final Lock readLock;
private final Lock writeLock;
@@ -87,6 +91,8 @@ public class QueryInProgress {
queryInfo = new QueryInfo(queryId, queryContext, sql, jsonExpr);
queryInfo.setStartTime(System.currentTimeMillis());
+ rpcParams = RpcParameterFactory.get(masterContext.getConf());
+
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
this.readLock = readWriteLock.readLock();
this.writeLock = readWriteLock.writeLock();
@@ -99,7 +105,7 @@ public class QueryInProgress {
if (queryMasterRpcClient != null) {
CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<PrimitiveProtos.NullProto>();
queryMasterRpcClient.killQuery(callFuture.getController(), queryId.getProto(), callFuture);
- callFuture.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS);
}
} catch (Throwable e) {
catchException("Failed to kill query " + queryId + " by exception " + e, e);
@@ -182,7 +188,7 @@ public class QueryInProgress {
InetSocketAddress addr = NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getQueryMasterPort());
LOG.info("Try to connect to QueryMaster:" + addr);
- queryMasterRpc = RpcClientManager.getInstance().newClient(addr, QueryMasterProtocol.class, true);
+ queryMasterRpc = RpcClientManager.getInstance().newClient(addr, QueryMasterProtocol.class, true, rpcParams);
queryMasterRpcClient = queryMasterRpc.getStub();
}
@@ -216,7 +222,7 @@ public class QueryInProgress {
CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<PrimitiveProtos.NullProto>();
queryMasterRpcClient.executeQuery(callFuture.getController(), builder.build(), callFuture);
- callFuture.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS);
querySubmitted = true;
getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_MASTER_LAUNCHED);
http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index 2f37cee..922a5eb 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -54,7 +54,6 @@ import org.apache.tajo.metrics.ClusterResourceMetricSet;
import org.apache.tajo.metrics.Master;
import org.apache.tajo.plan.function.python.PythonScriptEngine;
import org.apache.tajo.rpc.RpcClientManager;
-import org.apache.tajo.rpc.RpcConstants;
import org.apache.tajo.rule.EvaluationContext;
import org.apache.tajo.rule.EvaluationFailedException;
import org.apache.tajo.rule.SelfDiagnosisRuleEngine;
@@ -172,14 +171,8 @@ public class TajoMaster extends CompositeService {
context = new MasterContext(systemConf);
clock = new SystemClock();
-
RackResolver.init(systemConf);
- RpcClientManager rpcManager = RpcClientManager.getInstance();
- rpcManager.setRetries(systemConf.getInt(RpcConstants.RPC_CLIENT_RETRY_MAX, RpcConstants.DEFAULT_RPC_RETRIES));
- rpcManager.setTimeoutSeconds(
- systemConf.getInt(RpcConstants.RPC_CLIENT_TIMEOUT_SECS, RpcConstants.DEFAULT_RPC_TIMEOUT_SECONDS));
-
initResourceManager();
this.dispatcher = new AsyncDispatcher();
http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
index b65b5a9..25695a5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
@@ -48,6 +48,7 @@ import org.apache.tajo.storage.DataLocation;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.RpcParameterFactory;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.worker.FetchImpl;
@@ -69,6 +70,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
private final TaskSchedulerContext context;
private Stage stage;
private TajoConf tajoConf;
+ private Properties rpcParams;
private Thread schedulingThread;
private volatile boolean isStopped;
@@ -83,7 +85,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
private int schedulerDelay;
private int maximumRequestContainer;
- //candidate workers for locality of high priority
+ // candidate workers for locality of high priority
private Set<Integer> candidateWorkers = Sets.newHashSet();
public DefaultTaskScheduler(TaskSchedulerContext context, Stage stage) {
@@ -95,6 +97,8 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
@Override
public void init(Configuration conf) {
tajoConf = TUtil.checkTypeAndGet(conf, TajoConf.class);
+ rpcParams = RpcParameterFactory.get(new TajoConf());
+
scheduledRequests = new ScheduledRequests();
minTaskMemory = tajoConf.getIntVar(TajoConf.ConfVars.TASK_RESOURCE_MINIMUM_MEMORY);
schedulerDelay= tajoConf.getIntVar(TajoConf.ConfVars.QUERYMASTER_TASK_SCHEDULER_DELAY);
@@ -294,7 +298,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
ServiceTracker serviceTracker =
context.getMasterContext().getQueryMasterContext().getWorkerContext().getServiceTracker();
NettyClientBase tmClient = RpcClientManager.getInstance().
- getClient(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true);
+ getClient(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true, rpcParams);
QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
CallFuture<NodeResourceResponse> callBack = new CallFuture<NodeResourceResponse>();
@@ -310,7 +314,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
.setQueue(context.getMasterContext().getQueryContext().get("queue", "default")); //TODO set queue
masterClientService.reserveNodeResources(callBack.getController(), request.build(), callBack);
- NodeResourceResponse response = callBack.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ NodeResourceResponse response = callBack.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS);
for (AllocationResourceProto resource : response.getResourceList()) {
taskRequestEvents.add(new TaskRequestEvent(resource.getWorkerId(), resource, context.getBlockId()));
@@ -886,12 +890,14 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
CallFuture<BatchAllocationResponse> callFuture = new CallFuture<BatchAllocationResponse>();
totalAttempts++;
try {
- tajoWorkerRpc = RpcClientManager.getInstance().getClient(addr, TajoWorkerProtocol.class, true);
+ tajoWorkerRpc = RpcClientManager.getInstance().getClient(addr, TajoWorkerProtocol.class, true,
+ rpcParams);
+
TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub();
tajoWorkerRpcClient.allocateTasks(callFuture.getController(), requestProto.build(), callFuture);
BatchAllocationResponse responseProto =
- callFuture.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS);
if (responseProto.getCancellationTaskCount() > 0) {
for (TaskAllocationProto proto : responseProto.getCancellationTaskList()) {
@@ -1004,12 +1010,13 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
AsyncRpcClient tajoWorkerRpc;
try {
- tajoWorkerRpc = RpcClientManager.getInstance().getClient(addr, TajoWorkerProtocol.class, true);
+ tajoWorkerRpc = RpcClientManager.getInstance().getClient(addr, TajoWorkerProtocol.class, true,
+ rpcParams);
TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub();
tajoWorkerRpcClient.allocateTasks(callFuture.getController(), requestProto.build(), callFuture);
BatchAllocationResponse
- responseProto = callFuture.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ responseProto = callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS);
if(responseProto.getCancellationTaskCount() > 0) {
for (TaskAllocationProto proto : responseProto.getCancellationTaskList()) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
index cce9482..1b90080 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
@@ -45,6 +45,7 @@ import org.apache.tajo.master.event.QueryStopEvent;
import org.apache.tajo.rpc.*;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.util.RpcParameterFactory;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.util.history.HistoryWriter.WriterFuture;
import org.apache.tajo.util.history.HistoryWriter.WriterHolder;
@@ -52,10 +53,7 @@ import org.apache.tajo.util.history.QueryHistory;
import org.apache.tajo.worker.TajoWorker;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -91,6 +89,8 @@ public class QueryMaster extends CompositeService implements EventHandler {
private RpcClientManager manager;
+ private Properties rpcClientParams;
+
private ExecutorService eventExecutor;
private ExecutorService singleEventExecutor;
@@ -105,6 +105,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
this.systemConf = TUtil.checkTypeAndGet(conf, TajoConf.class);
this.manager = RpcClientManager.getInstance();
+ this.rpcClientParams = RpcParameterFactory.get(this.systemConf);
querySessionTimeout = systemConf.getIntVar(TajoConf.ConfVars.QUERY_SESSION_TIMEOUT);
queryMasterContext = new QueryMasterContext(systemConf);
@@ -171,7 +172,8 @@ public class QueryMaster extends CompositeService implements EventHandler {
// update master address in worker context.
ServiceTracker serviceTracker = workerContext.getServiceTracker();
- rpc = manager.getClient(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true);
+ rpc = manager.getClient(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true,
+ rpcClientParams);
QueryCoordinatorProtocolService masterService = rpc.getStub();
CallFuture<WorkerConnectionsResponse> callBack = new CallFuture<WorkerConnectionsResponse>();
@@ -179,7 +181,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
PrimitiveProtos.NullProto.getDefaultInstance(), callBack);
WorkerConnectionsResponse connectionsProto =
- callBack.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ callBack.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS);
return connectionsProto.getWorkerList();
} catch (Exception e) {
LOG.error(e.getMessage(), e);
@@ -294,11 +296,11 @@ public class QueryMaster extends CompositeService implements EventHandler {
NettyClientBase tmClient;
try {
tmClient = manager.getClient(workerContext.getServiceTracker().getUmbilicalAddress(),
- QueryCoordinatorProtocol.class, true);
+ QueryCoordinatorProtocol.class, true, rpcClientParams);
QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
masterClientService.heartbeat(future.getController(), queryHeartbeat, future);
- future.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ future.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS);
} catch (Exception e) {
//this function will be closed in new thread.
//When tajo do stop cluster, tajo master maybe throw closed connection exception
@@ -404,7 +406,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
ServiceTracker serviceTracker = queryMasterContext.getWorkerContext().getServiceTracker();
tmClient = manager.getClient(serviceTracker.getUmbilicalAddress(),
- QueryCoordinatorProtocol.class, true);
+ QueryCoordinatorProtocol.class, true, rpcClientParams);
QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
TajoHeartbeatRequest queryHeartbeat = buildTajoHeartBeat(eachTask);
http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
index 46e48e6..bcfb938 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
@@ -55,6 +55,7 @@ import org.apache.tajo.session.Session;
import org.apache.tajo.storage.FormatProperty;
import org.apache.tajo.storage.Tablespace;
import org.apache.tajo.storage.TablespaceManager;
+import org.apache.tajo.util.RpcParameterFactory;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.worker.event.NodeResourceDeallocateEvent;
import org.apache.tajo.worker.event.NodeResourceEvent;
@@ -95,6 +96,8 @@ public class QueryMasterTask extends CompositeService {
private TajoConf systemConf;
+ private Properties rpcParams;
+
private AtomicLong lastClientHeartbeat = new AtomicLong(-1);
private volatile boolean isStopped;
@@ -131,8 +134,8 @@ public class QueryMasterTask extends CompositeService {
@Override
public void serviceInit(Configuration conf) throws Exception {
-
systemConf = TUtil.checkTypeAndGet(conf, TajoConf.class);
+ rpcParams = RpcParameterFactory.get(systemConf);
queryTaskContext = new QueryMasterTaskContext();
@@ -255,7 +258,8 @@ public class QueryMasterTask extends CompositeService {
InetSocketAddress workerAddress = getQuery().getStage(ebId).getAssignedWorkerMap().get(workerId);
try {
- tajoWorkerRpc = RpcClientManager.getInstance().getClient(workerAddress, TajoWorkerProtocol.class, true);
+ tajoWorkerRpc = RpcClientManager.getInstance().getClient(workerAddress, TajoWorkerProtocol.class, true,
+ rpcParams);
TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub();
CallFuture<PrimitiveProtos.BoolProto> callFuture = new CallFuture<PrimitiveProtos.BoolProto>();
tajoWorkerRpcClient.killTaskAttempt(null, taskAttemptId.getProto(), callFuture);
@@ -472,7 +476,8 @@ public class QueryMasterTask extends CompositeService {
@Override
public void run() {
try {
- AsyncRpcClient rpc = RpcClientManager.getInstance().getClient(worker, TajoWorkerProtocol.class, true);
+ AsyncRpcClient rpc = RpcClientManager.getInstance().getClient(worker, TajoWorkerProtocol.class, true,
+ rpcParams);
TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub();
tajoWorkerProtocolService.stopQuery(null, queryId.getProto(), NullCallback.get());
} catch (Throwable e) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index 282edcc..98ad292 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -62,6 +62,7 @@ import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.util.RpcParameterFactory;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.util.history.StageHistory;
import org.apache.tajo.util.history.TaskHistory;
@@ -88,6 +89,8 @@ public class Stage implements EventHandler<StageEvent> {
private static final Log LOG = LogFactory.getLog(Stage.class);
+ private final Properties rpcParams;
+
private MasterPlan masterPlan;
private ExecutionBlock block;
private int priority;
@@ -300,6 +303,8 @@ public class Stage implements EventHandler<StageEvent> {
this.block = block;
this.eventHandler = context.getEventHandler();
+ this.rpcParams = RpcParameterFactory.get(context.getConf());
+
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
this.readLock = readWriteLock.readLock();
this.writeLock = readWriteLock.writeLock();
@@ -720,7 +725,7 @@ public class Stage implements EventHandler<StageEvent> {
public void run() {
try {
AsyncRpcClient tajoWorkerRpc =
- RpcClientManager.getInstance().getClient(worker, TajoWorkerProtocol.class, true);
+ RpcClientManager.getInstance().getClient(worker, TajoWorkerProtocol.class, true, rpcParams);
TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub();
tajoWorkerRpcClient.stopExecutionBlock(null,
requestProto, NullCallback.get(PrimitiveProtos.BoolProto.class));
http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-core/src/main/java/org/apache/tajo/util/RpcParameterFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/RpcParameterFactory.java b/tajo-core/src/main/java/org/apache/tajo/util/RpcParameterFactory.java
new file mode 100644
index 0000000..6da4dac
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/RpcParameterFactory.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tajo.util;
+
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.rpc.RpcConstants;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Helper class to get RPC Client Connection Parameters
+ */
+public class RpcParameterFactory {
+
+ static final Map<String, ConfVars> PROPERTIES_MAP = new HashMap<>();
+
+ static {
+ PROPERTIES_MAP.put(RpcConstants.CLIENT_RETRY_NUM, ConfVars.RPC_CLIENT_RETRY_NUM);
+ PROPERTIES_MAP.put(RpcConstants.CLIENT_CONNECTION_TIMEOUT, ConfVars.RPC_CLIENT_CONNECTION_TIMEOUT);
+ PROPERTIES_MAP.put(RpcConstants.CLIENT_SOCKET_TIMEOUT, ConfVars.RPC_CLIENT_SOCKET_TIMEOUT);
+ PROPERTIES_MAP.put(RpcConstants.CLIENT_HANG_DETECTION, ConfVars.RPC_CLIENT_HANG_DETECTION_ENABLED);
+ }
+
+ public static Properties get(TajoConf conf) {
+ final Properties properties = new Properties();
+
+ for (Map.Entry<String, ConfVars> e : PROPERTIES_MAP.entrySet()) {
+ properties.put(e.getKey(), conf.getVar(e.getValue()));
+ }
+
+ return properties;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index 94bf785..a3cc8fc 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -253,7 +253,7 @@ public class ExecutionBlockContext {
//If QueryMaster does not responding, current execution block should be stop
CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<PrimitiveProtos.NullProto>();
getStub().fatalError(callFuture.getController(), builder.build(), callFuture);
- callFuture.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS);
} catch (Exception e) {
getWorkerContext().getTaskManager().getDispatcher().getEventHandler()
.handle(new ExecutionBlockErrorEvent(taskAttemptId.getTaskId().getExecutionBlockId(), e));
@@ -300,7 +300,7 @@ public class ExecutionBlockContext {
CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<PrimitiveProtos.NullProto>();
stub.doneExecutionBlock(callFuture.getController(), reporterBuilder.build(), callFuture);
- callFuture.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS);
return;
}
@@ -355,7 +355,7 @@ public class ExecutionBlockContext {
try {
CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<PrimitiveProtos.NullProto>();
stub.doneExecutionBlock(callFuture.getController(), reporterBuilder.build(), callFuture);
- callFuture.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS);
} catch (Throwable e) {
// can't send report to query master
LOG.fatal(e.getMessage(), e);
http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java b/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java
index bc4f9a1..a3b71e1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java
@@ -35,12 +35,14 @@ import org.apache.tajo.rpc.RpcClientManager;
import org.apache.tajo.rpc.RpcConstants;
import org.apache.tajo.service.ServiceTracker;
import org.apache.tajo.service.ServiceTrackerFactory;
+import org.apache.tajo.util.RpcParameterFactory;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.worker.event.NodeStatusEvent;
import java.net.ConnectException;
import java.util.Collection;
import java.util.List;
+import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -55,7 +57,7 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N
private final static Log LOG = LogFactory.getLog(NodeStatusUpdater.class);
- private TajoConf tajoConf;
+ private TajoConf systemConf;
private StatusUpdaterThread updaterThread;
private volatile boolean isStopped;
private int heartBeatInterval;
@@ -63,6 +65,7 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N
private BlockingQueue<NodeStatusEvent> heartBeatRequestQueue;
private final TajoWorker.WorkerContext workerContext;
private AsyncRpcClient rmClient;
+ private Properties rpcParams;
private ServiceTracker serviceTracker;
private TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService.Interface resourceTracker;
private int queueingThreshold;
@@ -75,11 +78,12 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N
@Override
public void serviceInit(Configuration conf) throws Exception {
- this.tajoConf = TUtil.checkTypeAndGet(conf, TajoConf.class);
+ this.systemConf = TUtil.checkTypeAndGet(conf, TajoConf.class);
+ this.rpcParams = RpcParameterFactory.get(this.systemConf);
this.heartBeatRequestQueue = Queues.newLinkedBlockingQueue();
- this.serviceTracker = ServiceTrackerFactory.get(tajoConf);
+ this.serviceTracker = ServiceTrackerFactory.get(systemConf);
this.workerContext.getNodeResourceManager().getDispatcher().register(NodeStatusEvent.EventType.class, this);
- this.heartBeatInterval = tajoConf.getIntVar(TajoConf.ConfVars.WORKER_HEARTBEAT_IDLE_INTERVAL);
+ this.heartBeatInterval = systemConf.getIntVar(TajoConf.ConfVars.WORKER_HEARTBEAT_IDLE_INTERVAL);
this.updaterThread = new StatusUpdaterThread();
this.updaterThread.setName("NodeStatusUpdater");
super.serviceInit(conf);
@@ -89,10 +93,10 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N
public void serviceStart() throws Exception {
DefaultResourceCalculator calculator = new DefaultResourceCalculator();
int maxContainer = calculator.computeAvailableContainers(workerContext.getNodeResourceManager().getTotalResource(),
- NodeResources.createResource(tajoConf.getIntVar(TajoConf.ConfVars.TASK_RESOURCE_MINIMUM_MEMORY), 1));
+ NodeResources.createResource(systemConf.getIntVar(TajoConf.ConfVars.TASK_RESOURCE_MINIMUM_MEMORY), 1));
// if resource changed over than 30%, send reports
- float queueingRate = tajoConf.getFloatVar(TajoConf.ConfVars.WORKER_HEARTBEAT_QUEUE_THRESHOLD_RATE);
+ float queueingRate = systemConf.getFloatVar(TajoConf.ConfVars.WORKER_HEARTBEAT_QUEUE_THRESHOLD_RATE);
this.queueingThreshold = Math.max((int) Math.floor(maxContainer * queueingRate), 1);
LOG.info("Queueing threshold:" + queueingThreshold);
@@ -149,8 +153,7 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N
RpcClientManager rpcManager = RpcClientManager.getInstance();
rmClient = rpcManager.newClient(serviceTracker.getResourceTrackerAddress(),
- TajoResourceTrackerProtocol.class, true, rpcManager.getRetries(),
- rpcManager.getTimeoutSeconds(), TimeUnit.SECONDS, false);
+ TajoResourceTrackerProtocol.class, true, rpcParams);
return rmClient.getStub();
}
@@ -165,7 +168,7 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N
CallFuture<NodeHeartbeatResponse> callBack = new CallFuture<NodeHeartbeatResponse>();
resourceTracker.nodeHeartbeat(callBack.getController(), requestProto, callBack);
- response = callBack.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ response = callBack.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.warn(e.getMessage());
} catch (TimeoutException te) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index fbb8d54..607e7ff 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -152,11 +152,6 @@ public class TajoWorker extends CompositeService {
this.systemConf = TUtil.checkTypeAndGet(conf, TajoConf.class);
RackResolver.init(systemConf);
- RpcClientManager rpcManager = RpcClientManager.getInstance();
- rpcManager.setRetries(systemConf.getInt(RpcConstants.RPC_CLIENT_RETRY_MAX, RpcConstants.DEFAULT_RPC_RETRIES));
- rpcManager.setTimeoutSeconds(
- systemConf.getInt(RpcConstants.RPC_CLIENT_TIMEOUT_SECS, RpcConstants.DEFAULT_RPC_TIMEOUT_SECONDS));
-
serviceTracker = ServiceTrackerFactory.get(systemConf);
this.workerContext = new TajoWorkerContext();
http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
index 42db852..a0b3f97 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
@@ -35,6 +35,7 @@ import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.rpc.RpcClientManager;
import org.apache.tajo.rpc.RpcConstants;
import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.RpcParameterFactory;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.worker.event.*;
@@ -42,11 +43,10 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.concurrent.TimeUnit;
-import static org.apache.tajo.ResourceProtos.ExecutionBlockListProto;
-import static org.apache.tajo.ResourceProtos.ExecutionBlockContextRequest;
-import static org.apache.tajo.ResourceProtos.ExecutionBlockContextResponse;
+import static org.apache.tajo.ResourceProtos.*;
/**
* A TaskManager is responsible for managing executionBlock resource and tasks.
@@ -58,6 +58,7 @@ public class TaskManager extends AbstractService implements EventHandler<TaskMan
private final Map<ExecutionBlockId, ExecutionBlockContext> executionBlockContextMap;
private final Dispatcher dispatcher;
private TaskExecutor executor;
+ private final Properties rpcParams;
public TaskManager(Dispatcher dispatcher, TajoWorker.WorkerContext workerContext){
this(dispatcher, workerContext, null);
@@ -70,6 +71,7 @@ public class TaskManager extends AbstractService implements EventHandler<TaskMan
this.workerContext = workerContext;
this.executionBlockContextMap = Maps.newHashMap();
this.executor = executor;
+ this.rpcParams = RpcParameterFactory.get(this.workerContext.getConf());
}
@Override
@@ -118,13 +120,13 @@ public class TaskManager extends AbstractService implements EventHandler<TaskMan
request.setExecutionBlockId(executionBlockId.getProto())
.setWorker(getWorkerContext().getConnectionInfo().getProto());
- client = RpcClientManager.getInstance().newClient(address, QueryMasterProtocol.class, true);
+ client = RpcClientManager.getInstance().newClient(address, QueryMasterProtocol.class, true, rpcParams);
QueryMasterProtocol.QueryMasterProtocolService.Interface stub = client.getStub();
CallFuture<ExecutionBlockContextResponse> callback = new CallFuture<ExecutionBlockContextResponse>();
stub.getExecutionBlockContext(callback.getController(), request.build(), callback);
ExecutionBlockContextResponse contextProto =
- callback.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ callback.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS);
ExecutionBlockContext context = new ExecutionBlockContext(getWorkerContext(), contextProto, client);
context.init();
http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
index f94bd78..ee428cd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
@@ -26,6 +26,7 @@ import org.apache.tajo.rule.*;
import org.apache.tajo.rule.EvaluationResult.EvaluationResultCode;
import org.apache.tajo.service.ServiceTracker;
import org.apache.tajo.service.ServiceTrackerFactory;
+import org.apache.tajo.util.RpcParameterFactory;
import org.apache.tajo.worker.TajoWorker;
/**
@@ -40,7 +41,8 @@ public class ConnectivityCheckerRuleForTajoWorker implements SelfDiagnosisRule {
RpcClientManager manager = RpcClientManager.getInstance();
ServiceTracker serviceTracker = ServiceTrackerFactory.get(tajoConf);
- NettyClientBase masterClient = manager.getClient(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true);
+ NettyClientBase masterClient = manager.getClient(serviceTracker.getUmbilicalAddress(),
+ QueryCoordinatorProtocol.class, true, RpcParameterFactory.get(tajoConf));
masterClient.getStub();
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-docs/src/main/sphinx/jdbc_driver.rst
----------------------------------------------------------------------
diff --git a/tajo-docs/src/main/sphinx/jdbc_driver.rst b/tajo-docs/src/main/sphinx/jdbc_driver.rst
index 6c7371b..176477a 100644
--- a/tajo-docs/src/main/sphinx/jdbc_driver.rst
+++ b/tajo-docs/src/main/sphinx/jdbc_driver.rst
@@ -42,6 +42,49 @@ In order to use the JDBC driver, you should add ``tajo-jdbc-x.y.z.jar`` in your
CLASSPATH=path/to/tajo-jdbc-x.y.z.jar:$CLASSPATH
+Connecting to the Tajo cluster instance
+=======================================
+A Tajo cluster is represented by a URL. Tajo JDBC driver can take the following URL forms:
+
+ * ``jdbc:tajo://host/``
+ * ``jdbc:tajo://host/database``
+ * ``jdbc:tajo://host:port/``
+ * ``jdbc:tajo://host:port/database``
+
+Each part of URL has the following meanings:
+
+ * ``host`` - The hostname of the TajoMaster. You can put hostname or ip address here.
+ * ``port`` - The port number that server is listening. Default port number is 26002.
+ * ``database`` - The database name. The default database name is ``default``.
+
+ To connect, you need to get ``Connection`` instance from Java JDBC Driver Manager as follows:
+
+.. code-block:: java
+
+ Connection db = DriverManager.getConnection(url);
+
+
+Connection Parameters
+=====================
+Connection parameters lets the JDBC Copnnection to enable or disable additional features. You should use ``java.util.Properties`` to pass your connection parameters into ``Connection``. The following example means that the transmission of ResultSet uses compression and its connection timeout is 15 seconds.
+
+.. code-block:: java
+
+ String url = "jdbc:tajo://localhost/test";
+ Properties props = new Properties();
+ props.setProperty("useCompression","true"); // use compression for ResultSet
+ props.setProperty("connectTimeout","15000"); // 15 seconds
+ Connection conn = DriverManager.getConnection(url, props);
+
+The connection parameters that Tajo currently supports are as follows:
+
+ * ``useCompression = bool`` - Enable compressed transfer for ResultSet.
+ * ``defaultRowFetchSize = int`` - Determine the number of rows fetched in ResultSet by one fetch with trip to the Server.
+ * ``connectTimeout = int (seconds)`` - The timeout value used for socket connect operations. If connecting to the server takes longer than this value, the connection is broken. The timeout is specified in seconds and a value of zero means that it is disabled.
+ * ``socketTimeout = int (seconds)`` - The timeout value used for socket read operations. If reading from the server takes longer than this value, the connection is closed. This can be used as both a brute force global query timeout and a method of detecting network problems. The timeout is specified in seconds and a value of zero means that it is disabled.
+ * ``retry = int`` - Number of retry operation. Tajo JDBC driver is resilient against some network or connection problems. It determines how many times the connection will retry.
+
+
An Example JDBC Client
=======================
http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-docs/src/main/sphinx/table_management/tablespaces.rst
----------------------------------------------------------------------
diff --git a/tajo-docs/src/main/sphinx/table_management/tablespaces.rst b/tajo-docs/src/main/sphinx/table_management/tablespaces.rst
index 964491c..79ea65f 100644
--- a/tajo-docs/src/main/sphinx/table_management/tablespaces.rst
+++ b/tajo-docs/src/main/sphinx/table_management/tablespaces.rst
@@ -42,4 +42,4 @@ The following is an example for two tablespaces for hbase and hdfs:
.. note::
- Also, each tablespace can use different storage type. Please see :doc:`/storage_plugin` if you want to know more information about it.
\ No newline at end of file
+ Also, each tablespace can use different storage type. Please see :doc:`/storage_plugins` if you want to know more information about it.
\ No newline at end of file