You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/03/14 08:14:34 UTC

flink git commit: [FLINK-5942] [checkpoint] Harden ZooKeeperStateHandleStore to handle corrupt data

Repository: flink
Updated Branches:
  refs/heads/master 14c1941d8 -> ffb8d0518


[FLINK-5942] [checkpoint] Harden ZooKeeperStateHandleStore to handle corrupt data

When ZooKeeperStateHandleStore.getAll or getAllSortedByName is called, as the
ZooKeeperCompletedCheckpointStore does in the recovery case, the operation fails
if there exists a Znode with corrupted data. This breaks Flink's recovery
mechanism, because the recovery will read this node over and over again. In order to
solve this problem, this commit changes the behaviour such that corrupted Znodes which
cannot be read are ignored.

This closes #3447.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ffb8d051
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ffb8d051
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ffb8d051

Branch: refs/heads/master
Commit: ffb8d0518394847ac60252f2140d50a4fd1f0b68
Parents: 14c1941
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Mar 1 18:03:41 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Mar 14 09:14:05 2017 +0100

----------------------------------------------------------------------
 .../zookeeper/ZooKeeperStateHandleStore.java    | 12 ++++-
 .../ZooKeeperStateHandleStoreITCase.java        | 51 ++++++++++++++++++++
 2 files changed, 62 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ffb8d051/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index dd32efb..364ba0f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -240,7 +240,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 		try {
 			return InstantiationUtil.deserializeObject(data, Thread.currentThread().getContextClassLoader());
 		} catch (IOException | ClassNotFoundException e) {
-			throw new Exception("Failed to deserialize state handle from ZooKeeper data from " +
+			throw new IOException("Failed to deserialize state handle from ZooKeeper data from " +
 				pathInZooKeeper + '.', e);
 		}
 	}
@@ -285,6 +285,8 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 
 		retry:
 		while (!success) {
+			stateHandles.clear();
+
 			Stat stat = client.checkExists().forPath("/");
 			if (stat == null) {
 				break; // Node does not exist, done.
@@ -303,6 +305,9 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 					} catch (KeeperException.NoNodeException ignored) {
 						// Concurrent deletion, retry
 						continue retry;
+					} catch (IOException ioException) {
+						LOG.warn("Could not get all ZooKeeper children. Node {} contained " +
+							"corrupted data. Ignoring this node.", path, ioException);
 					}
 				}
 
@@ -333,6 +338,8 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 
 		retry:
 		while (!success) {
+			stateHandles.clear();
+
 			Stat stat = client.checkExists().forPath("/");
 			if (stat == null) {
 				break; // Node does not exist, done.
@@ -353,6 +360,9 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 					} catch (KeeperException.NoNodeException ignored) {
 						// Concurrent deletion, retry
 						continue retry;
+					} catch (IOException ioException) {
+						LOG.warn("Could not get all ZooKeeper children. Node {} contained " +
+							"corrupted data. Ignoring this node.", path, ioException);
 					}
 				}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ffb8d051/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
index 5db3557..4dc4c6b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
@@ -35,6 +35,7 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -536,6 +537,56 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
 		assertEquals(0, ZooKeeper.getClient().getChildren().forPath("/").size());
 	}
 
+	/**
+	 * Tests that getAll and getAllSortedByName skip a ZNode whose data has been overwritten with
+	 * bytes that are not a serialized state handle, and still return the remaining entries.
+	 */
+	@Test
+	public void testCorruptedData() throws Exception {
+		LongStateStorage stateStorage = new LongStateStorage();
+
+		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+			ZooKeeper.getClient(),
+			stateStorage,
+			Executors.directExecutor());
+
+		final Collection<Long> input = new HashSet<>();
+		input.add(1L);
+		input.add(2L);
+		input.add(3L);
+
+		for (Long aLong : input) {
+			store.add("/" + aLong, aLong);
+		}
+
+		// corrupt one of the entries by overwriting its ZNode data with two zero bytes
+		ZooKeeper.getClient().setData().forPath("/" + 2, new byte[2]);
+
+		List<Tuple2<RetrievableStateHandle<Long>, String>> allEntries = store.getAll();
+
+		Collection<Long> expected = new HashSet<>(input);
+		expected.remove(2L);
+
+		Collection<Long> actual = new HashSet<>(expected.size());
+
+		for (Tuple2<RetrievableStateHandle<Long>, String> entry : allEntries) {
+			actual.add(entry.f0.retrieveState());
+		}
+
+		assertEquals(expected, actual);
+
+		// check the same for the getAllSortedByName call
+		allEntries = store.getAllSortedByName();
+
+		actual.clear();
+
+		for (Tuple2<RetrievableStateHandle<Long>, String> entry : allEntries) {
+			actual.add(entry.f0.retrieveState());
+		}
+
+		assertEquals(expected, actual);
+	}
+
 	// ---------------------------------------------------------------------------------------------
 	// Simple test helpers
 	// ---------------------------------------------------------------------------------------------