You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/02/29 00:48:28 UTC

incubator-apex-core git commit: APEXCORE-358 - Make RPC timeout configurable. Introduced "com.datatorrent.stram.rpc.*" system properties that may be used to set RPC timeouts.

Repository: incubator-apex-core
Updated Branches:
  refs/heads/release-3.3 d272bfaa9 -> 21ad4cc9b


APEXCORE-358 - Make RPC timeout configurable. Introduced "com.datatorrent.stram.rpc.*" system properties that may be used to set RPC timeouts.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/21ad4cc9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/21ad4cc9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/21ad4cc9

Branch: refs/heads/release-3.3
Commit: 21ad4cc9b8e71789fc779d86f90cb0513fdf07bb
Parents: d272bfa
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Sun Feb 28 13:43:00 2016 -0800
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Sun Feb 28 15:48:15 2016 -0800

----------------------------------------------------------------------
 .../datatorrent/stram/RecoverableRpcProxy.java  | 44 ++++++++++++++------
 .../stram/StreamingAppMasterService.java        |  2 +-
 .../datatorrent/stram/StramRecoveryTest.java    | 44 ++++++++++++++++++++
 3 files changed, 77 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/21ad4cc9/engine/src/main/java/com/datatorrent/stram/RecoverableRpcProxy.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/RecoverableRpcProxy.java b/engine/src/main/java/com/datatorrent/stram/RecoverableRpcProxy.java
index fe4a86d..97fce63 100644
--- a/engine/src/main/java/com/datatorrent/stram/RecoverableRpcProxy.java
+++ b/engine/src/main/java/com/datatorrent/stram/RecoverableRpcProxy.java
@@ -29,6 +29,7 @@ import java.nio.charset.Charset;
 import static java.lang.Thread.sleep;
 
 import org.apache.http.NameValuePair;
+import org.apache.http.client.utils.URIBuilder;
 import org.apache.http.client.utils.URLEncodedUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,18 +52,28 @@ import java.util.List;
  */
 public class RecoverableRpcProxy implements java.lang.reflect.InvocationHandler, Closeable
 {
-  private final static Logger LOG = LoggerFactory.getLogger(RecoverableRpcProxy.class);
+  private static final Logger LOG = LoggerFactory.getLogger(RecoverableRpcProxy.class);
+
+  public static final String RPC_TIMEOUT = "com.datatorrent.stram.rpc.timeout";
+  public static final String RETRY_TIMEOUT = "com.datatorrent.stram.rpc.retry.timeout";
+  public static final String RETRY_DELAY = "com.datatorrent.stram.rpc.delay.timeout";
+
   public static final String QP_retryTimeoutMillis = "retryTimeoutMillis";
   public static final String QP_retryDelayMillis = "retryDelayMillis";
   public static final String QP_rpcTimeout = "rpcTimeout";
+
+  private static final int RETRY_TIMEOUT_DEFAULT = 30000;
+  private static final int RETRY_DELAY_DEFAULT = 10000;
+  private static final int RPC_TIMEOUT_DEFAULT = 5000;
+
   private final Configuration conf;
   private final String appPath;
   private StreamingContainerUmbilicalProtocol umbilical;
   private String lastConnectURI;
   private long lastCompletedCallTms;
-  private long retryTimeoutMillis = 30000;
-  private long retryDelayMillis = 10000;
-  private int rpcTimeout = 5000;
+  private long retryTimeoutMillis = Long.getLong(RETRY_TIMEOUT, RETRY_TIMEOUT_DEFAULT);
+  private long retryDelayMillis = Long.getLong(RETRY_DELAY, RETRY_DELAY_DEFAULT);
+  private int rpcTimeout = Integer.getInteger(RPC_TIMEOUT, RPC_TIMEOUT_DEFAULT);
 
   public RecoverableRpcProxy(String appPath, Configuration conf) throws IOException
   {
@@ -169,15 +180,24 @@ public class RecoverableRpcProxy implements java.lang.reflect.InvocationHandler,
     }
   }
 
