You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/05/19 08:58:16 UTC
[3/4] flink git commit: [FLINK-6612] Allow ZooKeeperStateHandleStore
to lock created ZNodes
http://git-wip-us.apache.org/repos/asf/flink/blob/3d119e11/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
new file mode 100644
index 0000000..0c215cd
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
@@ -0,0 +1,805 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.TestLogger;
+import org.apache.zookeeper.data.Stat;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for basic {@link ZooKeeperStateHandleStore} behaviour.
+ *
+ * <p> Tests include:
+ * <ul>
+ * <li>Expected usage of operations</li>
+ * <li>Correct ordering of ZooKeeper and state handle operations</li>
+ * </ul>
+ */
+public class ZooKeeperStateHandleStoreTest extends TestLogger {
+
+ private static final ZooKeeperTestEnvironment ZOOKEEPER = new ZooKeeperTestEnvironment(1);
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ if (ZOOKEEPER != null) {
+ ZOOKEEPER.shutdown();
+ }
+ }
+
+ @Before
+ public void cleanUp() throws Exception {
+ ZOOKEEPER.deleteAll();
+ }
+
+ /**
+ * Tests add operation with lock.
+ */
+ @Test
+ public void testAddAndLock() throws Exception {
+ LongStateStorage longStateStorage = new LongStateStorage();
+ ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<Long>(
+ ZOOKEEPER.getClient(), longStateStorage, Executors.directExecutor());
+
+ // Config
+ final String pathInZooKeeper = "/testAdd";
+ final Long state = 1239712317L;
+
+ // Test
+ store.addAndLock(pathInZooKeeper, state);
+
+ // Verify
+ // State handle created
+ assertEquals(1, store.getAllAndLock().size());
+ assertEquals(state, store.getAndLock(pathInZooKeeper).retrieveState());
+
+ // Path created and is persistent
+ Stat stat = ZOOKEEPER.getClient().checkExists().forPath(pathInZooKeeper);
+ assertNotNull(stat);
+ assertEquals(0, stat.getEphemeralOwner());
+
+ List<String> children = ZOOKEEPER.getClient().getChildren().forPath(pathInZooKeeper);
+
+ // there should be one child which is the lock
+ assertEquals(1, children.size());
+
+ stat = ZOOKEEPER.getClient().checkExists().forPath(pathInZooKeeper + '/' + children.get(0));
+ assertNotNull(stat);
+
+ // check that the child is an ephemeral node
+ assertNotEquals(0, stat.getEphemeralOwner());
+
+ // Data is equal
+ @SuppressWarnings("unchecked")
+ Long actual = ((RetrievableStateHandle<Long>) InstantiationUtil.deserializeObject(
+ ZOOKEEPER.getClient().getData().forPath(pathInZooKeeper),
+ ClassLoader.getSystemClassLoader())).retrieveState();
+
+ assertEquals(state, actual);
+ }
+
+ /**
+ * Tests that an existing path throws an Exception.
+ */
+ @Test(expected = Exception.class)
+ public void testAddAlreadyExistingPath() throws Exception {
+ LongStateStorage stateHandleProvider = new LongStateStorage();
+
+ ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+ ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+
+ ZOOKEEPER.getClient().create().forPath("/testAddAlreadyExistingPath");
+
+ store.addAndLock("/testAddAlreadyExistingPath", 1L);
+
+ // writing to the state storage should have succeeded
+ assertEquals(1, stateHandleProvider.getStateHandles());
+
+ // the created state handle should have been cleaned up if the add operation failed
+ assertEquals(1, stateHandleProvider.getStateHandles().get(0).getNumberOfDiscardCalls());
+ }
+
+ /**
+ * Tests that the created state handle is discarded if ZooKeeper create fails.
+ */
+ @Test
+ public void testAddDiscardStateHandleAfterFailure() throws Exception {
+ // Setup
+ LongStateStorage stateHandleProvider = new LongStateStorage();
+
+ CuratorFramework client = spy(ZOOKEEPER.getClient());
+ when(client.inTransaction().create()).thenThrow(new RuntimeException("Expected test Exception."));
+
+ ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+ client, stateHandleProvider, Executors.directExecutor());
+
+ // Config
+ final String pathInZooKeeper = "/testAddDiscardStateHandleAfterFailure";
+ final Long state = 81282227L;
+
+ try {
+ // Test
+ store.addAndLock(pathInZooKeeper, state);
+ fail("Did not throw expected exception");
+ }
+ catch (Exception ignored) {
+ }
+
+ // Verify
+ // State handle created and discarded
+ assertEquals(1, stateHandleProvider.getStateHandles().size());
+ assertEquals(state, stateHandleProvider.getStateHandles().get(0).retrieveState());
+ assertEquals(1, stateHandleProvider.getStateHandles().get(0).getNumberOfDiscardCalls());
+ }
+
+ /**
+ * Tests that a state handle is replaced.
+ */
+ @Test
+ public void testReplace() throws Exception {
+ // Setup
+ LongStateStorage stateHandleProvider = new LongStateStorage();
+
+ ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+ ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+
+ // Config
+ final String pathInZooKeeper = "/testReplace";
+ final Long initialState = 30968470898L;
+ final Long replaceState = 88383776661L;
+
+ // Test
+ store.addAndLock(pathInZooKeeper, initialState);
+ store.replace(pathInZooKeeper, 0, replaceState);
+
+ // Verify
+ // State handles created
+ assertEquals(2, stateHandleProvider.getStateHandles().size());
+ assertEquals(initialState, stateHandleProvider.getStateHandles().get(0).retrieveState());
+ assertEquals(replaceState, stateHandleProvider.getStateHandles().get(1).retrieveState());
+
+ // Path created and is persistent
+ Stat stat = ZOOKEEPER.getClient().checkExists().forPath(pathInZooKeeper);
+ assertNotNull(stat);
+ assertEquals(0, stat.getEphemeralOwner());
+
+ // Data is equal
+ @SuppressWarnings("unchecked")
+ Long actual = ((RetrievableStateHandle<Long>) InstantiationUtil.deserializeObject(
+ ZOOKEEPER.getClient().getData().forPath(pathInZooKeeper),
+ ClassLoader.getSystemClassLoader())).retrieveState();
+
+ assertEquals(replaceState, actual);
+ }
+
+ /**
+ * Tests that a non existing path throws an Exception.
+ */
+ @Test(expected = Exception.class)
+ public void testReplaceNonExistingPath() throws Exception {
+ RetrievableStateStorageHelper<Long> stateStorage = new LongStateStorage();
+
+ ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+ ZOOKEEPER.getClient(), stateStorage, Executors.directExecutor());
+
+ store.replace("/testReplaceNonExistingPath", 0, 1L);
+ }
+
+ /**
+ * Tests that the replace state handle is discarded if ZooKeeper setData fails.
+ */
+ @Test
+ public void testReplaceDiscardStateHandleAfterFailure() throws Exception {
+ // Setup
+ LongStateStorage stateHandleProvider = new LongStateStorage();
+
+ CuratorFramework client = spy(ZOOKEEPER.getClient());
+ when(client.setData()).thenThrow(new RuntimeException("Expected test Exception."));
+
+ ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+ client, stateHandleProvider, Executors.directExecutor());
+
+ // Config
+ final String pathInZooKeeper = "/testReplaceDiscardStateHandleAfterFailure";
+ final Long initialState = 30968470898L;
+ final Long replaceState = 88383776661L;
+
+ // Test
+ store.addAndLock(pathInZooKeeper, initialState);
+
+ try {
+ store.replace(pathInZooKeeper, 0, replaceState);
+ fail("Did not throw expected exception");
+ }
+ catch (Exception ignored) {
+ }
+
+ // Verify
+ // State handle created and discarded
+ assertEquals(2, stateHandleProvider.getStateHandles().size());
+ assertEquals(initialState, stateHandleProvider.getStateHandles().get(0).retrieveState());
+ assertEquals(replaceState, stateHandleProvider.getStateHandles().get(1).retrieveState());
+ assertEquals(1, stateHandleProvider.getStateHandles().get(1).getNumberOfDiscardCalls());
+
+ // Initial value
+ @SuppressWarnings("unchecked")
+ Long actual = ((RetrievableStateHandle<Long>) InstantiationUtil.deserializeObject(
+ ZOOKEEPER.getClient().getData().forPath(pathInZooKeeper),
+ ClassLoader.getSystemClassLoader())).retrieveState();
+
+ assertEquals(initialState, actual);
+ }
+
+ /**
+ * Tests get operation.
+ */
+ @Test
+ public void testGetAndExists() throws Exception {
+ // Setup
+ LongStateStorage stateHandleProvider = new LongStateStorage();
+
+ ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+ ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+
+ // Config
+ final String pathInZooKeeper = "/testGetAndExists";
+ final Long state = 311222268470898L;
+
+ // Test
+ assertEquals(-1, store.exists(pathInZooKeeper));
+
+ store.addAndLock(pathInZooKeeper, state);
+ RetrievableStateHandle<Long> actual = store.getAndLock(pathInZooKeeper);
+
+ // Verify
+ assertEquals(state, actual.retrieveState());
+ assertTrue(store.exists(pathInZooKeeper) >= 0);
+ }
+
+ /**
+ * Tests that a non existing path throws an Exception.
+ */
+ @Test(expected = Exception.class)
+ public void testGetNonExistingPath() throws Exception {
+ LongStateStorage stateHandleProvider = new LongStateStorage();
+
+ ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+ ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+
+ store.getAndLock("/testGetNonExistingPath");
+ }
+
+ /**
+ * Tests that all added state is returned.
+ */
+ @Test
+ public void testGetAll() throws Exception {
+ // Setup
+ LongStateStorage stateHandleProvider = new LongStateStorage();
+
+ ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+ ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+
+ // Config
+ final String pathInZooKeeper = "/testGetAll";
+
+ final Set<Long> expected = new HashSet<>();
+ expected.add(311222268470898L);
+ expected.add(132812888L);
+ expected.add(27255442L);
+ expected.add(11122233124L);
+
+ // Test
+ for (long val : expected) {
+ store.addAndLock(pathInZooKeeper + val, val);
+ }
+
+ for (Tuple2<RetrievableStateHandle<Long>, String> val : store.getAllAndLock()) {
+ assertTrue(expected.remove(val.f0.retrieveState()));
+ }
+ assertEquals(0, expected.size());
+ }
+
+ /**
+ * Tests that the state is returned sorted.
+ */
+ @Test
+ public void testGetAllSortedByName() throws Exception {
+ // Setup
+ LongStateStorage stateHandleProvider = new LongStateStorage();
+
+ ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+ ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+
+ // Config
+ final String basePath = "/testGetAllSortedByName";
+
+ final Long[] expected = new Long[] {
+ 311222268470898L, 132812888L, 27255442L, 11122233124L };
+
+ // Test
+ for (long val : expected) {
+ final String pathInZooKeeper = String.format("%s%016d", basePath, val);
+ store.addAndLock(pathInZooKeeper, val);
+ }
+
+ List<Tuple2<RetrievableStateHandle<Long>, String>> actual = store.getAllSortedByNameAndLock();
+ assertEquals(expected.length, actual.size());
+
+ // bring the elements in sort order
+ Arrays.sort(expected);
+
+ for (int i = 0; i < expected.length; i++) {
+ assertEquals(expected[i], actual.get(i).f0.retrieveState());
+ }
+ }
+
+ /**
+ * Tests that state handles are correctly removed.
+ */
+ @Test
+ public void testRemove() throws Exception {
+ // Setup
+ LongStateStorage stateHandleProvider = new LongStateStorage();
+
+ ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+ ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+
+ // Config
+ final String pathInZooKeeper = "/testRemove";
+ final Long state = 27255442L;
+
+ store.addAndLock(pathInZooKeeper, state);
+
+ // Test
+ store.releaseAndTryRemove(pathInZooKeeper);
+
+ // Verify discarded
+ assertEquals(0, ZOOKEEPER.getClient().getChildren().forPath("/").size());
+ }
+
+ /**
+ * Tests that state handles are correctly removed with a callback.
+ */
+ @Test
+ public void testRemoveWithCallback() throws Exception {
+ // Setup
+ LongStateStorage stateHandleProvider = new LongStateStorage();
+
+ ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+ ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+
+ // Config
+ final String pathInZooKeeper = "/testRemoveWithCallback";
+ final Long state = 27255442L;
+
+ store.addAndLock(pathInZooKeeper, state);
+
+ final CountDownLatch sync = new CountDownLatch(1);
+ ZooKeeperStateHandleStore.RemoveCallback<Long> callback = mock(ZooKeeperStateHandleStore.RemoveCallback.class);
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ sync.countDown();
+ return null;
+ }
+ }).when(callback).apply(any(RetrievableStateHandle.class));
+
+ // Test
+ store.releaseAndTryRemove(pathInZooKeeper, callback);
+
+ // Verify discarded and callback called
+ assertEquals(0, ZOOKEEPER.getClient().getChildren().forPath("/").size());
+
+ sync.await();
+
+ verify(callback, times(1))
+ .apply(any(RetrievableStateHandle.class));
+ }
+
+ /** Tests that all state handles are correctly discarded. */
+ @Test
+ public void testReleaseAndTryRemoveAll() throws Exception {
+ // Setup
+ LongStateStorage stateHandleProvider = new LongStateStorage();
+
+ ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+ ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+
+ // Config
+ final String pathInZooKeeper = "/testDiscardAll";
+
+ final Set<Long> expected = new HashSet<>();
+ expected.add(311222268470898L);
+ expected.add(132812888L);
+ expected.add(27255442L);
+ expected.add(11122233124L);
+
+ // Test
+ for (long val : expected) {
+ store.addAndLock(pathInZooKeeper + val, val);
+ }
+
+ store.releaseAndTryRemoveAll();
+
+ // Verify all discarded
+ assertEquals(0, ZOOKEEPER.getClient().getChildren().forPath("/").size());
+ }
+
+ /**
+ * Tests that the ZooKeeperStateHandleStore can handle corrupted data by releasing and trying to remove the
+ * respective ZooKeeper ZNodes.
+ */
+ @Test
+ public void testCorruptedData() throws Exception {
+ LongStateStorage stateStorage = new LongStateStorage();
+
+ ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+ ZOOKEEPER.getClient(),
+ stateStorage,
+ Executors.directExecutor());
+
+ final Collection<Long> input = new HashSet<>();
+ input.add(1L);
+ input.add(2L);
+ input.add(3L);
+
+ for (Long aLong : input) {
+ store.addAndLock("/" + aLong, aLong);
+ }
+
+ // corrupt one of the entries
+ ZOOKEEPER.getClient().setData().forPath("/" + 2, new byte[2]);
+
+ List<Tuple2<RetrievableStateHandle<Long>, String>> allEntries = store.getAllAndLock();
+
+ Collection<Long> expected = new HashSet<>(input);
+ expected.remove(2L);
+
+ Collection<Long> actual = new HashSet<>(expected.size());
+
+ for (Tuple2<RetrievableStateHandle<Long>, String> entry : allEntries) {
+ actual.add(entry.f0.retrieveState());
+ }
+
+ assertEquals(expected, actual);
+
+ // check the same for the all sorted by name call
+ allEntries = store.getAllSortedByNameAndLock();
+
+ actual.clear();
+
+ for (Tuple2<RetrievableStateHandle<Long>, String> entry : allEntries) {
+ actual.add(entry.f0.retrieveState());
+ }
+
+ assertEquals(expected, actual);
+
+ Stat stat = ZOOKEEPER.getClient().checkExists().forPath("/" + 2);
+
+ // check that the corrupted node no longer exists
+ assertNull("The corrupted node should no longer exist.", stat);
+ }
+
+ /**
+ * FLINK-6612
+ *
+ * Tests that a concurrent delete operation cannot succeed if another instance holds a lock on the specified
+ * node.
+ */
+ @Test
+ public void testConcurrentDeleteOperation() throws Exception {
+ LongStateStorage longStateStorage = new LongStateStorage();
+
+ ZooKeeperStateHandleStore<Long> zkStore1 = new ZooKeeperStateHandleStore<>(
+ ZOOKEEPER.getClient(),
+ longStateStorage,
+ Executors.directExecutor());
+
+ ZooKeeperStateHandleStore<Long> zkStore2 = new ZooKeeperStateHandleStore<>(
+ ZOOKEEPER.getClient(),
+ longStateStorage,
+ Executors.directExecutor());
+
+ final String statePath = "/state";
+
+ zkStore1.addAndLock(statePath, 42L);
+ RetrievableStateHandle<Long> stateHandle = zkStore2.getAndLock(statePath);
+
+ // this should not remove the referenced node because we are still holding a state handle
+ // reference via zkStore2
+ zkStore1.releaseAndTryRemove(statePath);
+
+ // sanity check
+ assertEquals(42L, (long) stateHandle.retrieveState());
+
+ Stat nodeStat = ZOOKEEPER.getClient().checkExists().forPath(statePath);
+
+ assertNotNull("NodeStat should not be null, otherwise the referenced node does not exist.", nodeStat);
+
+ zkStore2.releaseAndTryRemove(statePath);
+
+ nodeStat = ZOOKEEPER.getClient().checkExists().forPath(statePath);
+
+ assertNull("NodeState should be null, because the referenced node should no longer exist.", nodeStat);
+ }
+
+ /**
+ * FLINK-6612
+ *
+ * Tests that getAndLock removes a created lock if the RetrievableStateHandle cannot be retrieved
+ * (e.g. deserialization problem).
+ */
+ @Test
+ public void testLockCleanupWhenGetAndLockFails() throws Exception {
+ LongStateStorage longStateStorage = new LongStateStorage();
+
+ ZooKeeperStateHandleStore<Long> zkStore1 = new ZooKeeperStateHandleStore<>(
+ ZOOKEEPER.getClient(),
+ longStateStorage,
+ Executors.directExecutor());
+
+ ZooKeeperStateHandleStore<Long> zkStore2 = new ZooKeeperStateHandleStore<>(
+ ZOOKEEPER.getClient(),
+ longStateStorage,
+ Executors.directExecutor());
+
+ final String path = "/state";
+
+ zkStore1.addAndLock(path, 42L);
+
+ final byte[] corruptedData = {1, 2};
+
+ // corrupt the data
+ ZOOKEEPER.getClient().setData().forPath(path, corruptedData);
+
+ try {
+ zkStore2.getAndLock(path);
+ fail("Should fail because we cannot deserialize the node's data");
+ } catch (IOException ignored) {
+ // expected to fail
+ }
+
+ // check that there is no lock node left
+ String lockNodePath = zkStore2.getLockPath(path);
+
+ Stat stat = ZOOKEEPER.getClient().checkExists().forPath(lockNodePath);
+
+ // zkStore2 should not have created a lock node
+ assertNull("zkStore2 should not have created a lock node.", stat);
+
+ Collection<String> children = ZOOKEEPER.getClient().getChildren().forPath(path);
+
+ // there should be exactly one lock node from zkStore1
+ assertEquals(1, children.size());
+
+ zkStore1.releaseAndTryRemove(path);
+
+ stat = ZOOKEEPER.getClient().checkExists().forPath(path);
+
+ assertNull("The state node should have been removed.", stat);
+ }
+
+ /**
+ * FLINK-6612
+ *
+ * Tests that lock nodes will be released if the client dies.
+ */
+ @Test
+ public void testLockCleanupWhenClientTimesOut() throws Exception {
+ LongStateStorage longStateStorage = new LongStateStorage();
+
+ Configuration configuration = new Configuration();
+ configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, ZOOKEEPER.getConnectString());
+ configuration.setInteger(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT, 100);
+ configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_ROOT, "timeout");
+
+ try (CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
+ CuratorFramework client2 = ZooKeeperUtils.startCuratorFramework(configuration)) {
+
+ ZooKeeperStateHandleStore<Long> zkStore = new ZooKeeperStateHandleStore<>(
+ client,
+ longStateStorage,
+ Executors.directExecutor());
+
+ final String path = "/state";
+
+ zkStore.addAndLock(path, 42L);
+
+ // this should delete all ephemeral nodes
+ client.close();
+
+ Stat stat = client2.checkExists().forPath(path);
+
+ // check that our state node still exists
+ assertNotNull(stat);
+
+ Collection<String> children = client2.getChildren().forPath(path);
+
+ // check that the lock node has been released
+ assertEquals(0, children.size());
+ }
+ }
+
+ /**
+ * FLINK-6612
+ *
+ * Tests that we can release a locked state handles in the ZooKeeperStateHandleStore.
+ */
+ @Test
+ public void testRelease() throws Exception {
+ LongStateStorage longStateStorage = new LongStateStorage();
+
+ ZooKeeperStateHandleStore<Long> zkStore = new ZooKeeperStateHandleStore<>(
+ ZOOKEEPER.getClient(),
+ longStateStorage,
+ Executors.directExecutor());
+
+ final String path = "/state";
+
+ zkStore.addAndLock(path, 42L);
+
+ final String lockPath = zkStore.getLockPath(path);
+
+ Stat stat = ZOOKEEPER.getClient().checkExists().forPath(lockPath);
+
+ assertNotNull("Expected an existing lock", stat);
+
+ zkStore.release(path);
+
+ stat = ZOOKEEPER.getClient().checkExists().forPath(path);
+
+ // release should have removed the lock child
+ assertEquals("Expected no lock nodes as children", 0, stat.getNumChildren());
+
+ zkStore.releaseAndTryRemove(path);
+
+ stat = ZOOKEEPER.getClient().checkExists().forPath(path);
+
+ assertNull("State node should have been removed.",stat);
+ }
+
+ /**
+ * FLINK-6612
+ *
+ * Tests that we can release all locked state handles in the ZooKeeperStateHandleStore
+ */
+ @Test
+ public void testReleaseAll() throws Exception {
+ LongStateStorage longStateStorage = new LongStateStorage();
+
+ ZooKeeperStateHandleStore<Long> zkStore = new ZooKeeperStateHandleStore<>(
+ ZOOKEEPER.getClient(),
+ longStateStorage,
+ Executors.directExecutor());
+
+ final Collection<String> paths = Arrays.asList("/state1", "/state2", "/state3");
+
+ for (String path : paths) {
+ zkStore.addAndLock(path, 42L);
+ }
+
+ for (String path : paths) {
+ Stat stat = ZOOKEEPER.getClient().checkExists().forPath(zkStore.getLockPath(path));
+
+ assertNotNull("Expecte and existing lock.", stat);
+ }
+
+ zkStore.releaseAll();
+
+ for (String path : paths) {
+ Stat stat = ZOOKEEPER.getClient().checkExists().forPath(path);
+
+ assertEquals(0, stat.getNumChildren());
+ }
+
+ zkStore.releaseAndTryRemoveAll();
+
+ Stat stat = ZOOKEEPER.getClient().checkExists().forPath("/");
+
+ assertEquals(0, stat.getNumChildren());
+ }
+
+ // ---------------------------------------------------------------------------------------------
+ // Simple test helpers
+ // ---------------------------------------------------------------------------------------------
+
+ private static class LongStateStorage implements RetrievableStateStorageHelper<Long> {
+
+ private final List<LongRetrievableStateHandle> stateHandles = new ArrayList<>();
+
+ @Override
+ public RetrievableStateHandle<Long> store(Long state) throws Exception {
+ LongRetrievableStateHandle stateHandle = new LongRetrievableStateHandle(state);
+ stateHandles.add(stateHandle);
+
+ return stateHandle;
+ }
+
+ List<LongRetrievableStateHandle> getStateHandles() {
+ return stateHandles;
+ }
+ }
+
+ private static class LongRetrievableStateHandle implements RetrievableStateHandle<Long> {
+
+ private static final long serialVersionUID = -3555329254423838912L;
+
+ private final Long state;
+
+ private int numberOfDiscardCalls;
+
+ public LongRetrievableStateHandle(Long state) {
+ this.state = state;
+ }
+
+ @Override
+ public Long retrieveState() throws Exception {
+ return state;
+ }
+
+ @Override
+ public void discardState() throws Exception {
+ numberOfDiscardCalls++;
+ }
+
+ @Override
+ public long getStateSize() {
+ return 0;
+ }
+
+ public int getNumberOfDiscardCalls() {
+ return numberOfDiscardCalls;
+ }
+ }
+}