You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/14 13:02:11 UTC
[flink] 08/09: [FLINK-10329] Fail ZooKeeperSubmittedJobGraphStore#removeJobGraph if job cannot be removed
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 60ed037bd0c5706704428054104941469b77fad3
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Wed Sep 12 17:45:36 2018 +0200
[FLINK-10329] Fail ZooKeeperSubmittedJobGraphStore#removeJobGraph if job cannot be removed
Fail properly with an exception if we cannot remove the JobGraph in
ZooKeeperSubmittedJobGraphStore#removeJobGraph. This is necessary in order to notify callers about the unsuccessful attempt.
---
.../ZooKeeperSubmittedJobGraphStore.java | 8 +-
.../TestingRetrievableStateStorageHelper.java | 63 ++++++++++++++
.../ZooKeeperCompletedCheckpointStoreTest.java | 36 --------
.../ZooKeeperSubmittedJobGraphStoreTest.java | 96 ++++++++++++++++++++++
4 files changed, 164 insertions(+), 39 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index 2b935af..343b22a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -264,9 +264,11 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
synchronized (cacheLock) {
if (addedJobGraphs.contains(jobId)) {
- jobGraphsInZooKeeper.releaseAndTryRemove(path);
-
- addedJobGraphs.remove(jobId);
+ if (jobGraphsInZooKeeper.releaseAndTryRemove(path)) {
+ addedJobGraphs.remove(jobId);
+ } else {
+ throw new FlinkException(String.format("Could not remove job graph with job id %s from ZooKeeper.", jobId));
+ }
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingRetrievableStateStorageHelper.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingRetrievableStateStorageHelper.java
new file mode 100644
index 0000000..92bf1af
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingRetrievableStateStorageHelper.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
+
+import java.io.Serializable;
+
+/**
+ * {@link RetrievableStateStorageHelper} implementation for testing purposes.
+ *
+ * @param <T> type of the element to store
+ */
+public final class TestingRetrievableStateStorageHelper<T extends Serializable> implements RetrievableStateStorageHelper<T> {
+
+ @Override
+ public RetrievableStateHandle<T> store(T state) {
+ return new TestingRetrievableStateHandle<>(state);
+ }
+
+ private static final class TestingRetrievableStateHandle<T extends Serializable> implements RetrievableStateHandle<T> {
+
+ private static final long serialVersionUID = 137053380713794300L;
+
+ private final T state;
+
+ private TestingRetrievableStateHandle(T state) {
+ this.state = state;
+ }
+
+ @Override
+ public T retrieveState() {
+ return state;
+ }
+
+ @Override
+ public void discardState() {
+ // no op
+ }
+
+ @Override
+ public long getStateSize() {
+ return 0;
+ }
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index f992d3b..a9cba88 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -22,10 +22,8 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.util.ZooKeeperUtils;
-import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
import org.apache.flink.util.TestLogger;
@@ -36,8 +34,6 @@ import org.junit.Test;
import javax.annotation.Nonnull;
-import java.io.IOException;
-import java.io.Serializable;
import java.util.List;
import static org.junit.Assert.assertEquals;
@@ -129,36 +125,4 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
Executors.directExecutor());
}
- private static final class TestingRetrievableStateStorageHelper<T extends Serializable> implements RetrievableStateStorageHelper<T> {
- @Override
- public RetrievableStateHandle<T> store(T state) {
- return new TestingRetrievableStateHandle<>(state);
- }
-
- private static class TestingRetrievableStateHandle<T extends Serializable> implements RetrievableStateHandle<T> {
-
- private static final long serialVersionUID = 137053380713794300L;
-
- private final T state;
-
- private TestingRetrievableStateHandle(T state) {
- this.state = state;
- }
-
- @Override
- public T retrieveState() throws IOException, ClassNotFoundException {
- return state;
- }
-
- @Override
- public void discardState() throws Exception {
- // no op
- }
-
- @Override
- public long getStateSize() {
- return 0;
- }
- }
- }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java
new file mode 100644
index 0000000..dde3b7a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.checkpoint.TestingRetrievableStateStorageHelper;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link ZooKeeperSubmittedJobGraphStore}.
+ */
+public class ZooKeeperSubmittedJobGraphStoreTest extends TestLogger {
+
+ @Rule
+ public ZooKeeperResource zooKeeperResource = new ZooKeeperResource();
+
+ private Configuration configuration;
+
+ @Before
+ public void setup() {
+ configuration = new Configuration();
+ configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
+ }
+
+ /**
+ * Tests that we fail with an exception if the job cannot be removed from the
+ * ZooKeeperSubmittedJobGraphStore.
+ */
+ @Test
+ public void testJobGraphRemovalFailure() throws Exception {
+ try (final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration)) {
+ final TestingRetrievableStateStorageHelper<SubmittedJobGraph> stateStorage = new TestingRetrievableStateStorageHelper<>();
+ final ZooKeeperSubmittedJobGraphStore submittedJobGraphStore = createSubmittedJobGraphStore(client, stateStorage);
+ submittedJobGraphStore.start(null);
+ final ZooKeeperSubmittedJobGraphStore otherSubmittedJobGraphStore = createSubmittedJobGraphStore(client, stateStorage);
+ otherSubmittedJobGraphStore.start(null);
+
+ final SubmittedJobGraph jobGraph = new SubmittedJobGraph(new JobGraph(), null);
+ submittedJobGraphStore.putJobGraph(jobGraph);
+
+ final SubmittedJobGraph recoveredJobGraph = otherSubmittedJobGraphStore.recoverJobGraph(jobGraph.getJobId());
+
+ assertThat(recoveredJobGraph, is(notNullValue()));
+
+ try {
+ otherSubmittedJobGraphStore.removeJobGraph(recoveredJobGraph.getJobId());
+ fail("It should not be possible to remove the JobGraph since the first store still has a lock on it.");
+ } catch (Exception ignored) {
+ // expected
+ }
+
+ otherSubmittedJobGraphStore.stop();
+ }
+ }
+
+ @Nonnull
+ public ZooKeeperSubmittedJobGraphStore createSubmittedJobGraphStore(CuratorFramework client, TestingRetrievableStateStorageHelper<SubmittedJobGraph> stateStorage) throws Exception {
+ return new ZooKeeperSubmittedJobGraphStore(
+ client,
+ "/foobar",
+ stateStorage);
+ }
+
+}