You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this rendering.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/07/20 13:35:33 UTC
git commit: Fail fast when Zookeeper connections expire
Updated Branches:
refs/heads/S4-85 [created] db7a8908d
Fail fast when Zookeeper connections expire
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/db7a8908
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/db7a8908
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/db7a8908
Branch: refs/heads/S4-85
Commit: db7a8908d758be67c045bd5b7b6191062c70bdc8
Parents: 6f8ad92
Author: Matthieu Morel <mm...@apache.org>
Authored: Fri Jul 20 15:28:31 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Fri Jul 20 15:28:31 2012 +0200
----------------------------------------------------------------------
.../apache/s4/comm/topology/AssignmentFromZK.java | 10 +-
.../org/apache/s4/comm/topology/ClusterFromZK.java | 7 ++-
.../apache/s4/comm/topology/ClustersFromZK.java | 9 +-
.../org/apache/s4/comm/topology/RemoteStreams.java | 5 +
.../s4/comm/topology/AssignmentsFromZKTest.java | 68 ---------------
.../s4/comm/topology/AssignmentsFromZKTest1.java | 65 ++++++++++++++
.../s4/comm/topology/AssignmentsFromZKTest2.java | 23 +++++
7 files changed, 109 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/db7a8908/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
index 3a198bb..66c2eca 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
@@ -118,11 +118,13 @@ public class AssignmentFromZK implements Assignment, IZkChildListener, IZkStateL
@Override
public void handleStateChanged(KeeperState state) throws Exception {
this.state = state;
- if (!state.equals(KeeperState.SyncConnected)) {
- logger.warn("Session not connected for cluster [{}]: [{}]. Trying to reconnect", clusterName, state.name());
- zkClient.close();
- zkClient.connect(connectionTimeout, null);
- handleNewSession();
+ // Other states (e.g. Disconnected) are not acted upon here; presumably zkClient reconnects by itself. An expired
+ // session means ZooKeeper considers this node dead, so fail fast by stopping the JVM.
+ if (state == KeeperState.Expired) {
+ logger.error(
+ "Zookeeper session expired, possibly due to a network partition for cluster [{}]. This node is considered as dead by Zookeeper. Proceeding to stop this node.",
+ clusterName);
+ System.exit(1);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/db7a8908/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
index 82e7a5e..1ec3cb5 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
@@ -186,7 +186,14 @@ public class ClusterFromZK implements Cluster, IZkChildListener, IZkDataListener
@Override
public void handleStateChanged(KeeperState state) throws Exception {
- // TODO we should reconnect only if we hold the zookeeper connection (i.e. this is the local cluster)
+ // An expired session means ZooKeeper considers this node dead: fail fast. clusterRef.get() is handed to the logger
+ // directly (SLF4J formats null as "null") so that an unset reference cannot throw an NPE before System.exit runs.
+ if (state == KeeperState.Expired) {
+ logger.error(
+ "Zookeeper session expired, possibly due to a network partition for cluster [{}]. This node is considered as dead by Zookeeper. Proceeding to stop this node.",
+ clusterRef.get());
+ System.exit(1);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/db7a8908/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
index c5445d3..f177599 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
@@ -78,10 +78,13 @@ public class ClustersFromZK implements Clusters, IZkStateListener {
@Override
public void handleStateChanged(KeeperState state) throws Exception {
this.state = state;
- if (!state.equals(KeeperState.SyncConnected)) {
- logger.warn("Session not connected for cluster [{}]: [{}]. Trying to reconnect", clusterName, state.name());
- zkClient.connect(connectionTimeout, null);
- handleNewSession();
+ // Other states (e.g. Disconnected) are not acted upon here; presumably zkClient reconnects by itself. An expired
+ // session means ZooKeeper considers this node dead, so fail fast by stopping the JVM.
+ if (state == KeeperState.Expired) {
+ logger.error(
+ "Zookeeper session expired, possibly due to a network partition for cluster [{}]. This node is considered as dead by Zookeeper. Proceeding to stop this node.",
+ clusterName);
+ System.exit(1);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/db7a8908/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
index dd93682..587f823 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
@@ -114,6 +114,12 @@ public class RemoteStreams implements IZkStateListener, IZkChildListener {
@Override
public void handleStateChanged(KeeperState state) throws Exception {
this.state = state;
+ // Other states (e.g. Disconnected) are only recorded above. An expired session means ZooKeeper considers this
+ // node dead, so fail fast by stopping the JVM.
+ if (state == KeeperState.Expired) {
+ logger.error("Zookeeper session expired, possibly due to a network partition. This node is considered as dead by Zookeeper. Proceeding to stop this node.");
+ System.exit(1);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/db7a8908/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest.java
deleted file mode 100644
index 673c7a3..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-package org.apache.s4.comm.topology;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.s4.comm.tools.TaskSetup;
-import org.apache.s4.fixtures.CommTestUtils;
-import org.junit.Test;
-
-import com.google.common.base.Splitter;
-import com.google.common.collect.Sets;
-
-public class AssignmentsFromZKTest extends ZKBaseTest {
-
- @Test
- public void testAssignmentFor1Cluster() throws Exception {
- TaskSetup taskSetup = new TaskSetup(CommTestUtils.ZK_STRING);
- final String topologyNames = "cluster1";
- testAssignment(taskSetup, topologyNames);
- }
-
- @Test
- public void testAssignmentFor2Clusters() throws Exception {
- Thread.sleep(2000);
- TaskSetup taskSetup = new TaskSetup(CommTestUtils.ZK_STRING);
- final String topologyNames = "cluster2, cluster3";
- testAssignment(taskSetup, topologyNames);
- }
-
- private void testAssignment(TaskSetup taskSetup, final String topologyNames) throws InterruptedException {
- final Set<String> names = Sets.newHashSet(Splitter.onPattern("\\s*,\\s*").split(topologyNames));
- taskSetup.clean("s4");
- for (String topologyName : names) {
- taskSetup.setup(topologyName, 10, 1300);
- }
-
- final CountDownLatch latch = new CountDownLatch(10 * names.size());
- for (int i = 0; i < 10; i++) {
- Runnable runnable = new Runnable() {
-
- @SuppressWarnings("unused")
- @Override
- public void run() {
- AssignmentFromZK assignmentFromZK;
- try {
-
- for (String topologyName : names) {
- assignmentFromZK = new AssignmentFromZK(topologyName, CommTestUtils.ZK_STRING, 30000, 30000);
- assignmentFromZK.init();
- ClusterNode assignClusterNode = assignmentFromZK.assignClusterNode();
- latch.countDown();
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- };
- Thread t = new Thread(runnable);
- t.start();
- }
-
- boolean await = latch.await(30, TimeUnit.SECONDS);
- assertEquals(true, await);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/db7a8908/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java
new file mode 100644
index 0000000..8e4c70a
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java
@@ -0,0 +1,96 @@
+package org.apache.s4.comm.topology;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.comm.tools.TaskSetup;
+import org.apache.s4.fixtures.CommTestUtils;
+import org.junit.Test;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Sets;
+
+/**
+ * Checks that concurrent {@link AssignmentFromZK} instances each obtain a cluster node assignment.
+ * <p>
+ * Separated from AssignmentsFromZKTest2 so that VM exit upon Zookeeper session expiration does not affect the test
+ * in that other class. NOTE(review): this isolation only holds if the build forks a separate JVM per test class --
+ * confirm the test runner configuration.
+ */
+public class AssignmentsFromZKTest1 extends ZKBaseTest {
+
+    /**
+     * Value passed to {@link TaskSetup#setup} for each cluster (presumably the number of tasks), and number of
+     * concurrent workers; each worker requests one assignment per cluster.
+     */
+    private static final int NB_TASKS = 10;
+
+    @Test
+    public void testAssignmentFor1Cluster() throws Exception {
+        TaskSetup taskSetup = new TaskSetup(CommTestUtils.ZK_STRING);
+        final String topologyNames = "cluster1";
+        testAssignment(taskSetup, topologyNames);
+    }
+
+    /**
+     * Sets up every cluster listed in {@code topologyNames}, then runs {@code NB_TASKS} concurrent workers that each
+     * create one {@link AssignmentFromZK} per cluster and request a node assignment from it.
+     *
+     * @param taskSetup used to call {@code clean("s4")} and to set up each cluster
+     * @param topologyNames comma-separated cluster names, e.g. {@code "cluster2, cluster3"}
+     * @throws InterruptedException if interrupted while waiting for the workers
+     * @throws AssertionError if a worker throws, or if not all assignments complete within 30 seconds
+     */
+    public static void testAssignment(TaskSetup taskSetup, final String topologyNames) throws InterruptedException {
+        final Set<String> names = Sets.newHashSet(Splitter.onPattern("\\s*,\\s*").split(topologyNames));
+        taskSetup.clean("s4");
+        for (String topologyName : names) {
+            taskSetup.setup(topologyName, NB_TASKS, 1300);
+        }
+
+        // one countDown per (worker, cluster) pair
+        final CountDownLatch latch = new CountDownLatch(NB_TASKS * names.size());
+        // worker exceptions are kept so the test reports the real cause instead of a bare timeout
+        final Queue<Exception> failures = new ConcurrentLinkedQueue<Exception>();
+        ExecutorService executor = Executors.newFixedThreadPool(NB_TASKS);
+        try {
+            for (int i = 0; i < NB_TASKS; i++) {
+                executor.execute(new Runnable() {
+
+                    @Override
+                    public void run() {
+                        try {
+                            for (String topologyName : names) {
+                                AssignmentFromZK assignmentFromZK = new AssignmentFromZK(topologyName,
+                                        CommTestUtils.ZK_STRING, 30000, 30000);
+                                assignmentFromZK.init();
+                                assignmentFromZK.assignClusterNode();
+                                latch.countDown();
+                            }
+                        } catch (Exception e) {
+                            failures.add(e);
+                        }
+                    }
+                });
+            }
+
+            boolean completed = latch.await(30, TimeUnit.SECONDS);
+            Exception failure = failures.peek();
+            if (failure != null) {
+                // AssertionError(Object) makes a Throwable argument the cause
+                throw new AssertionError(failure);
+            }
+            assertTrue("Not all cluster node assignments completed within 30 seconds", completed);
+        } finally {
+            // stop accepting tasks; running workers are not interrupted
+            executor.shutdown();
+        }
+    }
+}
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/db7a8908/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest2.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest2.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest2.java
new file mode 100644
index 0000000..8c51c39
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest2.java
@@ -0,0 +1,27 @@
+package org.apache.s4.comm.topology;
+
+import org.apache.s4.comm.tools.TaskSetup;
+import org.apache.s4.fixtures.CommTestUtils;
+import org.junit.Test;
+
+/**
+ * Checks concurrent cluster node assignment when two clusters are set up.
+ * <p>
+ * Separated from AssignmentsFromZKTest1 so that VM exit upon Zookeeper session expiration does not affect the test
+ * in that other class. NOTE(review): this isolation only holds if the build forks a separate JVM per test class --
+ * confirm the test runner configuration.
+ * <p>
+ * Uses the same {@code ZKBaseTest} fixture as AssignmentsFromZKTest1, as the original combined test class did for
+ * this test.
+ */
+public class AssignmentsFromZKTest2 extends ZKBaseTest {
+
+    @Test
+    public void testAssignmentFor2Clusters() throws Exception {
+        // kept from the original combined test class; its purpose is not documented -- TODO confirm it is still needed
+        Thread.sleep(2000);
+        TaskSetup taskSetup = new TaskSetup(CommTestUtils.ZK_STRING);
+        final String topologyNames = "cluster2, cluster3";
+        AssignmentsFromZKTest1.testAssignment(taskSetup, topologyNames);
+    }
+}
+}