You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/08/02 21:14:46 UTC
flink git commit: [FLINK-2436] [streaming] Make ByteStreamStateHandles more robust
Repository: flink
Updated Branches:
refs/heads/master d73cb7369 -> 83102f0ea
[FLINK-2436] [streaming] Make ByteStreamStateHandles more robust
Closes #958
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/83102f0e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/83102f0e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/83102f0e
Branch: refs/heads/master
Commit: 83102f0ea052e7f7c43c9ba6aaff0dc1c24791c9
Parents: d73cb73
Author: Gyula Fora <gy...@apache.org>
Authored: Thu Jul 30 14:07:42 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Sun Aug 2 21:14:18 2015 +0200
----------------------------------------------------------------------
.../runtime/state/ByteStreamStateHandle.java | 33 ++++-
.../state/ByteStreamStateHandleTest.java | 125 +++++++++++++++++++
.../flink/tachyon/FileStateHandleTest.java | 15 ++-
3 files changed, 165 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/83102f0e/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
index 257784a..bf2dca8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
@@ -35,9 +35,14 @@ public abstract class ByteStreamStateHandle implements StateHandle<Serializable>
private static final long serialVersionUID = -962025800339325828L;
private transient Serializable state;
+ private boolean isWritten = false;
public ByteStreamStateHandle(Serializable state) {
- this.state = state;
+ if (state != null) {
+ this.state = state;
+ } else {
+ throw new RuntimeException("State cannot be null");
+ }
}
/**
@@ -54,16 +59,25 @@ public abstract class ByteStreamStateHandle implements StateHandle<Serializable>
public Serializable getState() throws Exception {
if (!stateFetched()) {
ObjectInputStream stream = new ObjectInputStream(getInputStream());
- state = (Serializable) stream.readObject();
- stream.close();
+ try {
+ state = (Serializable) stream.readObject();
+ } finally {
+ stream.close();
+ }
}
return state;
}
private void writeObject(ObjectOutputStream oos) throws Exception {
- ObjectOutputStream stream = new ObjectOutputStream(getOutputStream());
- stream.writeObject(state);
- stream.close();
+ if (!isWritten) {
+ ObjectOutputStream stream = new ObjectOutputStream(getOutputStream());
+ try {
+ stream.writeObject(state);
+ isWritten = true;
+ } finally {
+ stream.close();
+ }
+ }
oos.defaultWriteObject();
}
@@ -74,4 +88,11 @@ public abstract class ByteStreamStateHandle implements StateHandle<Serializable>
public boolean stateFetched() {
return state != null;
}
+
+ /**
+ * Checks whether the state has already been written to the external store
+ */
+ public boolean isWritten() {
+ return isWritten;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/83102f0e/flink-runtime/src/test/java/org/apache/flink/runtime/state/ByteStreamStateHandleTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ByteStreamStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ByteStreamStateHandleTest.java
new file mode 100644
index 0000000..a7378b9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ByteStreamStateHandleTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+import org.apache.flink.util.InstantiationUtil;
+import org.junit.Test;
+
+public class ByteStreamStateHandleTest {
+
+ @Test
+ public void testHandle() throws Exception {
+ MockHandle handle;
+
+ try {
+ handle = new MockHandle(null);
+ fail();
+ } catch (RuntimeException e) {
+ // expected behaviour
+ }
+
+ handle = new MockHandle(1);
+
+ assertEquals(1, handle.getState());
+ assertTrue(handle.stateFetched());
+ assertFalse(handle.isWritten());
+ assertFalse(handle.discarded);
+
+ MockHandle handleDs = serializeDeserialize(handle);
+
+ assertEquals(1, handle.getState());
+ assertTrue(handle.stateFetched());
+ assertTrue(handle.isWritten());
+ assertTrue(handle.generatedOutput);
+ assertFalse(handle.discarded);
+
+ assertFalse(handleDs.stateFetched());
+ assertTrue(handleDs.isWritten());
+ assertFalse(handleDs.generatedOutput);
+ assertFalse(handle.discarded);
+
+ try {
+ handleDs.getState();
+ fail();
+ } catch (UnsupportedOperationException e) {
+ // good
+ }
+
+ MockHandle handleDs2 = serializeDeserialize(handleDs);
+
+ assertFalse(handleDs2.stateFetched());
+ assertTrue(handleDs2.isWritten());
+ assertFalse(handleDs.generatedOutput);
+ assertFalse(handleDs2.generatedOutput);
+ assertFalse(handleDs2.discarded);
+
+ handleDs2.discardState();
+ assertTrue(handleDs2.discarded);
+
+ }
+
+ @SuppressWarnings("unchecked")
+ private <X extends StateHandle<?>> X serializeDeserialize(X handle) throws IOException,
+ ClassNotFoundException {
+ byte[] serialized = InstantiationUtil.serializeObject(handle);
+ return (X) InstantiationUtil.deserializeObject(serialized, Thread.currentThread()
+ .getContextClassLoader());
+ }
+
+ private static class MockHandle extends ByteStreamStateHandle {
+
+ private static final long serialVersionUID = 1L;
+
+ public MockHandle(Serializable state) {
+ super(state);
+ }
+
+ boolean discarded = false;
+ transient boolean generatedOutput = false;
+
+ @Override
+ public void discardState() throws Exception {
+ discarded = true;
+ }
+
+ @Override
+ protected OutputStream getOutputStream() throws Exception {
+ generatedOutput = true;
+ return new ByteArrayOutputStream();
+ }
+
+ @Override
+ protected InputStream getInputStream() throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/83102f0e/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
index 82b5d35..2873c78 100644
--- a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
+++ b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.tachyon;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
@@ -90,13 +91,23 @@ public class FileStateHandleTest {
+ hdPath);
FileStateHandle handle = (FileStateHandle) handleProvider.createStateHandle(state);
+
+ try {
+ handleProvider.createStateHandle(null);
+ fail();
+ } catch (RuntimeException e) {
+ // good
+ }
assertTrue(handle.stateFetched());
+ assertFalse(handle.isWritten());
// Serialize the handle so it writes the value to hdfs
SerializedValue<StateHandle<Serializable>> serializedHandle = new SerializedValue<StateHandle<Serializable>>(
handle);
-
+
+ assertTrue(handle.isWritten());
+
// Deserialize the handle and verify that the state is not fetched yet
FileStateHandle deserializedHandle = (FileStateHandle) serializedHandle
.deserializeValue(Thread.currentThread().getContextClassLoader());
@@ -107,7 +118,7 @@ public class FileStateHandleTest {
// Test whether discard removes the checkpoint file properly
assertTrue(hdfs.listFiles(hdPath, true).hasNext());
- handle.discardState();
+ deserializedHandle.discardState();
assertFalse(hdfs.listFiles(hdPath, true).hasNext());
}