You are viewing a plain-text version of this content; the hyperlink to its canonical version is not preserved here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/14 13:02:11 UTC

[flink] 08/09: [FLINK-10329] Fail ZooKeeperSubmittedJobGraphStore#removeJobGraph if job cannot be removed

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 60ed037bd0c5706704428054104941469b77fad3
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Wed Sep 12 17:45:36 2018 +0200

    [FLINK-10329] Fail ZooKeeperSubmittedJobGraphStore#removeJobGraph if job cannot be removed
    
    Fail properly with an exception if we cannot remove the JobGraph in
    ZooKeeperSubmittedJobGraphStore#removeJobGraph. This is necessary in order to notify callers
    about the unsuccessful attempt.
---
 .../ZooKeeperSubmittedJobGraphStore.java           |  8 +-
 .../TestingRetrievableStateStorageHelper.java      | 63 ++++++++++++++
 .../ZooKeeperCompletedCheckpointStoreTest.java     | 36 --------
 .../ZooKeeperSubmittedJobGraphStoreTest.java       | 96 ++++++++++++++++++++++
 4 files changed, 164 insertions(+), 39 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index 2b935af..343b22a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -264,9 +264,11 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 
 		synchronized (cacheLock) {
 			if (addedJobGraphs.contains(jobId)) {
-				jobGraphsInZooKeeper.releaseAndTryRemove(path);
-
-				addedJobGraphs.remove(jobId);
+				if (jobGraphsInZooKeeper.releaseAndTryRemove(path)) {
+					addedJobGraphs.remove(jobId);
+				} else {
+					throw new FlinkException(String.format("Could not remove job graph with job id %s from ZooKeeper.", jobId));
+				}
 			}
 		}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingRetrievableStateStorageHelper.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingRetrievableStateStorageHelper.java
new file mode 100644
index 0000000..92bf1af
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingRetrievableStateStorageHelper.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
+
+import java.io.Serializable;
+
+/**
+ * {@link RetrievableStateStorageHelper} for tests which keeps the state inside the returned handle.
+ *
+ * @param <T> type of the element to store
+ */
+public final class TestingRetrievableStateStorageHelper<T extends Serializable> implements RetrievableStateStorageHelper<T> {
+
+	@Override
+	public RetrievableStateHandle<T> store(T state) {
+		return new TestingRetrievableStateHandle<>(state); // no external storage is written
+	}
+
+	private static final class TestingRetrievableStateHandle<T extends Serializable> implements RetrievableStateHandle<T> {
+
+		private static final long serialVersionUID = 137053380713794300L;
+
+		private final T state; // the stored state itself; retrieveState() returns it unchanged
+
+		private TestingRetrievableStateHandle(T state) {
+			this.state = state;
+		}
+
+		@Override
+		public T retrieveState() {
+			return state;
+		}
+
+		@Override
+		public void discardState() {
+			// no-op: nothing is stored outside of this handle, so there is nothing to discard
+		}
+
+		@Override
+		public long getStateSize() {
+			return 0; // size is not tracked by this test handle
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index f992d3b..a9cba88 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -22,10 +22,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
-import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
 import org.apache.flink.util.TestLogger;
 
@@ -36,8 +34,6 @@ import org.junit.Test;
 
 import javax.annotation.Nonnull;
 
-import java.io.IOException;
-import java.io.Serializable;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
@@ -129,36 +125,4 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
 			Executors.directExecutor());
 	}
 
-	private static final class TestingRetrievableStateStorageHelper<T extends Serializable> implements RetrievableStateStorageHelper<T> {
-		@Override
-		public RetrievableStateHandle<T> store(T state) {
-			return new TestingRetrievableStateHandle<>(state);
-		}
-
-		private static class TestingRetrievableStateHandle<T extends Serializable> implements RetrievableStateHandle<T> {
-
-			private static final long serialVersionUID = 137053380713794300L;
-
-			private final T state;
-
-			private TestingRetrievableStateHandle(T state) {
-				this.state = state;
-			}
-
-			@Override
-			public T retrieveState() throws IOException, ClassNotFoundException {
-				return state;
-			}
-
-			@Override
-			public void discardState() throws Exception {
-				// no op
-			}
-
-			@Override
-			public long getStateSize() {
-				return 0;
-			}
-		}
-	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java
new file mode 100644
index 0000000..dde3b7a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.checkpoint.TestingRetrievableStateStorageHelper;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link ZooKeeperSubmittedJobGraphStore}.
+ */
+public class ZooKeeperSubmittedJobGraphStoreTest extends TestLogger {
+
+	@Rule
+	public ZooKeeperResource zooKeeperResource = new ZooKeeperResource();
+
+	private Configuration configuration;
+
+	@Before
+	public void setup() {
+		configuration = new Configuration();
+		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
+	}
+
+	/**
+	 * Tests that {@link ZooKeeperSubmittedJobGraphStore#removeJobGraph} fails with an exception
+	 * while another store still holds the lock on the job graph.
+	 */
+	@Test
+	public void testJobGraphRemovalFailure() throws Exception {
+		try (final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration)) {
+			final TestingRetrievableStateStorageHelper<SubmittedJobGraph> stateStorage = new TestingRetrievableStateStorageHelper<>();
+			final ZooKeeperSubmittedJobGraphStore submittedJobGraphStore = createSubmittedJobGraphStore(client, stateStorage);
+			submittedJobGraphStore.start(null);
+			final ZooKeeperSubmittedJobGraphStore otherSubmittedJobGraphStore = createSubmittedJobGraphStore(client, stateStorage);
+			otherSubmittedJobGraphStore.start(null);
+
+			final SubmittedJobGraph jobGraph = new SubmittedJobGraph(new JobGraph(), null);
+			submittedJobGraphStore.putJobGraph(jobGraph);
+
+			final SubmittedJobGraph recoveredJobGraph = otherSubmittedJobGraphStore.recoverJobGraph(jobGraph.getJobId());
+
+			assertThat(recoveredJobGraph, is(notNullValue()));
+
+			try {
+				otherSubmittedJobGraphStore.removeJobGraph(recoveredJobGraph.getJobId());
+				fail("It should not be possible to remove the JobGraph since the first store still has a lock on it.");
+			} catch (Exception ignored) {
+				// expected: the first store still holds the lock on the job graph
+			}
+			submittedJobGraphStore.stop(); // stop the first store as well so it does not outlive the test
+			otherSubmittedJobGraphStore.stop();
+		}
+	}
+
+	@Nonnull
+	public ZooKeeperSubmittedJobGraphStore createSubmittedJobGraphStore(CuratorFramework client, TestingRetrievableStateStorageHelper<SubmittedJobGraph> stateStorage) throws Exception {
+		return new ZooKeeperSubmittedJobGraphStore(
+			client,
+			"/foobar",
+			stateStorage);
+	}
+
+}