You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nw...@apache.org on 2019/06/25 23:33:45 UTC
[incubator-heron] branch master updated: Retry tunnel setup when it fails (#3299)
This is an automated email from the ASF dual-hosted git repository.
nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 5c09b65 Retry tunnel setup when it fails (#3299)
5c09b65 is described below
commit 5c09b65712148bf1a42c35bd22930ba2ba5ac3c1
Author: Ning Wang <nw...@twitter.com>
AuthorDate: Tue Jun 25 16:33:40 2019 -0700
Retry tunnel setup when it fails (#3299)
* Retry tunnel setup when it fails
* add delay between retries
* fix bug
---
.../zookeeper/curator/CuratorStateManager.java | 34 +++++++++++++++-------
1 file changed, 24 insertions(+), 10 deletions(-)
diff --git a/heron/statemgrs/src/java/org/apache/heron/statemgr/zookeeper/curator/CuratorStateManager.java b/heron/statemgrs/src/java/org/apache/heron/statemgr/zookeeper/curator/CuratorStateManager.java
index fe097e5..78129cc 100644
--- a/heron/statemgrs/src/java/org/apache/heron/statemgr/zookeeper/curator/CuratorStateManager.java
+++ b/heron/statemgrs/src/java/org/apache/heron/statemgr/zookeeper/curator/CuratorStateManager.java
@@ -63,6 +63,8 @@ import org.apache.zookeeper.Watcher;
public class CuratorStateManager extends FileSystemStateManager {
private static final Logger LOG = Logger.getLogger(CuratorStateManager.class.getName());
+ private static final int TUNNEL_SETUP_RETRY = 0; // 0 means no retry
+ private static final int TUNNEL_SETUP_RETRY_SLEEP_SEC = 5;
private CuratorFramework client;
private String connectionString;
@@ -83,17 +85,29 @@ public class CuratorStateManager extends FileSystemStateManager {
NetworkUtils.TunnelConfig.build(config, NetworkUtils.HeronSystem.STATE_MANAGER);
if (tunnelConfig.isTunnelNeeded()) {
- Pair<String, List<Process>> tunneledResults = setupZkTunnel(tunnelConfig);
-
- String newConnectionString = tunneledResults.first;
- if (newConnectionString.isEmpty()) {
- throw new IllegalArgumentException("Failed to connect to tunnel host '"
- + tunnelConfig.getTunnelHost() + "'");
+ for (int setupCount = 0;; ++setupCount) {
+ Pair<String, List<Process>> tunneledResults = setupZkTunnel(tunnelConfig);
+ String newConnectionString = tunneledResults.first;
+
+ // If tunnel can't be setup correctly. Retry or bail.
+ if (!newConnectionString.isEmpty()) {
+ // Success, use the new connection string
+ connectionString = newConnectionString;
+ tunnelProcesses.addAll(tunneledResults.second);
+ break;
+ } else {
+ if (setupCount < TUNNEL_SETUP_RETRY) {
+ try {
+ TimeUnit.SECONDS.sleep(TUNNEL_SETUP_RETRY_SLEEP_SEC);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ } else {
+ throw new IllegalArgumentException("Failed to connect to tunnel host '"
+ + tunnelConfig.getTunnelHost() + "'");
+ }
+ }
}
-
- // Use the new connection string
- connectionString = newConnectionString;
- tunnelProcesses.addAll(tunneledResults.second);
}
// Start it