-  public static URI toConnectURI(InetSocketAddress address, int rpcTimeoutMillis, int retryDelayMillis, int retryTimeoutMillis) throws Exception
+  public static URI toConnectURI(final InetSocketAddress address) throws Exception
+  {
+    int rpcTimeoutMillis = Integer.getInteger(RPC_TIMEOUT, RPC_TIMEOUT_DEFAULT);
+    long retryDelayMillis = Long.getLong(RETRY_DELAY, RETRY_DELAY_DEFAULT);
+    long retryTimeoutMillis = Long.getLong(RETRY_TIMEOUT, RETRY_TIMEOUT_DEFAULT);
+    return toConnectURI(address, rpcTimeoutMillis, retryDelayMillis, retryTimeoutMillis);
+  }
+
+  public static URI toConnectURI(InetSocketAddress address, int rpcTimeoutMillis, long retryDelayMillis, long retryTimeoutMillis) throws Exception
   {
-    StringBuilder query = new StringBuilder(256);
-    query.append(RecoverableRpcProxy.QP_rpcTimeout + '=').append(rpcTimeoutMillis);
-    query.append('&');
-    query.append(RecoverableRpcProxy.QP_retryDelayMillis + '=').append(retryDelayMillis);
-    query.append('&');
-    query.append(RecoverableRpcProxy.QP_retryTimeoutMillis + '=').append(retryTimeoutMillis);
-    return new URI("stram", null, address.getHostName(), address.getPort(), null, query.toString(), null);
+    return new URIBuilder()
+        .setScheme("stram")
+        .setHost(address.getHostName())
+        .setPort(address.getPort())
+        .setParameter(RecoverableRpcProxy.QP_rpcTimeout, Integer.toString(rpcTimeoutMillis))
+        .setParameter(RecoverableRpcProxy.QP_retryDelayMillis, Long.toString(retryDelayMillis))
+        .setParameter(RecoverableRpcProxy.QP_retryTimeoutMillis, Long.toString(retryTimeoutMillis))
+        .build();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/21ad4cc9/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index db8c255..8565275 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -555,7 +555,7 @@ public class StreamingAppMasterService extends CompositeService
 
     // write the connect address for containers to DFS
     InetSocketAddress connectAddress = NetUtils.getConnectAddress(this.heartbeatListener.getAddress());
-    URI connectUri = new URI("stram", null, connectAddress.getHostName(), connectAddress.getPort(), null, null, null);
+    URI connectUri = RecoverableRpcProxy.toConnectURI(connectAddress);
     FSRecoveryHandler recoveryHandler = new FSRecoveryHandler(dag.assertAppPath(), getConfig());
     recoveryHandler.writeConnectUri(connectUri.toString());
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/21ad4cc9/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
index 6dbdcf0..75b4684 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
@@ -518,7 +518,51 @@ public class StramRecoveryTest
     Assert.assertTrue("timedout", timedout.get());
 
     rp.close();
+
+    String rpcTimeout = System.getProperty(RecoverableRpcProxy.RPC_TIMEOUT);
+    String rpcRetryDelay = System.getProperty(RecoverableRpcProxy.RETRY_DELAY);
+    String rpcRetryTimeout = System.getProperty(RecoverableRpcProxy.RETRY_TIMEOUT);
+
+    System.setProperty(RecoverableRpcProxy.RPC_TIMEOUT, Integer.toString(500));
+    System.setProperty(RecoverableRpcProxy.RETRY_DELAY, Long.toString(100));
+    System.setProperty(RecoverableRpcProxy.RETRY_TIMEOUT, Long.toString(500));
+
+    timedout.set(false);
+    uri = RecoverableRpcProxy.toConnectURI(address);
+    recoveryHandler.writeConnectUri(uri.toString());
+
+    rp = new RecoverableRpcProxy(appPath, conf);
+    protocolProxy = rp.getProxy();
+    protocolProxy.log("containerId", "msg");
+    try {
+      protocolProxy.log("containerId", "timeout");
+      Assert.fail("expected socket timeout");
+    } catch (java.net.SocketTimeoutException e) {
+      // expected
+    }
+    Assert.assertTrue("timedout", timedout.get());
+    rp.close();
+
+    timedout.set(false);
+    System.setProperty(RecoverableRpcProxy.RETRY_TIMEOUT, Long.toString(1500));
+
+    uri = RecoverableRpcProxy.toConnectURI(address);
+    recoveryHandler.writeConnectUri(uri.toString());
+
+    protocolProxy.log("containerId", "timeout");
+    Assert.assertTrue("timedout", timedout.get());
+
+
+    restoreSystemProperty(RecoverableRpcProxy.RPC_TIMEOUT, rpcTimeout);
+    restoreSystemProperty(RecoverableRpcProxy.RETRY_DELAY, rpcRetryDelay);
+    restoreSystemProperty(RecoverableRpcProxy.RETRY_TIMEOUT, rpcRetryTimeout);
+
     server.stop();
   }
 
+  private static String restoreSystemProperty(final String key, final String value)
+  {
+    return (value == null)? System.clearProperty(key) : System.setProperty(key, value);
+  }
+
 }