You are viewing a plain-text version of this content. The canonical version is the original mailing-list archive page (its hyperlink is not preserved in this plain-text rendering).
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/07/20 13:35:33 UTC

git commit: Fail fast when Zookeeper connections expire

Updated Branches:
  refs/heads/S4-85 [created] db7a8908d


Fail fast when Zookeeper connections expire


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/db7a8908
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/db7a8908
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/db7a8908

Branch: refs/heads/S4-85
Commit: db7a8908d758be67c045bd5b7b6191062c70bdc8
Parents: 6f8ad92
Author: Matthieu Morel <mm...@apache.org>
Authored: Fri Jul 20 15:28:31 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Fri Jul 20 15:28:31 2012 +0200

----------------------------------------------------------------------
 .../apache/s4/comm/topology/AssignmentFromZK.java  |   10 +-
 .../org/apache/s4/comm/topology/ClusterFromZK.java |    7 ++-
 .../apache/s4/comm/topology/ClustersFromZK.java    |    9 +-
 .../org/apache/s4/comm/topology/RemoteStreams.java |    5 +
 .../s4/comm/topology/AssignmentsFromZKTest.java    |   68 ---------------
 .../s4/comm/topology/AssignmentsFromZKTest1.java   |   65 ++++++++++++++
 .../s4/comm/topology/AssignmentsFromZKTest2.java   |   23 +++++
 7 files changed, 109 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/db7a8908/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
