You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/08/05 09:40:04 UTC

[flink] branch release-1.10 updated: [FLINK-18677][fix] Added handling of suspended or lost connections within the ZooKeeperLeaderRetrievalService.

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new d954278  [FLINK-18677][fix] Added handling of suspended or lost connections within the ZooKeeperLeaderRetrievalService.
d954278 is described below

commit d954278bb1dc1672fedcd2de745bfe8f5bad6e34
Author: xcomp <ma...@ververica.com>
AuthorDate: Thu Jul 30 21:04:01 2020 +0200

    [FLINK-18677][fix] Added handling of suspended or lost connections within the ZooKeeperLeaderRetrievalService.
    
    The listener needs to be notified in case of a connection loss so that it is able to initiate necessary actions on its side.
    
    [FLINK-18677][runtime] [style] Replaced spaces with TABs to follow the Apache Flink code style.
    
    [FLINK-18677][runtime] [fix] Synchronize notifyLeaderLoss through lock and removed redundant code.
    
    The redundant code was moved into notifyIfNewLeaderAddress(String, UUID), which is now used by notifyLeaderLoss() and within nodeChanged(). Additionally, the call to notifyLeaderLoss() is now guarded by a lock to synchronize the state change (i.e. lastLeaderAddress and lastLeaderSessionID).
    
    [FLINK-18677][runtime] The exception was added to make it more explicit that the method is not expected to be called.
    
    [FLINK-18677][runtime] Decreased the wait time for the queue to be filled since we're not expecting any objects.
    
    The test does not expect any calls to happen. Hence, no CompletableFuture instance will be queued. A longer wait time would just result in a longer-running test.
    
    [FLINK-18677][runtime] Added infinite wait time to happy test.
    
    The previous implementation had a fixed timeout. Slower machines might need longer to process the test, which might result in test failures. The new implementation removes the timeout so that the test won't fail just because of the poor performance of the machine it is running on.
    
    [FLINK-18677][runtime] Moved log messages out of synchronization blocks.
    
    This closes #13055.
---
 .../ZooKeeperLeaderRetrievalService.java           |  48 ++++--
 ...KeeperLeaderElectionConnectionHandlingTest.java | 170 +++++++++++++++++++++
 2 files changed, 205 insertions(+), 13 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java
index fd0ea82..9faf5c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java
@@ -31,6 +31,8 @@ import org.apache.curator.framework.state.ConnectionStateListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.concurrent.GuardedBy;
+
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
@@ -164,17 +166,7 @@ public class ZooKeeperLeaderRetrievalService implements LeaderRetrievalService,
 						}
 					}
 
-					if (!(Objects.equals(leaderAddress, lastLeaderAddress) &&
-						Objects.equals(leaderSessionID, lastLeaderSessionID))) {
-						LOG.debug(
-							"New leader information: Leader={}, session ID={}.",
-							leaderAddress,
-							leaderSessionID);
-
-						lastLeaderAddress = leaderAddress;
-						lastLeaderSessionID = leaderSessionID;
-						leaderListener.notifyLeaderAddress(leaderAddress, leaderSessionID);
-					}
+					notifyIfNewLeaderAddress(leaderAddress, leaderSessionID);
 				} catch (Exception e) {
 					leaderListener.handleError(new Exception("Could not handle node changed event.", e));
 					throw e;
@@ -192,14 +184,20 @@ public class ZooKeeperLeaderRetrievalService implements LeaderRetrievalService,
 				break;
 			case SUSPENDED:
 				LOG.warn("Connection to ZooKeeper suspended. Can no longer retrieve the leader from " +
-					"ZooKeeper.");
+						"ZooKeeper.");
+				synchronized (lock) {
+					notifyLeaderLoss();
+				}
 				break;
 			case RECONNECTED:
 				LOG.info("Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.");
 				break;
 			case LOST:
 				LOG.warn("Connection to ZooKeeper lost. Can no longer retrieve the leader from " +
-					"ZooKeeper.");
+						"ZooKeeper.");
+				synchronized (lock) {
+					notifyLeaderLoss();
+				}
 				break;
 		}
 	}
@@ -208,4 +206,28 @@ public class ZooKeeperLeaderRetrievalService implements LeaderRetrievalService,
 	public void unhandledError(String s, Throwable throwable) {
 		leaderListener.handleError(new FlinkException("Unhandled error in ZooKeeperLeaderRetrievalService:" + s, throwable));
 	}
