You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/09/19 12:00:01 UTC

[GitHub] tillrohrmann closed pull request #6681: [FLINK-10324] Replace ZooKeeperStateHandleStore#getAllSortedByNameAndLock by getAllAndLock

tillrohrmann closed pull request #6681: [FLINK-10324] Replace ZooKeeperStateHandleStore#getAllSortedByNameAndLock by getAllAndLock
URL: https://github.com/apache/flink/pull/6681
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub would not otherwise
show its diff after the merge, so the diff is reproduced below:

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index e443fc21552..51f40085c3f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -35,6 +35,8 @@
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.ConcurrentModificationException;
 import java.util.List;
 import java.util.concurrent.Executor;
@@ -69,6 +71,8 @@
 
 	private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCompletedCheckpointStore.class);
 
+	private static final Comparator<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> STRING_COMPARATOR = Comparator.comparing(o -> o.f1);
+
 	/** Curator ZooKeeper client. */
 	private final CuratorFramework client;
 
@@ -153,7 +157,7 @@ public void recover() throws Exception {
 		List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> initialCheckpoints;
 		while (true) {
 			try {
-				initialCheckpoints = checkpointsInZooKeeper.getAllSortedByNameAndLock();
+				initialCheckpoints = checkpointsInZooKeeper.getAllAndLock();
 				break;
 			}
 			catch (ConcurrentModificationException e) {
@@ -161,6 +165,8 @@ public void recover() throws Exception {
 			}
 		}
 
+		Collections.sort(initialCheckpoints, STRING_COMPARATOR);
+
 		int numberOfInitialCheckpoints = initialCheckpoints.size();
 
 		LOG.info("Found {} checkpoints in ZooKeeper.", numberOfInitialCheckpoints);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index b9cd0c1b720..2cb1ccc5071 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -24,7 +24,6 @@
 import org.apache.flink.util.InstantiationUtil;
 
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
@@ -320,68 +319,6 @@ public int exists(String pathInZooKeeper) throws Exception {
 		return stateHandles;
 	}
 
-
-	/**
-	 * Gets all available state handles from ZooKeeper sorted by name (ascending) and locks the
-	 * respective state nodes. The result tuples contain the retrieved state and the path to the
-	 * node in ZooKeeper.
-	 *
-	 * <p>If there is a concurrent modification, the operation is retried until it succeeds.
-	 *
-	 * @return All state handles in ZooKeeper.
-	 * @throws Exception If a ZooKeeper or state handle operation fails
-	 */
-	@SuppressWarnings("unchecked")
-	public List<Tuple2<RetrievableStateHandle<T>, String>> getAllSortedByNameAndLock() throws Exception {
-		final List<Tuple2<RetrievableStateHandle<T>, String>> stateHandles = new ArrayList<>();
-
-		boolean success = false;
-
-		retry:
-		while (!success) {
-			stateHandles.clear();
-
-			Stat stat = client.checkExists().forPath("/");
-			if (stat == null) {
-				break; // Node does not exist, done.
-			} else {
-				// Initial cVersion (number of changes to the children of this node)
-				int initialCVersion = stat.getCversion();
-
-				List<String> children = ZKPaths.getSortedChildren(
-						client.getZookeeperClient().getZooKeeper(),
-						ZKPaths.fixForNamespace(client.getNamespace(), "/"));
-
-				for (String path : children) {
-					path = "/" + path;
-
-					try {
-						final RetrievableStateHandle<T> stateHandle = getAndLock(path);
-						stateHandles.add(new Tuple2<>(stateHandle, path));
-					} catch (KeeperException.NoNodeException ignored) {
-						// Concurrent deletion, retry
-						continue retry;
-					} catch (IOException ioException) {
-						LOG.warn("Could not get all ZooKeeper children. Node {} contained " +
-							"corrupted data. Releasing and trying to remove this node.", path, ioException);
-
-						releaseAndTryRemove(path);
-					}
-				}
-
-				int finalCVersion = client.checkExists().forPath("/").getCversion();
-
-				// Check for concurrent modifications
-				success = initialCVersion == finalCVersion;
-
-				// we don't have to release all locked nodes in case of a concurrent modification, because we
-				// will retrieve them in the next iteration again.
-			}
-		}
-
-		return stateHandles;
-	}
-
 	/**
 	 * Releases the lock for the given state node and tries to remove the state node if it is no longer locked.
 	 * It returns the {@link RetrievableStateHandle} stored under the given state node if any.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
index 1f7d3691e50..e9b90b72611 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
@@ -126,7 +126,7 @@ public void testCheckpointRecovery() throws Exception {
 
 		ZooKeeperStateHandleStore<CompletedCheckpoint> zooKeeperStateHandleStoreMock = spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock));
 		whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zooKeeperStateHandleStoreMock);
-		doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllSortedByNameAndLock();
+		doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllAndLock();
 
 		final int numCheckpointsToRetain = 1;
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
index 2dd27e7c897..3c37faef3aa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
@@ -36,6 +36,8 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -365,11 +367,12 @@ public void testGetAllSortedByName() throws Exception {
 			store.addAndLock(pathInZooKeeper, val);
 		}
 
-		List<Tuple2<RetrievableStateHandle<Long>, String>> actual = store.getAllSortedByNameAndLock();
+		List<Tuple2<RetrievableStateHandle<Long>, String>> actual = store.getAllAndLock();
 		assertEquals(expected.length, actual.size());
 
 		// bring the elements in sort order
 		Arrays.sort(expected);
+		Collections.sort(actual, Comparator.comparing(o -> o.f1));
 
 		for (int i = 0; i < expected.length; i++) {
 			assertEquals(expected[i], actual.get(i).f0.retrieveState());
@@ -468,22 +471,6 @@ public void testCorruptedData() throws Exception {
 		}
 
 		assertEquals(expected, actual);
-
-		// check the same for the all sorted by name call
-		allEntries = store.getAllSortedByNameAndLock();
-
-		actual.clear();
-
-		for (Tuple2<RetrievableStateHandle<Long>, String> entry : allEntries) {
-			actual.add(entry.f0.retrieveState());
-		}
-
-		assertEquals(expected, actual);
-
-		Stat stat = ZOOKEEPER.getClient().checkExists().forPath("/" + 2);
-
-		// check that the corrupted node no longer exists
-		assertNull("The corrupted node should no longer exist.", stat);
 	}
 
 	/**


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services