You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nw...@apache.org on 2019/06/25 23:33:45 UTC

[incubator-heron] branch master updated: Retry tunnel setup when it fails (#3299)

This is an automated email from the ASF dual-hosted git repository.

nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c09b65  Retry tunnel setup when it fails (#3299)
5c09b65 is described below

commit 5c09b65712148bf1a42c35bd22930ba2ba5ac3c1
Author: Ning Wang <nw...@twitter.com>
AuthorDate: Tue Jun 25 16:33:40 2019 -0700

    Retry tunnel setup when it fails (#3299)
    
    * Retry tunnel setup when it fails
    
    * add delay between retries
    
    * fix bug
---
 .../zookeeper/curator/CuratorStateManager.java     | 34 +++++++++++++++-------
 1 file changed, 24 insertions(+), 10 deletions(-)

diff --git a/heron/statemgrs/src/java/org/apache/heron/statemgr/zookeeper/curator/CuratorStateManager.java b/heron/statemgrs/src/java/org/apache/heron/statemgr/zookeeper/curator/CuratorStateManager.java
index fe097e5..78129cc 100644
--- a/heron/statemgrs/src/java/org/apache/heron/statemgr/zookeeper/curator/CuratorStateManager.java
+++ b/heron/statemgrs/src/java/org/apache/heron/statemgr/zookeeper/curator/CuratorStateManager.java
@@ -63,6 +63,8 @@ import org.apache.zookeeper.Watcher;
 
 public class CuratorStateManager extends FileSystemStateManager {
   private static final Logger LOG = Logger.getLogger(CuratorStateManager.class.getName());
+  private static final int TUNNEL_SETUP_RETRY = 0;  // 0 means no retry
+  private static final int TUNNEL_SETUP_RETRY_SLEEP_SEC = 5;
 
   private CuratorFramework client;
   private String connectionString;
@@ -83,17 +85,29 @@ public class CuratorStateManager extends FileSystemStateManager {
         NetworkUtils.TunnelConfig.build(config, NetworkUtils.HeronSystem.STATE_MANAGER);
 
     if (tunnelConfig.isTunnelNeeded()) {
-      Pair<String, List<Process>> tunneledResults = setupZkTunnel(tunnelConfig);
-
-      String newConnectionString = tunneledResults.first;
-      if (newConnectionString.isEmpty()) {
-        throw new IllegalArgumentException("Failed to connect to tunnel host '"
-            + tunnelConfig.getTunnelHost() + "'");
+      for (int setupCount = 0;; ++setupCount) {
+        Pair<String, List<Process>> tunneledResults = setupZkTunnel(tunnelConfig);
+        String newConnectionString = tunneledResults.first;
+
+        // If the tunnel can't be set up correctly, retry or bail.
+        if (!newConnectionString.isEmpty()) {
+          // Success, use the new connection string
+          connectionString = newConnectionString;
+          tunnelProcesses.addAll(tunneledResults.second);
+          break;
+        } else {
+          if (setupCount < TUNNEL_SETUP_RETRY) {
+            try {
+              TimeUnit.SECONDS.sleep(TUNNEL_SETUP_RETRY_SLEEP_SEC);
+            } catch (InterruptedException ex) {
+              Thread.currentThread().interrupt();
+            }
+          } else {
+            throw new IllegalArgumentException("Failed to connect to tunnel host '"
+                + tunnelConfig.getTunnelHost() + "'");
+          }
+        }
       }
-
-      // Use the new connection string
-      connectionString = newConnectionString;
-      tunnelProcesses.addAll(tunneledResults.second);
     }
 
     // Start it