+
+	@GuardedBy("lock")
+	private void notifyIfNewLeaderAddress(String newLeaderAddress, UUID newLeaderSessionID) {
+		if (!(Objects.equals(newLeaderAddress, lastLeaderAddress) &&
+				Objects.equals(newLeaderSessionID, lastLeaderSessionID))) {
+			if (newLeaderAddress == null && newLeaderSessionID == null) {
+				LOG.debug("Leader information was lost: The listener will be notified accordingly.");
+			} else {
+				LOG.debug(
+						"New leader information: Leader={}, session ID={}.",
+						newLeaderAddress,
+						newLeaderSessionID);
+			}
+
+			lastLeaderAddress = newLeaderAddress;
+			lastLeaderSessionID = newLeaderSessionID;
+			leaderListener.notifyLeaderAddress(newLeaderAddress, newLeaderSessionID);
+		}
+	}
+
+	@GuardedBy("lock")
+	private void notifyLeaderLoss() {
+		notifyIfNewLeaderAddress(null, null);
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java
new file mode 100644
index 0000000..ac3f223
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.test.TestingServer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the error handling in case of a suspended connection to the ZooKeeper instance.
+ */
+public class ZooKeeperLeaderElectionConnectionHandlingTest extends TestLogger {
+
+	private TestingServer testingServer;
+
+	private Configuration config;
+
+	private CuratorFramework zooKeeperClient;
+
+	@Before
+	public void before() throws Exception {
+		testingServer = new TestingServer();
+
+		config = new Configuration();
+		config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+		config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
+
+		zooKeeperClient = ZooKeeperUtils.startCuratorFramework(config);
+	}
+
+	@After
+	public void after() throws Exception {
+		stopTestServer();
+
+		if (zooKeeperClient != null) {
+			zooKeeperClient.close();
+			zooKeeperClient = null;
+		}
+	}
+
+	@Test
+	public void testConnectionSuspendedHandlingDuringInitialization() throws Exception {
+		QueueLeaderElectionListener queueLeaderElectionListener = new QueueLeaderElectionListener(1, Duration.ofMillis(50));
+
+		ZooKeeperLeaderRetrievalService testInstance = ZooKeeperUtils.createLeaderRetrievalService(zooKeeperClient, config);
+		testInstance.start(queueLeaderElectionListener);
+
+		// do the testing
+		CompletableFuture<String> firstAddress = queueLeaderElectionListener.next();
+		assertThat("No results are expected, yet, since no leader was elected.", firstAddress, is(nullValue()));
+
+		stopTestServer();
+
+		CompletableFuture<String> secondAddress = queueLeaderElectionListener.next();
+		assertThat("No result is expected since there was no leader elected before stopping the server, yet.", secondAddress, is(nullValue()));
+	}
+
+	@Test
+	public void testConnectionSuspendedHandling() throws Exception {
+		String leaderAddress = "localhost";
+		LeaderElectionService leaderElectionService = ZooKeeperUtils.createLeaderElectionService(zooKeeperClient, config);
+		TestingContender contender = new TestingContender(leaderAddress, leaderElectionService);
+		leaderElectionService.start(contender);
+
+		QueueLeaderElectionListener queueLeaderElectionListener = new QueueLeaderElectionListener(2);
+
+		ZooKeeperLeaderRetrievalService testInstance = ZooKeeperUtils.createLeaderRetrievalService(zooKeeperClient, config);
+		testInstance.start(queueLeaderElectionListener);
+
+		// do the testing
+		CompletableFuture<String> firstAddress = queueLeaderElectionListener.next();
+		assertThat("The first result is expected to be the initially set leader address.", firstAddress.get(), is(leaderAddress));
+
+		stopTestServer();
+
+		CompletableFuture<String> secondAddress = queueLeaderElectionListener.next();
+		assertThat("The next result must not be missing.", secondAddress, is(notNullValue()));
+		assertThat("The next result is expected to be null.", secondAddress.get(), is(nullValue()));
+	}
+
+	private void stopTestServer() throws IOException {
+		if (testingServer != null) {
+			testingServer.stop();
+			testingServer = null;
+		}
+	}
+
+	private static class QueueLeaderElectionListener implements LeaderRetrievalListener {
+
+		private final BlockingQueue<CompletableFuture<String>> queue;
+		private final Duration timeout;
+
+		public QueueLeaderElectionListener(int expectedCalls) {
+			this(expectedCalls, null);
+		}
+
+		public QueueLeaderElectionListener(int expectedCalls, Duration timeout) {
+			this.queue = new ArrayBlockingQueue<>(expectedCalls);
+			this.timeout = timeout;
+		}
+
+		@Override
+		public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
+			try {
+				if (timeout == null) {
+					queue.put(CompletableFuture.completedFuture(leaderAddress));
+				} else {
+					queue.offer(CompletableFuture.completedFuture(leaderAddress), timeout.toMillis(), TimeUnit.MILLISECONDS);
+				}
+			} catch (InterruptedException e) {
+				throw new IllegalStateException(e);
+			}
+		}
+
+		public CompletableFuture<String> next() {
+			try {
+				if (timeout == null) {
+					return queue.take();
+				} else {
+					return this.queue.poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
+				}
+			} catch (InterruptedException e) {
+				throw new IllegalStateException(e);
+			}
+		}
+
+		@Override
+		public void handleError(Exception exception) {
+			throw new UnsupportedOperationException("handleError(Exception) shouldn't have been called, but it was triggered anyway.", exception);
+		}
+	}
+}