You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/19 11:59:07 UTC

[flink] branch master updated: [FLINK-10324] Replace ZooKeeperStateHandleStore#getAllSortedByNameAndLock by getAllAndLock

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e7ac3ba  [FLINK-10324] Replace ZooKeeperStateHandleStore#getAllSortedByNameAndLock by getAllAndLock
e7ac3ba is described below

commit e7ac3ba7dfcb90c21025def2bf4112b108d21afd
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Wed Sep 12 11:55:31 2018 +0200

    [FLINK-10324] Replace ZooKeeperStateHandleStore#getAllSortedByNameAndLock by getAllAndLock
    
    In order to reduce code duplication, this commit replaces
    ZooKeeperStateHandleStore#getAllSortedByNameAndLock with getAllAndLock and does the
    sorting of the entries afterwards. The implication of this change is that we no longer
    try to release and remove corrupted entries and instead simply ignore them.
    
    This closes #6681.
---
 .../ZooKeeperCompletedCheckpointStore.java         |  8 ++-
 .../zookeeper/ZooKeeperStateHandleStore.java       | 63 ----------------------
 ...oKeeperCompletedCheckpointStoreMockitoTest.java |  2 +-
 .../zookeeper/ZooKeeperStateHandleStoreTest.java   | 21 ++------
 4 files changed, 12 insertions(+), 82 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index e443fc2..51f4008 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -35,6 +35,8 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.ConcurrentModificationException;
 import java.util.List;
 import java.util.concurrent.Executor;
@@ -69,6 +71,8 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 
 	private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCompletedCheckpointStore.class);
 
+	private static final Comparator<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> STRING_COMPARATOR = Comparator.comparing(o -> o.f1);
+
 	/** Curator ZooKeeper client. */
 	private final CuratorFramework client;
 
@@ -153,7 +157,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 		List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> initialCheckpoints;
 		while (true) {
 			try {
-				initialCheckpoints = checkpointsInZooKeeper.getAllSortedByNameAndLock();
+				initialCheckpoints = checkpointsInZooKeeper.getAllAndLock();
 				break;
 			}
 			catch (ConcurrentModificationException e) {
@@ -161,6 +165,8 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 			}
 		}
 
+		Collections.sort(initialCheckpoints, STRING_COMPARATOR);
+
 		int numberOfInitialCheckpoints = initialCheckpoints.size();
 
 		LOG.info("Found {} checkpoints in ZooKeeper.", numberOfInitialCheckpoints);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index b9cd0c1..2cb1ccc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -24,7 +24,6 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.InstantiationUtil;
 
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
@@ -320,68 +319,6 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 		return stateHandles;
 	}
 
-
-	/**
-	 * Gets all available state handles from ZooKeeper sorted by name (ascending) and locks the
-	 * respective state nodes. The result tuples contain the retrieved state and the path to the
-	 * node in ZooKeeper.
-	 *
-	 * <p>If there is a concurrent modification, the operation is retried until it succeeds.
-	 *
-	 * @return All state handles in ZooKeeper.
-	 * @throws Exception If a ZooKeeper or state handle operation fails
-	 */
-	@SuppressWarnings("unchecked")
-	public List<Tuple2<RetrievableStateHandle<T>, String>> getAllSortedByNameAndLock() throws Exception {
-		final List<Tuple2<RetrievableStateHandle<T>, String>> stateHandles = new ArrayList<>();
-
-		boolean success = false;
-
-		retry:
-		while (!success) {
-			stateHandles.clear();
-
-			Stat stat = client.checkExists().forPath("/");
-			if (stat == null) {
-				break; // Node does not exist, done.
-			} else {
-				// Initial cVersion (number of changes to the children of this node)
-				int initialCVersion = stat.getCversion();
-
-				List<String> children = ZKPaths.getSortedChildren(
-						client.getZookeeperClient().getZooKeeper(),
-						ZKPaths.fixForNamespace(client.getNamespace(), "/"));
-
-				for (String path : children) {
-					path = "/" + path;
-
-					try {
-						final RetrievableStateHandle<T> stateHandle = getAndLock(path);
-						stateHandles.add(new Tuple2<>(stateHandle, path));
-					} catch (KeeperException.NoNodeException ignored) {
-						// Concurrent deletion, retry
-						continue retry;
-					} catch (IOException ioException) {
-						LOG.warn("Could not get all ZooKeeper children. Node {} contained " +
-							"corrupted data. Releasing and trying to remove this node.", path, ioException);
-
-						releaseAndTryRemove(path);
-					}
-				}
-
-				int finalCVersion = client.checkExists().forPath("/").getCversion();
-
-				// Check for concurrent modifications
-				success = initialCVersion == finalCVersion;
-
-				// we don't have to release all locked nodes in case of a concurrent modification, because we
-				// will retrieve them in the next iteration again.
-			}
-		}
-
-		return stateHandles;
-	}
-
 	/**
 	 * Releases the lock for the given state node and tries to remove the state node if it is no longer locked.
 	 * It returns the {@link RetrievableStateHandle} stored under the given state node if any.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
index 1f7d369..e9b90b7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
@@ -126,7 +126,7 @@ public class ZooKeeperCompletedCheckpointStoreMockitoTest extends TestLogger {
 
 		ZooKeeperStateHandleStore<CompletedCheckpoint> zooKeeperStateHandleStoreMock = spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock));
 		whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zooKeeperStateHandleStoreMock);
-		doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllSortedByNameAndLock();
+		doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllAndLock();
 
 		final int numCheckpointsToRetain = 1;
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
index 2dd27e7..3c37fae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
@@ -36,6 +36,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -365,11 +367,12 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 			store.addAndLock(pathInZooKeeper, val);
 		}
 
-		List<Tuple2<RetrievableStateHandle<Long>, String>> actual = store.getAllSortedByNameAndLock();
+		List<Tuple2<RetrievableStateHandle<Long>, String>> actual = store.getAllAndLock();
 		assertEquals(expected.length, actual.size());
 
 		// bring the elements in sort order
 		Arrays.sort(expected);
+		Collections.sort(actual, Comparator.comparing(o -> o.f1));
 
 		for (int i = 0; i < expected.length; i++) {
 			assertEquals(expected[i], actual.get(i).f0.retrieveState());
@@ -468,22 +471,6 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 		}
 
 		assertEquals(expected, actual);
-
-		// check the same for the all sorted by name call
-		allEntries = store.getAllSortedByNameAndLock();
-
-		actual.clear();
-
-		for (Tuple2<RetrievableStateHandle<Long>, String> entry : allEntries) {
-			actual.add(entry.f0.retrieveState());
-		}
-
-		assertEquals(expected, actual);
-
-		Stat stat = ZOOKEEPER.getClient().checkExists().forPath("/" + 2);
-
-		// check that the corrupted node no longer exists
-		assertNull("The corrupted node should no longer exist.", stat);
 	}
 
 	/**