index 3a198bb..66c2eca 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
@@ -118,11 +118,11 @@ public class AssignmentFromZK implements Assignment, IZkChildListener, IZkStateL
     @Override
     public void handleStateChanged(KeeperState state) throws Exception {
         this.state = state;
-        if (!state.equals(KeeperState.SyncConnected)) {
-            logger.warn("Session not connected for cluster [{}]: [{}]. Trying to reconnect", clusterName, state.name());
-            zkClient.close();
-            zkClient.connect(connectionTimeout, null);
-            handleNewSession();
+        if (state.equals(KeeperState.Expired)) { // only Expired is acted on; other states are just recorded in this.state
+            logger.error(
+                    "Zookeeper session expired, possibly due to a network partition for cluster [{}]. This node is considered as dead by Zookeeper. Proceeding to stop this node.",
+                    clusterName);
+            System.exit(1); // NOTE(review): terminates the whole JVM (tests included) from a ZK state callback; consider a pluggable shutdown handler
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/db7a8908/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
index 82e7a5e..1ec3cb5 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
@@ -186,7 +186,12 @@ public class ClusterFromZK implements Cluster, IZkChildListener, IZkDataListener
 
     @Override
     public void handleStateChanged(KeeperState state) throws Exception {
-        // TODO we should reconnect only if we hold the zookeeper connection (i.e. this is the local cluster)
+        if (state.equals(KeeperState.Expired)) { // only Expired is acted on; all other states are ignored here
+            logger.error(
+                    "Zookeeper session expired, possibly due to a network partition for cluster [{}]. This node is considered as dead by Zookeeper. Proceeding to stop this node.",
+                    clusterRef.get().toString()); // NOTE(review): toString() is redundant with {} and throws if clusterRef.get() is null - confirm it is always set
+            System.exit(1); // NOTE(review): terminates the whole JVM (tests included) from a ZK state callback; consider a pluggable shutdown handler
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/db7a8908/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
index c5445d3..f177599 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
@@ -78,10 +78,11 @@ public class ClustersFromZK implements Clusters, IZkStateListener {
     @Override
     public void handleStateChanged(KeeperState state) throws Exception {
         this.state = state;
-        if (!state.equals(KeeperState.SyncConnected)) {
-            logger.warn("Session not connected for cluster [{}]: [{}]. Trying to reconnect", clusterName, state.name());
-            zkClient.connect(connectionTimeout, null);
-            handleNewSession();
+        if (state.equals(KeeperState.Expired)) { // only Expired is acted on; other states are just recorded in this.state
+            logger.error(
+                    "Zookeeper session expired, possibly due to a network partition for cluster [{}]. This node is considered as dead by Zookeeper. Proceeding to stop this node.",
+                    clusterName);
+            System.exit(1); // NOTE(review): terminates the whole JVM (tests included) from a ZK state callback; consider a pluggable shutdown handler
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/db7a8908/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
index dd93682..587f823 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
@@ -114,6 +114,11 @@ public class RemoteStreams implements IZkStateListener, IZkChildListener {
     @Override
     public void handleStateChanged(KeeperState state) throws Exception {
         this.state = state;
+        if (state.equals(KeeperState.Expired)) { // only Expired is acted on; same handling is duplicated in AssignmentFromZK, ClusterFromZK and ClustersFromZK
+            logger.error("Zookeeper session expired, possibly due to a network partition. This node is considered as dead by Zookeeper. Proceeding to stop this node.");
+            System.exit(1); // NOTE(review): terminates the whole JVM (tests included) from a ZK state callback; consider a pluggable shutdown handler
+        }
+
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/db7a8908/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest.java
deleted file mode 100644
index 673c7a3..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-package org.apache.s4.comm.topology;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.s4.comm.tools.TaskSetup;
-import org.apache.s4.fixtures.CommTestUtils;
-import org.junit.Test;
-
-import com.google.common.base.Splitter;
-import com.google.common.collect.Sets;
-
-public class AssignmentsFromZKTest extends ZKBaseTest {
-
-    @Test
-    public void testAssignmentFor1Cluster() throws Exception {
-        TaskSetup taskSetup = new TaskSetup(CommTestUtils.ZK_STRING);
-        final String topologyNames = "cluster1";
-        testAssignment(taskSetup, topologyNames);
-    }
-
-    @Test
-    public void testAssignmentFor2Clusters() throws Exception {
-        Thread.sleep(2000);
-        TaskSetup taskSetup = new TaskSetup(CommTestUtils.ZK_STRING);
-        final String topologyNames = "cluster2, cluster3";
-        testAssignment(taskSetup, topologyNames);
-    }
-
-    private void testAssignment(TaskSetup taskSetup, final String topologyNames) throws InterruptedException {
-        final Set<String> names = Sets.newHashSet(Splitter.onPattern("\\s*,\\s*").split(topologyNames));
-        taskSetup.clean("s4");
-        for (String topologyName : names) {
-            taskSetup.setup(topologyName, 10, 1300);
-        }
-
-        final CountDownLatch latch = new CountDownLatch(10 * names.size());
-        for (int i = 0; i < 10; i++) {
-            Runnable runnable = new Runnable() {
-
-                @SuppressWarnings("unused")
-                @Override
-                public void run() {
-                    AssignmentFromZK assignmentFromZK;
-                    try {
-
-                        for (String topologyName : names) {
-                            assignmentFromZK = new AssignmentFromZK(topologyName, CommTestUtils.ZK_STRING, 30000, 30000);
-                            assignmentFromZK.init();
-                            ClusterNode assignClusterNode = assignmentFromZK.assignClusterNode();
-                            latch.countDown();
-                        }
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                    }
-                }
-            };
-            Thread t = new Thread(runnable);
-            t.start();
-        }
-
-        boolean await = latch.await(30, TimeUnit.SECONDS);
-        assertEquals(true, await);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/db7a8908/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java
new file mode 100644
index 0000000..8e4c70a
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java
@@ -0,0 +1,65 @@
+package org.apache.s4.comm.topology;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.comm.tools.TaskSetup;
+import org.apache.s4.fixtures.CommTestUtils;
+import org.junit.Test;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Sets;
+
+/**
+ * Separated from {@link AssignmentsFromZKTest2} so that a VM exit upon Zookeeper session expiration does not affect
+ * the test in that other class.
+ * NOTE(review): this isolation only holds if the test runner forks a separate JVM per test class - confirm build config.
+ */
+public class AssignmentsFromZKTest1 extends ZKBaseTest {
+
+    @Test
+    public void testAssignmentFor1Cluster() throws Exception {
+        TaskSetup taskSetup = new TaskSetup(CommTestUtils.ZK_STRING);
+        final String topologyNames = "cluster1";
+        testAssignment(taskSetup, topologyNames);
+    }
+
+    public static void testAssignment(TaskSetup taskSetup, final String topologyNames) throws InterruptedException {
+        final Set<String> names = Sets.newHashSet(Splitter.onPattern("\\s*,\\s*").split(topologyNames)); // comma-separated, surrounding whitespace tolerated
+        taskSetup.clean("s4");
+        for (String topologyName : names) {
+            taskSetup.setup(topologyName, 10, 1300); // presumably 10 tasks per cluster, matching the 10 threads below - confirm TaskSetup semantics
+        }
+
+        final CountDownLatch latch = new CountDownLatch(10 * names.size()); // one count per (thread, cluster) successful assignment
+        for (int i = 0; i < 10; i++) {
+            Runnable runnable = new Runnable() {
+
+                @SuppressWarnings("unused")
+                @Override
+                public void run() {
+                    AssignmentFromZK assignmentFromZK;
+                    try {
+
+                        for (String topologyName : names) {
+                            assignmentFromZK = new AssignmentFromZK(topologyName, CommTestUtils.ZK_STRING, 30000, 30000);
+                            assignmentFromZK.init();
+                            ClusterNode assignClusterNode = assignmentFromZK.assignClusterNode();
+                            latch.countDown();
+                        }
+                    } catch (Exception e) {
+                        e.printStackTrace(); // a failure here surfaces only as the latch timing out below
+                    }
+                }
+            };
+            Thread t = new Thread(runnable);
+            t.start();
+        }
+
+        boolean await = latch.await(30, TimeUnit.SECONDS);
+        assertEquals(true, await);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/db7a8908/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest2.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest2.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest2.java
new file mode 100644
index 0000000..8c51c39
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest2.java
@@ -0,0 +1,23 @@
+package org.apache.s4.comm.topology;
+
+import org.apache.s4.comm.tools.TaskSetup;
+import org.apache.s4.fixtures.CommTestUtils;
+import org.apache.s4.fixtures.ZkBasedTest;
+import org.junit.Test;
+
+/**
+ * Separated from {@link AssignmentsFromZKTest1} so that a VM exit upon Zookeeper session expiration does not affect
+ * the test in that other class (assumes the test runner forks one JVM per test class - TODO confirm).
+ * NOTE(review): unlike AssignmentsFromZKTest1 (extends ZKBaseTest), this class extends ZkBasedTest - confirm intended.
+ */
+public class AssignmentsFromZKTest2 extends ZkBasedTest {
+
+    @Test
+    public void testAssignmentFor2Clusters() throws Exception {
+        Thread.sleep(2000); // NOTE(review): fixed delay carried over from the former combined test class; purpose not evident here
+        TaskSetup taskSetup = new TaskSetup(CommTestUtils.ZK_STRING);
+        final String topologyNames = "cluster2, cluster3";
+        AssignmentsFromZKTest1.testAssignment(taskSetup, topologyNames);
+    }
+
+}