You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/02/29 00:48:28 UTC
incubator-apex-core git commit: APEXCORE-358 - Make RPC timeout configurable. Introduced "com.datatorrent.stram.rpc.*" system properties that may be used to set RPC timeouts.
Repository: incubator-apex-core
Updated Branches:
refs/heads/release-3.3 d272bfaa9 -> 21ad4cc9b
APEXCORE-358 - Make RPC timeout configurable. Introduced "com.datatorrent.stram.rpc.*" system properties that may be used to set RPC timeouts.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/21ad4cc9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/21ad4cc9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/21ad4cc9
Branch: refs/heads/release-3.3
Commit: 21ad4cc9b8e71789fc779d86f90cb0513fdf07bb
Parents: d272bfa
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Sun Feb 28 13:43:00 2016 -0800
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Sun Feb 28 15:48:15 2016 -0800
----------------------------------------------------------------------
.../datatorrent/stram/RecoverableRpcProxy.java | 44 ++++++++++++++------
.../stram/StreamingAppMasterService.java | 2 +-
.../datatorrent/stram/StramRecoveryTest.java | 44 ++++++++++++++++++++
3 files changed, 77 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/21ad4cc9/engine/src/main/java/com/datatorrent/stram/RecoverableRpcProxy.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/RecoverableRpcProxy.java b/engine/src/main/java/com/datatorrent/stram/RecoverableRpcProxy.java
index fe4a86d..97fce63 100644
--- a/engine/src/main/java/com/datatorrent/stram/RecoverableRpcProxy.java
+++ b/engine/src/main/java/com/datatorrent/stram/RecoverableRpcProxy.java
@@ -29,6 +29,7 @@ import java.nio.charset.Charset;
import static java.lang.Thread.sleep;
import org.apache.http.NameValuePair;
+import org.apache.http.client.utils.URIBuilder;
import org.apache.http.client.utils.URLEncodedUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,18 +52,28 @@ import java.util.List;
*/
public class RecoverableRpcProxy implements java.lang.reflect.InvocationHandler, Closeable
{
- private final static Logger LOG = LoggerFactory.getLogger(RecoverableRpcProxy.class);
+ private static final Logger LOG = LoggerFactory.getLogger(RecoverableRpcProxy.class);
+
+ public static final String RPC_TIMEOUT = "com.datatorrent.stram.rpc.timeout";
+ public static final String RETRY_TIMEOUT = "com.datatorrent.stram.rpc.retry.timeout";
+ public static final String RETRY_DELAY = "com.datatorrent.stram.rpc.delay.timeout";
+
public static final String QP_retryTimeoutMillis = "retryTimeoutMillis";
public static final String QP_retryDelayMillis = "retryDelayMillis";
public static final String QP_rpcTimeout = "rpcTimeout";
+
+ private static final int RETRY_TIMEOUT_DEFAULT = 30000;
+ private static final int RETRY_DELAY_DEFAULT = 10000;
+ private static final int RPC_TIMEOUT_DEFAULT = 5000;
+
private final Configuration conf;
private final String appPath;
private StreamingContainerUmbilicalProtocol umbilical;
private String lastConnectURI;
private long lastCompletedCallTms;
- private long retryTimeoutMillis = 30000;
- private long retryDelayMillis = 10000;
- private int rpcTimeout = 5000;
+ private long retryTimeoutMillis = Long.getLong(RETRY_TIMEOUT, RETRY_TIMEOUT_DEFAULT);
+ private long retryDelayMillis = Long.getLong(RETRY_DELAY, RETRY_DELAY_DEFAULT);
+ private int rpcTimeout = Integer.getInteger(RPC_TIMEOUT, RPC_TIMEOUT_DEFAULT);
public RecoverableRpcProxy(String appPath, Configuration conf) throws IOException
{
@@ -169,15 +180,24 @@ public class RecoverableRpcProxy implements java.lang.reflect.InvocationHandler,
}
}
- public static URI toConnectURI(InetSocketAddress address, int rpcTimeoutMillis, int retryDelayMillis, int retryTimeoutMillis) throws Exception
+ public static URI toConnectURI(final InetSocketAddress address) throws Exception
+ {
+ int rpcTimeoutMillis = Integer.getInteger(RPC_TIMEOUT, RPC_TIMEOUT_DEFAULT);
+ long retryDelayMillis = Long.getLong(RETRY_DELAY, RETRY_DELAY_DEFAULT);
+ long retryTimeoutMillis = Long.getLong(RETRY_TIMEOUT, RETRY_TIMEOUT_DEFAULT);
+ return toConnectURI(address, rpcTimeoutMillis, retryDelayMillis, retryTimeoutMillis);
+ }
+
+ public static URI toConnectURI(InetSocketAddress address, int rpcTimeoutMillis, long retryDelayMillis, long retryTimeoutMillis) throws Exception
{
- StringBuilder query = new StringBuilder(256);
- query.append(RecoverableRpcProxy.QP_rpcTimeout + '=').append(rpcTimeoutMillis);
- query.append('&');
- query.append(RecoverableRpcProxy.QP_retryDelayMillis + '=').append(retryDelayMillis);
- query.append('&');
- query.append(RecoverableRpcProxy.QP_retryTimeoutMillis + '=').append(retryTimeoutMillis);
- return new URI("stram", null, address.getHostName(), address.getPort(), null, query.toString(), null);
+ return new URIBuilder()
+ .setScheme("stram")
+ .setHost(address.getHostName())
+ .setPort(address.getPort())
+ .setParameter(RecoverableRpcProxy.QP_rpcTimeout, Integer.toString(rpcTimeoutMillis))
+ .setParameter(RecoverableRpcProxy.QP_retryDelayMillis, Long.toString(retryDelayMillis))
+ .setParameter(RecoverableRpcProxy.QP_retryTimeoutMillis, Long.toString(retryTimeoutMillis))
+ .build();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/21ad4cc9/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index db8c255..8565275 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -555,7 +555,7 @@ public class StreamingAppMasterService extends CompositeService
// write the connect address for containers to DFS
InetSocketAddress connectAddress = NetUtils.getConnectAddress(this.heartbeatListener.getAddress());
- URI connectUri = new URI("stram", null, connectAddress.getHostName(), connectAddress.getPort(), null, null, null);
+ URI connectUri = RecoverableRpcProxy.toConnectURI(connectAddress);
FSRecoveryHandler recoveryHandler = new FSRecoveryHandler(dag.assertAppPath(), getConfig());
recoveryHandler.writeConnectUri(connectUri.toString());
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/21ad4cc9/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
index 6dbdcf0..75b4684 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
@@ -518,7 +518,51 @@ public class StramRecoveryTest
Assert.assertTrue("timedout", timedout.get());
rp.close();
+
+ String rpcTimeout = System.getProperty(RecoverableRpcProxy.RPC_TIMEOUT);
+ String rpcRetryDelay = System.getProperty(RecoverableRpcProxy.RETRY_DELAY);
+ String rpcRetryTimeout = System.getProperty(RecoverableRpcProxy.RETRY_TIMEOUT);
+
+ System.setProperty(RecoverableRpcProxy.RPC_TIMEOUT, Integer.toString(500));
+ System.setProperty(RecoverableRpcProxy.RETRY_DELAY, Long.toString(100));
+ System.setProperty(RecoverableRpcProxy.RETRY_TIMEOUT, Long.toString(500));
+
+ timedout.set(false);
+ uri = RecoverableRpcProxy.toConnectURI(address);
+ recoveryHandler.writeConnectUri(uri.toString());
+
+ rp = new RecoverableRpcProxy(appPath, conf);
+ protocolProxy = rp.getProxy();
+ protocolProxy.log("containerId", "msg");
+ try {
+ protocolProxy.log("containerId", "timeout");
+ Assert.fail("expected socket timeout");
+ } catch (java.net.SocketTimeoutException e) {
+ // expected
+ }
+ Assert.assertTrue("timedout", timedout.get());
+ rp.close();
+
+ timedout.set(false);
+ System.setProperty(RecoverableRpcProxy.RETRY_TIMEOUT, Long.toString(1500));
+
+ uri = RecoverableRpcProxy.toConnectURI(address);
+ recoveryHandler.writeConnectUri(uri.toString());
+
+ protocolProxy.log("containerId", "timeout");
+ Assert.assertTrue("timedout", timedout.get());
+
+
+ restoreSystemProperty(RecoverableRpcProxy.RPC_TIMEOUT, rpcTimeout);
+ restoreSystemProperty(RecoverableRpcProxy.RETRY_DELAY, rpcRetryDelay);
+ restoreSystemProperty(RecoverableRpcProxy.RETRY_TIMEOUT, rpcRetryTimeout);
+
server.stop();
}
+ private static String restoreSystemProperty(final String key, final String value)
+ {
+ return (value == null)? System.clearProperty(key) : System.setProperty(key, value);
+ }
+
}