You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/10/20 09:59:22 UTC
[31/47] flink git commit: [FLINK-2354] [runtime] Replace old
StateHandleProvider by StateStorageHelper in ZooKeeperStateHandleStore
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
new file mode 100644
index 0000000..481fb98
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.FloatSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.typeutils.runtime.ValueSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.util.OperatingSystem;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.junit.Assert.*;
+
+public class FileStateBackendTest {
+ /** Checks that an FsStateBackend survives a serialization copy, rejects checkpoints before initializeForJob(), and cleans up on dispose. */
+ @Test
+ public void testSetupAndSerialization() {
+ File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
+ try {
+ final String backendDir = localFileUri(tempDir);
+ FsStateBackend originalBackend = new FsStateBackend(backendDir);
+
+ assertFalse(originalBackend.isInitialized());
+ assertEquals(new URI(backendDir), originalBackend.getBasePath().toUri());
+ assertNull(originalBackend.getCheckpointDirectory());
+
+ // serialize / copy the backend
+ FsStateBackend backend = CommonTestUtils.createCopySerializable(originalBackend);
+ assertFalse(backend.isInitialized());
+ assertEquals(new URI(backendDir), backend.getBasePath().toUri());
+ assertNull(backend.getCheckpointDirectory());
+
+ // no file operations should be possible right now
+ try {
+ backend.checkpointStateSerializable("exception train rolling in", 2L, System.currentTimeMillis());
+ fail("should fail with an exception");
+ } catch (IllegalStateException e) {
+ // expected: the backend has not been initialized for a job yet
+ }
+
+ backend.initializeForJob(new JobID());
+ assertNotNull(backend.getCheckpointDirectory());
+
+ File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
+ assertTrue(checkpointDir.exists());
+ assertTrue(isDirectoryEmpty(checkpointDir));
+
+ backend.disposeAllStateForCurrentJob();
+ assertNull(backend.getCheckpointDirectory());
+
+ assertTrue(isDirectoryEmpty(tempDir)); // after disposal nothing may be left under the base directory
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ finally {
+ deleteDirectorySilently(tempDir);
+ }
+ }
+ /** Checkpoints three serializable values, reads each back through its handle, and expects an empty checkpoint directory once all are discarded. */
+ @Test
+ public void testSerializableState() {
+ File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
+ try {
+ FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
+ backend.initializeForJob(new JobID());
+
+ File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
+
+ String state1 = "dummy state";
+ String state2 = "row row row your boat";
+ Integer state3 = 42;
+
+ StateHandle<String> handle1 = backend.checkpointStateSerializable(state1, 439568923746L, System.currentTimeMillis());
+ StateHandle<String> handle2 = backend.checkpointStateSerializable(state2, 439568923746L, System.currentTimeMillis());
+ StateHandle<Integer> handle3 = backend.checkpointStateSerializable(state3, 439568923746L, System.currentTimeMillis());
+
+ assertFalse(isDirectoryEmpty(checkpointDir));
+ assertEquals(state1, handle1.getState(getClass().getClassLoader()));
+ handle1.discardState();
+
+ assertFalse(isDirectoryEmpty(checkpointDir)); // handle2 and handle3 have not been discarded yet
+ assertEquals(state2, handle2.getState(getClass().getClassLoader()));
+ handle2.discardState();
+
+ assertFalse(isDirectoryEmpty(checkpointDir)); // handle3 has not been discarded yet
+ assertEquals(state3, handle3.getState(getClass().getClassLoader()));
+ handle3.discardState();
+
+ assertTrue(isDirectoryEmpty(checkpointDir));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ finally {
+ deleteDirectorySilently(tempDir);
+ }
+ }
+ /** Writes byte arrays through checkpoint output streams and verifies read-back, per-handle file deletion, and that closeAndGetHandle() fails after close(). */
+ @Test
+ public void testStateOutputStream() {
+ File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
+ try {
+ FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
+ backend.initializeForJob(new JobID());
+
+ File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
+
+ byte[] state1 = new byte[1274673];
+ byte[] state2 = new byte[1];
+ byte[] state3 = new byte[0]; // zero-length state
+ byte[] state4 = new byte[177];
+
+ Random rnd = new Random();
+ rnd.nextBytes(state1);
+ rnd.nextBytes(state2);
+ rnd.nextBytes(state3);
+ rnd.nextBytes(state4);
+
+ long checkpointId = 97231523452L;
+
+ FsStateBackend.FsCheckpointStateOutputStream stream1 =
+ backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
+ FsStateBackend.FsCheckpointStateOutputStream stream2 =
+ backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
+ FsStateBackend.FsCheckpointStateOutputStream stream3 =
+ backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
+
+ stream1.write(state1);
+ stream2.write(state2);
+ stream3.write(state3);
+
+ FileStreamStateHandle handle1 = stream1.closeAndGetHandle();
+ FileStreamStateHandle handle2 = stream2.closeAndGetHandle();
+ FileStreamStateHandle handle3 = stream3.closeAndGetHandle();
+
+ // use with try-with-resources
+ StreamStateHandle handle4;
+ try (StateBackend.CheckpointStateOutputStream stream4 =
+ backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis())) {
+ stream4.write(state4);
+ handle4 = stream4.closeAndGetHandle();
+ }
+
+ // close before accessing handle
+ StateBackend.CheckpointStateOutputStream stream5 =
+ backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
+ stream5.write(state4);
+ stream5.close();
+ try {
+ stream5.closeAndGetHandle();
+ fail();
+ } catch (IOException e) {
+ // expected: the stream was already closed
+ }
+
+ validateBytesInStream(handle1.getState(getClass().getClassLoader()), state1);
+ handle1.discardState();
+ assertFalse(isDirectoryEmpty(checkpointDir));
+ ensureLocalFileDeleted(handle1.getFilePath());
+
+ validateBytesInStream(handle2.getState(getClass().getClassLoader()), state2);
+ handle2.discardState();
+ assertFalse(isDirectoryEmpty(checkpointDir));
+ ensureLocalFileDeleted(handle2.getFilePath());
+
+ validateBytesInStream(handle3.getState(getClass().getClassLoader()), state3);
+ handle3.discardState();
+ assertFalse(isDirectoryEmpty(checkpointDir));
+ ensureLocalFileDeleted(handle3.getFilePath());
+
+ validateBytesInStream(handle4.getState(getClass().getClassLoader()), state4);
+ handle4.discardState();
+ assertTrue(isDirectoryEmpty(checkpointDir)); // also requires that the closed stream5 left nothing behind
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ finally {
+ deleteDirectorySilently(tempDir);
+ }
+ }
+ /** Mutates a key/value state, snapshots it twice, and verifies that each snapshot restores to the contents it captured. */
+ @Test
+ public void testKeyValueState() {
+ File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
+ try {
+ FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
+ backend.initializeForJob(new JobID());
+
+ File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
+
+ KvState<Integer, String, FsStateBackend> kv =
+ backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
+
+ assertEquals(0, kv.size());
+
+ // some modifications to the state
+ kv.setCurrentKey(1);
+ assertNull(kv.value());
+ kv.update("1");
+ assertEquals(1, kv.size());
+ kv.setCurrentKey(2);
+ assertNull(kv.value());
+ kv.update("2");
+ assertEquals(2, kv.size());
+ kv.setCurrentKey(1);
+ assertEquals("1", kv.value());
+ assertEquals(2, kv.size());
+
+ // draw a snapshot
+ KvStateSnapshot<Integer, String, FsStateBackend> snapshot1 =
+ kv.shapshot(682375462378L, System.currentTimeMillis());
+
+ // make some more modifications
+ kv.setCurrentKey(1);
+ kv.update("u1");
+ kv.setCurrentKey(2);
+ kv.update("u2");
+ kv.setCurrentKey(3);
+ kv.update("u3");
+
+ // draw another snapshot
+ KvStateSnapshot<Integer, String, FsStateBackend> snapshot2 =
+ kv.shapshot(682375462379L, System.currentTimeMillis());
+
+ // validate the original state
+ assertEquals(3, kv.size());
+ kv.setCurrentKey(1);
+ assertEquals("u1", kv.value());
+ kv.setCurrentKey(2);
+ assertEquals("u2", kv.value());
+ kv.setCurrentKey(3);
+ assertEquals("u3", kv.value());
+
+ // restore the first snapshot and validate it
+ KvState<Integer, String, FsStateBackend> restored1 = snapshot1.restoreState(backend,
+ IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader());
+
+ assertEquals(2, restored1.size());
+ restored1.setCurrentKey(1);
+ assertEquals("1", restored1.value());
+ restored1.setCurrentKey(2);
+ assertEquals("2", restored1.value());
+
+ // restore the second snapshot and validate it
+ KvState<Integer, String, FsStateBackend> restored2 = snapshot2.restoreState(backend,
+ IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader());
+
+ assertEquals(3, restored2.size());
+ restored2.setCurrentKey(1);
+ assertEquals("u1", restored2.value());
+ restored2.setCurrentKey(2);
+ assertEquals("u2", restored2.value());
+ restored2.setCurrentKey(3);
+ assertEquals("u3", restored2.value());
+
+ snapshot1.discardState();
+ assertFalse(isDirectoryEmpty(checkpointDir)); // snapshot2 has not been discarded yet
+
+ snapshot2.discardState();
+ assertTrue(isDirectoryEmpty(checkpointDir));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ finally {
+ deleteDirectorySilently(tempDir);
+ }
+ }
+ /** Expects restoreState() to throw IllegalArgumentException when the key serializer, the value serializer, or both do not match the snapshot. */
+ @Test
+ public void testRestoreWithWrongSerializers() {
+ File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
+ try {
+ FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
+ backend.initializeForJob(new JobID());
+
+ File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
+
+ KvState<Integer, String, FsStateBackend> kv =
+ backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
+
+ kv.setCurrentKey(1);
+ kv.update("1");
+ kv.setCurrentKey(2);
+ kv.update("2");
+
+ KvStateSnapshot<Integer, String, FsStateBackend> snapshot =
+ kv.shapshot(682375462378L, System.currentTimeMillis());
+
+ // the double casts below smuggle serializers of the wrong type past the compiler on purpose
+ @SuppressWarnings("unchecked")
+ TypeSerializer<Integer> fakeIntSerializer =
+ (TypeSerializer<Integer>) (TypeSerializer<?>) FloatSerializer.INSTANCE;
+
+ @SuppressWarnings("unchecked")
+ TypeSerializer<String> fakeStringSerializer =
+ (TypeSerializer<String>) (TypeSerializer<?>) new ValueSerializer<StringValue>(StringValue.class);
+
+ try {
+ snapshot.restoreState(backend, fakeIntSerializer,
+ StringSerializer.INSTANCE, null, getClass().getClassLoader());
+ fail("should recognize wrong serializers");
+ } catch (IllegalArgumentException e) {
+ // expected
+ } catch (Exception e) {
+ fail("wrong exception");
+ }
+
+ try {
+ snapshot.restoreState(backend, IntSerializer.INSTANCE,
+ fakeStringSerializer, null, getClass().getClassLoader());
+ fail("should recognize wrong serializers");
+ } catch (IllegalArgumentException e) {
+ // expected
+ } catch (Exception e) {
+ fail("wrong exception");
+ }
+
+ try {
+ snapshot.restoreState(backend, fakeIntSerializer,
+ fakeStringSerializer, null, getClass().getClassLoader());
+ fail("should recognize wrong serializers");
+ } catch (IllegalArgumentException e) {
+ // expected
+ } catch (Exception e) {
+ fail("wrong exception");
+ }
+
+ snapshot.discardState();
+
+ assertTrue(isDirectoryEmpty(checkpointDir));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ finally {
+ deleteDirectorySilently(tempDir);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ private static void ensureLocalFileDeleted(Path path) {
+ URI uri = path.toUri();
+ if ("file".equals(uri.getScheme())) {
+ File file = new File(uri.getPath());
+ assertFalse("file not properly deleted", file.exists());
+ }
+ else {
+ throw new IllegalArgumentException("not a local path");
+ }
+ }
+ /** Deletes the given directory via FileUtils.deleteDirectory, ignoring any IOException. */
+ private static void deleteDirectorySilently(File dir) {
+ try {
+ FileUtils.deleteDirectory(dir);
+ }
+ catch (IOException ignored) {} // deliberately swallowed: this is best-effort cleanup in a finally block
+ }
+
+ private static boolean isDirectoryEmpty(File directory) {
+ String[] nested = directory.list();
+ return nested == null || nested.length == 0;
+ }
+ /** Prefixes the absolute path with "file:/" on Windows and "file://" elsewhere. */
+ private static String localFileUri(File path) {
+ return (OperatingSystem.isWindows() ? "file:/" : "file://").concat(path.getAbsolutePath());
+ }
+ /** Asserts the stream holds exactly {@code data} and then ends; uses readFully because one read() may return a short count. */
+ private static void validateBytesInStream(InputStream is, byte[] data) throws IOException {
+ byte[] holder = new byte[data.length];
+ try { new java.io.DataInputStream(is).readFully(holder); } catch (java.io.EOFException e) { fail("not enough data"); }
+ assertEquals("too much data", -1, is.read());
+ assertArrayEquals("wrong data", data, holder);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
new file mode 100644
index 0000000..5f95b33
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.FloatSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.typeutils.runtime.ValueSerializer;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.types.StringValue;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for the {@link org.apache.flink.runtime.state.memory.MemoryStateBackend}.
+ */
+public class MemoryStateBackendTest {
+ /** Checkpoints a serializable HashMap and expects the handle to return an equal map. */
+ @Test
+ public void testSerializableState() {
+ try {
+ MemoryStateBackend backend = new MemoryStateBackend();
+
+ HashMap<String, Integer> state = new HashMap<>();
+ state.put("hey there", 2);
+ state.put("the crazy brown fox stumbles over a sentence that does not contain every letter", 77);
+
+ StateHandle<HashMap<String, Integer>> handle = backend.checkpointStateSerializable(state, 12, 459);
+ assertNotNull(handle);
+
+ HashMap<String, Integer> restored = handle.getState(getClass().getClassLoader());
+ assertEquals(state, restored);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+ /** Expects checkpointStateSerializable() to throw an IOException on a backend created with MemoryStateBackend(10). */
+ @Test
+ public void testOversizedState() {
+ try {
+ MemoryStateBackend backend = new MemoryStateBackend(10); // NOTE(review): 10 is presumably the max state size; confirm
+
+ HashMap<String, Integer> state = new HashMap<>();
+ state.put("hey there", 2);
+ state.put("the crazy brown fox stumbles over a sentence that does not contain every letter", 77);
+
+ try {
+ backend.checkpointStateSerializable(state, 12, 459);
+ fail("this should cause an exception");
+ }
+ catch (IOException e) {
+ // expected: checkpointing this state must be rejected
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+ /** Serializes a HashMap into a checkpoint output stream and expects to read an equal map back from the handle. */
+ @Test
+ public void testStateStream() {
+ try {
+ MemoryStateBackend backend = new MemoryStateBackend();
+
+ HashMap<String, Integer> state = new HashMap<>();
+ state.put("hey there", 2);
+ state.put("the crazy brown fox stumbles over a sentence that does not contain every letter", 77);
+
+ StateBackend.CheckpointStateOutputStream os = backend.createCheckpointStateOutputStream(1, 2);
+ ObjectOutputStream oos = new ObjectOutputStream(os);
+ oos.writeObject(state);
+ oos.flush(); // push buffered object data into os before the handle is taken
+ StreamStateHandle handle = os.closeAndGetHandle();
+
+ assertNotNull(handle);
+
+ ObjectInputStream ois = new ObjectInputStream(handle.getState(getClass().getClassLoader()));
+ assertEquals(state, ois.readObject());
+ assertTrue(ois.available() <= 0); // nothing may follow the single serialized object
+ ois.close();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+ /** Expects an IOException while writing a HashMap through the output stream of a backend created with MemoryStateBackend(10). */
+ @Test
+ public void testOversizedStateStream() {
+ try {
+ MemoryStateBackend backend = new MemoryStateBackend(10); // NOTE(review): 10 is presumably the max state size; confirm
+
+ HashMap<String, Integer> state = new HashMap<>();
+ state.put("hey there", 2);
+ state.put("the crazy brown fox stumbles over a sentence that does not contain every letter", 77);
+
+ StateBackend.CheckpointStateOutputStream os = backend.createCheckpointStateOutputStream(1, 2);
+ ObjectOutputStream oos = new ObjectOutputStream(os);
+
+ try {
+ oos.writeObject(state);
+ oos.flush();
+ os.closeAndGetHandle();
+ fail("this should cause an exception");
+ }
+ catch (IOException e) {
+ // expected: one of the write, flush or closeAndGetHandle calls must fail
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+ /** Mutates a key/value state, snapshots it twice, and verifies that each snapshot restores to the contents it captured. */
+ @Test
+ public void testKeyValueState() {
+ try {
+ MemoryStateBackend backend = new MemoryStateBackend();
+
+ KvState<Integer, String, MemoryStateBackend> kv =
+ backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
+
+ assertEquals(0, kv.size());
+
+ // some modifications to the state
+ kv.setCurrentKey(1);
+ assertNull(kv.value());
+ kv.update("1");
+ assertEquals(1, kv.size());
+ kv.setCurrentKey(2);
+ assertNull(kv.value());
+ kv.update("2");
+ assertEquals(2, kv.size());
+ kv.setCurrentKey(1);
+ assertEquals("1", kv.value());
+ assertEquals(2, kv.size());
+
+ // draw a snapshot
+ KvStateSnapshot<Integer, String, MemoryStateBackend> snapshot1 =
+ kv.shapshot(682375462378L, System.currentTimeMillis());
+
+ // make some more modifications
+ kv.setCurrentKey(1);
+ kv.update("u1");
+ kv.setCurrentKey(2);
+ kv.update("u2");
+ kv.setCurrentKey(3);
+ kv.update("u3");
+
+ // draw another snapshot
+ KvStateSnapshot<Integer, String, MemoryStateBackend> snapshot2 =
+ kv.shapshot(682375462379L, System.currentTimeMillis());
+
+ // validate the original state
+ assertEquals(3, kv.size());
+ kv.setCurrentKey(1);
+ assertEquals("u1", kv.value());
+ kv.setCurrentKey(2);
+ assertEquals("u2", kv.value());
+ kv.setCurrentKey(3);
+ assertEquals("u3", kv.value());
+
+ // restore the first snapshot and validate it
+ KvState<Integer, String, MemoryStateBackend> restored1 = snapshot1.restoreState(backend,
+ IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader());
+
+ assertEquals(2, restored1.size());
+ restored1.setCurrentKey(1);
+ assertEquals("1", restored1.value());
+ restored1.setCurrentKey(2);
+ assertEquals("2", restored1.value());
+
+ // restore the second snapshot and validate it
+ KvState<Integer, String, MemoryStateBackend> restored2 = snapshot2.restoreState(backend,
+ IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader());
+
+ assertEquals(3, restored2.size());
+ restored2.setCurrentKey(1);
+ assertEquals("u1", restored2.value());
+ restored2.setCurrentKey(2);
+ assertEquals("u2", restored2.value());
+ restored2.setCurrentKey(3);
+ assertEquals("u3", restored2.value());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+ /** Expects restoreState() to throw IllegalArgumentException when the key serializer, the value serializer, or both do not match the snapshot. */
+ @Test
+ public void testRestoreWithWrongSerializers() {
+ try {
+ MemoryStateBackend backend = new MemoryStateBackend();
+ KvState<Integer, String, MemoryStateBackend> kv =
+ backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
+
+ kv.setCurrentKey(1);
+ kv.update("1");
+ kv.setCurrentKey(2);
+ kv.update("2");
+
+ KvStateSnapshot<Integer, String, MemoryStateBackend> snapshot =
+ kv.shapshot(682375462378L, System.currentTimeMillis());
+
+ // the double casts below smuggle serializers of the wrong type past the compiler on purpose
+ @SuppressWarnings("unchecked")
+ TypeSerializer<Integer> fakeIntSerializer =
+ (TypeSerializer<Integer>) (TypeSerializer<?>) FloatSerializer.INSTANCE;
+
+ @SuppressWarnings("unchecked")
+ TypeSerializer<String> fakeStringSerializer =
+ (TypeSerializer<String>) (TypeSerializer<?>) new ValueSerializer<StringValue>(StringValue.class);
+
+ try {
+ snapshot.restoreState(backend, fakeIntSerializer,
+ StringSerializer.INSTANCE, null, getClass().getClassLoader());
+ fail("should recognize wrong serializers");
+ } catch (IllegalArgumentException e) {
+ // expected
+ } catch (Exception e) {
+ fail("wrong exception");
+ }
+
+ try {
+ snapshot.restoreState(backend, IntSerializer.INSTANCE,
+ fakeStringSerializer, null, getClass().getClassLoader());
+ fail("should recognize wrong serializers");
+ } catch (IllegalArgumentException e) {
+ // expected
+ } catch (Exception e) {
+ fail("wrong exception");
+ }
+
+ try {
+ snapshot.restoreState(backend, fakeIntSerializer,
+ fakeStringSerializer, null, getClass().getClassLoader());
+ fail("should recognize wrong serializers");
+ } catch (IllegalArgumentException e) {
+ // expected
+ } catch (Exception e) {
+ fail("wrong exception");
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
index d2e5b6a..a65ec01 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.testutils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -79,7 +80,7 @@ public class ZooKeeperTestUtils {
// File system state backend
config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
- config.setString(ConfigConstants.STATE_BACKEND_FS_DIR, fsStateHandlePath + "/checkpoints");
+ config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, fsStateHandlePath + "/checkpoints");
config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, fsStateHandlePath + "/recovery");
// Akka failure detection and execution retries
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
index f0130ec..788f70d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
@@ -23,7 +23,6 @@ import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;
import org.apache.zookeeper.CreateMode;
@@ -83,11 +82,9 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
*/
@Test
public void testAdd() throws Exception {
- // Setup
- LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
-
- ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
- ZooKeeper.getClient(), stateHandleProvider);
+ LongStateStorage longStateStorage = new LongStateStorage();
+ ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<Long>(
+ ZooKeeper.getClient(), longStateStorage);
// Config
final String pathInZooKeeper = "/testAdd";
@@ -98,8 +95,8 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
// Verify
// State handle created
- assertEquals(1, stateHandleProvider.getStateHandles().size());
- assertEquals(state, stateHandleProvider.getStateHandles().get(0).getState(null));
+ assertEquals(1, store.getAll().size());
+ assertEquals(state, store.get(pathInZooKeeper).getState(null));
// Path created and is persistent
Stat stat = ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper);
@@ -120,10 +117,9 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
*/
@Test
public void testAddWithCreateMode() throws Exception {
- LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
-
- ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
- ZooKeeper.getClient(), stateHandleProvider);
+ LongStateStorage longStateStorage = new LongStateStorage();
+ ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<Long>(
+ ZooKeeper.getClient(), longStateStorage);
// Config
Long state = 3457347234L;
@@ -151,8 +147,8 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
// Verify
// State handle created
- assertEquals(i + 1, stateHandleProvider.getStateHandles().size());
- assertEquals(state, stateHandleProvider.getStateHandles().get(i).getState(null));
+ assertEquals(i + 1, store.getAll().size());
+ assertEquals(state, longStateStorage.getStateHandles().get(i).getState(null));
// Path created
Stat stat = ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper);
@@ -182,7 +178,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
*/
@Test(expected = Exception.class)
public void testAddAlreadyExistingPath() throws Exception {
- LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+ LongStateStorage stateHandleProvider = new LongStateStorage();
ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
ZooKeeper.getClient(), stateHandleProvider);
@@ -198,7 +194,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
@Test
public void testAddDiscardStateHandleAfterFailure() throws Exception {
// Setup
- LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+ LongStateStorage stateHandleProvider = new LongStateStorage();
CuratorFramework client = spy(ZooKeeper.getClient());
when(client.create()).thenThrow(new RuntimeException("Expected test Exception."));
@@ -231,7 +227,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
@Test
public void testReplace() throws Exception {
// Setup
- LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+ LongStateStorage stateHandleProvider = new LongStateStorage();
ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
ZooKeeper.getClient(), stateHandleProvider);
@@ -270,10 +266,10 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
*/
@Test(expected = Exception.class)
public void testReplaceNonExistingPath() throws Exception {
- StateHandleProvider<Long> stateHandleProvider = new LongStateHandleProvider();
+ StateStorageHelper<Long> stateStorage = new LongStateStorage();
ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
- ZooKeeper.getClient(), stateHandleProvider);
+ ZooKeeper.getClient(), stateStorage);
store.replace("/testReplaceNonExistingPath", 0, 1L);
}
@@ -284,7 +280,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
@Test
public void testReplaceDiscardStateHandleAfterFailure() throws Exception {
// Setup
- LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+ LongStateStorage stateHandleProvider = new LongStateStorage();
CuratorFramework client = spy(ZooKeeper.getClient());
when(client.setData()).thenThrow(new RuntimeException("Expected test Exception."));
@@ -329,7 +325,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
@Test
public void testGetAndExists() throws Exception {
// Setup
- LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+ LongStateStorage stateHandleProvider = new LongStateStorage();
ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
ZooKeeper.getClient(), stateHandleProvider);
@@ -354,7 +350,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
*/
@Test(expected = Exception.class)
public void testGetNonExistingPath() throws Exception {
- LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+ LongStateStorage stateHandleProvider = new LongStateStorage();
ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
ZooKeeper.getClient(), stateHandleProvider);
@@ -368,7 +364,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
@Test
public void testGetAll() throws Exception {
// Setup
- LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+ LongStateStorage stateHandleProvider = new LongStateStorage();
ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
ZooKeeper.getClient(), stateHandleProvider);
@@ -399,7 +395,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
@Test
public void testGetAllSortedByName() throws Exception {
// Setup
- LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+ LongStateStorage stateHandleProvider = new LongStateStorage();
ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
ZooKeeper.getClient(), stateHandleProvider);
@@ -429,7 +425,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
@Test
public void testRemove() throws Exception {
// Setup
- LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+ LongStateStorage stateHandleProvider = new LongStateStorage();
ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
ZooKeeper.getClient(), stateHandleProvider);
@@ -453,7 +449,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
@Test
public void testRemoveWithCallback() throws Exception {
// Setup
- LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+ LongStateStorage stateHandleProvider = new LongStateStorage();
ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
ZooKeeper.getClient(), stateHandleProvider);
@@ -492,7 +488,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
@Test
public void testRemoveAndDiscardState() throws Exception {
// Setup
- LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+ LongStateStorage stateHandleProvider = new LongStateStorage();
ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
ZooKeeper.getClient(), stateHandleProvider);
@@ -514,7 +510,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
@Test
public void testRemoveAndDiscardAllState() throws Exception {
// Setup
- LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+ LongStateStorage stateHandleProvider = new LongStateStorage();
ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
ZooKeeper.getClient(), stateHandleProvider);
@@ -543,21 +539,19 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
// Simple test helpers
// ---------------------------------------------------------------------------------------------
- private static class LongStateHandleProvider implements StateHandleProvider<Long> {
-
- private static final long serialVersionUID = 4572084854499402276L;
+ private static class LongStateStorage implements StateStorageHelper<Long> {
private final List<LongStateHandle> stateHandles = new ArrayList<>();
@Override
- public StateHandle<Long> createStateHandle(Long state) {
+ public StateHandle<Long> store(Long state) throws Exception {
LongStateHandle stateHandle = new LongStateHandle(state);
stateHandles.add(stateHandle);
return stateHandle;
}
- public List<LongStateHandle> getStateHandles() {
+ List<LongStateHandle> getStateHandles() {
return stateHandles;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/resources/log4j-test.properties b/flink-runtime/src/test/resources/log4j-test.properties
index 1ca02aa..f77ed07 100644
--- a/flink-runtime/src/test/resources/log4j-test.properties
+++ b/flink-runtime/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
-log4j.rootLogger=INFO, console
+log4j.rootLogger=OFF, console
# -----------------------------------------------------------------------------
# Console (use 'console')
@@ -36,3 +36,4 @@ log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m
# suppress the irrelevant (wrong) warnings from the netty channel handler
log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console
+log4j.logger.org.apache.flink.runtime.blob=DEBUG
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
index 8b7fb1c..4e4acd2 100644
--- a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
+++ b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
@@ -27,11 +27,11 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.state.filesystem.FileStreamStateHandle;
-import org.apache.flink.streaming.api.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.streaming.api.state.StateBackend;
-import org.apache.flink.streaming.api.state.StreamStateHandle;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 26e1c9e..98506e0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -63,7 +63,7 @@ import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.streaming.api.state.StateBackend;
+import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.types.StringValue;
import org.apache.flink.util.SplittableIterator;
@@ -372,11 +372,11 @@ public abstract class StreamExecutionEnvironment {
* the key/value state, and for checkpointed functions (implementing the interface
* {@link org.apache.flink.streaming.api.checkpoint.Checkpointed}).
*
- * <p>The {@link org.apache.flink.streaming.api.state.memory.MemoryStateBackend} for example
+ * <p>The {@link org.apache.flink.runtime.state.memory.MemoryStateBackend} for example
* maintains the state in heap memory, as objects. It is lightweight without extra dependencies,
* but can checkpoint only small states (some counters).
*
- * <p>In contrast, the {@link org.apache.flink.streaming.api.state.filesystem.FsStateBackend}
+ * <p>In contrast, the {@link org.apache.flink.runtime.state.filesystem.FsStateBackend}
* stores checkpoints of the state (also maintained as heap objects) in files. When using a replicated
* file system (like HDFS, S3, MapR FS, Tachyon, etc) this will guarantee that state is not lost upon
* failures of individual nodes and that streaming program can be executed highly available and strongly
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java
index 3817ede..3ac63af 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java
@@ -26,8 +26,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.state.SerializedCheckpointData;
+import org.apache.flink.runtime.state.SerializedCheckpointData;
import java.util.ArrayDeque;
import java.util.ArrayList;
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 76be598..11bf84f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -31,7 +31,7 @@ import org.apache.flink.runtime.util.ClassLoaderUtil;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.state.StateBackend;
+import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.runtime.tasks.StreamTaskException;
import org.apache.flink.util.InstantiationUtil;
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 0652406..be020d7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -50,7 +50,7 @@ import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.api.state.StateBackend;
+import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 9e60e9a..078679d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -24,9 +24,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.state.KvState;
-import org.apache.flink.streaming.api.state.KvStateSnapshot;
-import org.apache.flink.streaming.api.state.StateBackend;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index a991fd3..17bd08d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -27,7 +27,7 @@ import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.state.StateBackend;
+import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/AbstractHeapKvState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/AbstractHeapKvState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/AbstractHeapKvState.java
deleted file mode 100644
index b974674..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/AbstractHeapKvState.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataOutputView;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Base class for key/value state implementations that are backed by a regular heap hash map. The
- * concrete implementations define how the state is checkpointed.
- *
- * @param <K> The type of the key.
- * @param <V> The type of the value.
- * @param <Backend> The type of the backend that snapshots this key/value state.
- */
-public abstract class AbstractHeapKvState<K, V, Backend extends StateBackend<Backend>> implements KvState<K, V, Backend> {
-
- /** Map containing the actual key/value pairs */
- private final HashMap<K, V> state;
-
- /** The serializer for the keys */
- private final TypeSerializer<K> keySerializer;
-
- /** The serializer for the values */
- private final TypeSerializer<V> valueSerializer;
-
- /** The value that is returned when no other value has been associated with a key, yet */
- private final V defaultValue;
-
- /** The current key, which the next value methods will refer to */
- private K currentKey;
-
- /**
- * Creates a new empty key/value state.
- *
- * @param keySerializer The serializer for the keys.
- * @param valueSerializer The serializer for the values.
- * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
- */
- protected AbstractHeapKvState(TypeSerializer<K> keySerializer,
- TypeSerializer<V> valueSerializer,
- V defaultValue) {
- this(keySerializer, valueSerializer, defaultValue, new HashMap<K, V>());
- }
-
- /**
- * Creates a new key/value state for the given hash map of key/value pairs.
- *
- * @param keySerializer The serializer for the keys.
- * @param valueSerializer The serializer for the values.
- * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
- * @param state The state map to use in this key/value state. May contain initial state.
- */
- protected AbstractHeapKvState(TypeSerializer<K> keySerializer,
- TypeSerializer<V> valueSerializer,
- V defaultValue,
- HashMap<K, V> state) {
- this.state = requireNonNull(state);
- this.keySerializer = requireNonNull(keySerializer);
- this.valueSerializer = requireNonNull(valueSerializer);
- this.defaultValue = defaultValue;
- }
-
- // ------------------------------------------------------------------------
-
- @Override
- public V value() {
- V value = state.get(currentKey);
- return value != null ? value : defaultValue;
- }
-
- @Override
- public void update(V value) {
- if (value != null) {
- state.put(currentKey, value);
- }
- else {
- state.remove(currentKey);
- }
- }
-
- @Override
- public void setCurrentKey(K currentKey) {
- this.currentKey = currentKey;
- }
-
- @Override
- public int size() {
- return state.size();
- }
-
- @Override
- public void dispose() {
- state.clear();
- }
-
- /**
- * Gets the serializer for the keys.
- * @return The serializer for the keys.
- */
- public TypeSerializer<K> getKeySerializer() {
- return keySerializer;
- }
-
- /**
- * Gets the serializer for the values.
- * @return The serializer for the values.
- */
- public TypeSerializer<V> getValueSerializer() {
- return valueSerializer;
- }
-
- // ------------------------------------------------------------------------
- // checkpointing utilities
- // ------------------------------------------------------------------------
-
- protected void writeStateToOutputView(final DataOutputView out) throws IOException {
- for (Map.Entry<K, V> entry : state.entrySet()) {
- keySerializer.serialize(entry.getKey(), out);
- valueSerializer.serialize(entry.getValue(), out);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvState.java
deleted file mode 100644
index 9c628f8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvState.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import org.apache.flink.api.common.state.OperatorState;
-
-/**
- * Key/Value state implementation for user-defined state. The state is backed by a state
- * backend, which typically follows one of the following patterns: Either the state is stored
- * in the key/value state object directly (meaning in the executing JVM) and snapshotted by the
- * state backend into some store (during checkpoints), or the key/value state is in fact backed
- * by an external key/value store as the state backend, and checkpoints merely record the
- * metadata of what is considered part of the checkpoint.
- *
- * @param <K> The type of the key.
- * @param <V> The type of the value.
- */
-public interface KvState<K, V, Backend extends StateBackend<Backend>> extends OperatorState<V> {
-
- /**
- * Sets the current key, which will be used to retrieve values for the next calls to
- * {@link #value()} and {@link #update(Object)}.
- *
- * @param key The key.
- */
- void setCurrentKey(K key);
-
- /**
- * Creates a snapshot of this state.
- *
- * @param checkpointId The ID of the checkpoint for which the snapshot should be created.
- * @param timestamp The timestamp of the checkpoint.
- * @return A snapshot handle for this key/value state.
- *
- * @throws Exception Exceptions during snapshotting the state should be forwarded, so the system
- * can react to failed snapshots.
- */
- KvStateSnapshot<K, V, Backend> shapshot(long checkpointId, long timestamp) throws Exception;
-
- /**
- * Gets the number of key/value pairs currently stored in the state. Note that if a key
- * has been associated with "null", the key is removed from the state and will not
- * be counted here.
- *
- * @return The number of key/value pairs currently stored in the state.
- */
- int size();
-
- /**
- * Disposes the key/value state, releasing all occupied resources.
- */
- void dispose();
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvStateSnapshot.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvStateSnapshot.java
deleted file mode 100644
index 6aa7a1e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvStateSnapshot.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-/**
- * This class represents a snapshot of the {@link KvState}, taken for a checkpoint. Where exactly
- * the snapshot stores the snapshot data (in this object, in an external data store, etc) depends
- * on the actual implementation. This snapshot defines merely how to restore the state and
- * how to discard the state.
- *
- * <p>One possible implementation is that this snapshot simply contains a copy of the key/value map.
- *
- * <p>Another possible implementation for this snapshot is that the key/value map is serialized into
- * a file and this snapshot object contains a pointer to that file.
- *
- * @param <K> The type of the key
- * @param <V> The type of the value
- * @param <Backend> The type of the backend that can restore the state from this snapshot.
- */
-public interface KvStateSnapshot<K, V, Backend extends StateBackend<Backend>> extends java.io.Serializable {
-
- /**
- * Loads the key/value state back from this snapshot.
- *
- *
- * @param stateBackend The state backend that created this snapshot and can restore the key/value state
- * from this snapshot.
- * @param keySerializer The serializer for the keys.
- * @param valueSerializer The serializer for the values.
- * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
- * @param classLoader The class loader for user-defined types.
- *
- * @return An instance of the key/value state loaded from this snapshot.
- *
- * @throws Exception Exceptions can occur during the state loading and are forwarded.
- */
- KvState<K, V, Backend> restoreState(
- Backend stateBackend,
- TypeSerializer<K> keySerializer,
- TypeSerializer<V> valueSerializer,
- V defaultValue,
- ClassLoader classLoader) throws Exception;
-
-
- /**
- * Discards the state snapshot, removing any resources occupied by it.
- *
- * @throws Exception Exceptions occurring during the state disposal should be forwarded.
- */
- void discardState() throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/SerializedCheckpointData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/SerializedCheckpointData.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/SerializedCheckpointData.java
deleted file mode 100644
index 2bbb4e2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/SerializedCheckpointData.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.util.DataInputDeserializer;
-import org.apache.flink.runtime.util.DataOutputSerializer;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Holds the elements recorded for one checkpoint in serialized (binary) form, together
- * with the ID of that checkpoint and the number of elements that were serialized.
- */
-public class SerializedCheckpointData implements java.io.Serializable {
-
-    private static final long serialVersionUID = -8783744683896503488L;
-
-    /** The ID of the checkpoint that the serialized elements belong to */
-    private final long checkpointId;
-
-    /** The elements of the checkpoint, in serialized form */
-    private final byte[] serializedData;
-
-    /** How many elements are contained in the serialized data */
-    private final int numIds;
-
-    /**
-     * Creates a SerializedCheckpointData object wrapping already serialized data.
-     *
-     * @param checkpointId The ID of the checkpoint the data belongs to.
-     * @param serializedData The serialized IDs of that checkpoint.
-     * @param numIds The number of IDs contained in the serialized data.
-     */
-    public SerializedCheckpointData(long checkpointId, byte[] serializedData, int numIds) {
-        this.checkpointId = checkpointId;
-        this.serializedData = serializedData;
-        this.numIds = numIds;
-    }
-
-    /**
-     * Returns the ID of the checkpoint.
-     * @return The ID of the checkpoint.
-     */
-    public long getCheckpointId() {
-        return checkpointId;
-    }
-
-    /**
-     * Returns the serialized elements as binary data. The internal array is returned
-     * directly, not a copy.
-     * @return The binary data of the serialized elements.
-     */
-    public byte[] getSerializedData() {
-        return serializedData;
-    }
-
-    /**
-     * Returns how many IDs the checkpoint contains.
-     * @return The number of IDs in the checkpoint.
-     */
-    public int getNumIds() {
-        return numIds;
-    }
-
-    // ------------------------------------------------------------------------
-    //  Serialize to Checkpoint
-    // ------------------------------------------------------------------------
-
-    /**
-     * Converts a deque of checkpoints with their elements into an array of
-     * SerializedCheckpointData, using a freshly created serialization buffer.
-     *
-     * @param checkpoints The checkpoints to be converted.
-     * @param serializer The serializer to serialize the IDs.
-     * @param <T> The type of the ID.
-     * @return An array of serializable SerializedCheckpointData, one per entry in the deque.
-     *
-     * @throws IOException Thrown, if the serialization fails.
-     */
-    public static <T> SerializedCheckpointData[] fromDeque(ArrayDeque<Tuple2<Long, List<T>>> checkpoints,
-            TypeSerializer<T> serializer) throws IOException {
-        final DataOutputSerializer buffer = new DataOutputSerializer(128);
-        return fromDeque(checkpoints, serializer, buffer);
-    }
-
-    /**
-     * Converts a deque of checkpoints with their elements into an array of
-     * SerializedCheckpointData, serializing through the given reusable buffer.
-     *
-     * @param checkpoints The checkpoints to be converted.
-     * @param serializer The serializer to serialize the IDs.
-     * @param outputBuffer The reusable serialization buffer.
-     * @param <T> The type of the ID.
-     * @return An array of serializable SerializedCheckpointData, one per entry in the deque.
-     *
-     * @throws IOException Thrown, if the serialization fails.
-     */
-    public static <T> SerializedCheckpointData[] fromDeque(ArrayDeque<Tuple2<Long, List<T>>> checkpoints,
-            TypeSerializer<T> serializer,
-            DataOutputSerializer outputBuffer) throws IOException {
-        final SerializedCheckpointData[] result = new SerializedCheckpointData[checkpoints.size()];
-        int index = 0;
-
-        for (Tuple2<Long, List<T>> entry : checkpoints) {
-            final List<T> elements = entry.f1;
-
-            // the buffer is shared across checkpoints, so it is reset before each use
-            outputBuffer.clear();
-            for (T element : elements) {
-                serializer.serialize(element, outputBuffer);
-            }
-
-            // copy the bytes out, because the buffer is cleared again in the next iteration
-            final byte[] bytes = outputBuffer.getCopyOfBuffer();
-            result[index] = new SerializedCheckpointData(entry.f0, bytes, elements.size());
-            index++;
-        }
-
-        return result;
-    }
-
-    // ------------------------------------------------------------------------
-    //  De-Serialize from Checkpoint
-    // ------------------------------------------------------------------------
-
-    /**
-     * Turns an array of SerializedCheckpointData back into an ArrayDeque of element
-     * checkpoints, preserving the order of the array.
-     *
-     * @param data The data to be deserialized.
-     * @param serializer The serializer used to deserialize the data.
-     * @param <T> The type of the elements.
-     * @return An ArrayDeque of element checkpoints.
-     *
-     * @throws IOException Thrown, if the deserialization fails.
-     */
-    public static <T> ArrayDeque<Tuple2<Long, List<T>>> toDeque(
-            SerializedCheckpointData[] data, TypeSerializer<T> serializer) throws IOException
-    {
-        final ArrayDeque<Tuple2<Long, List<T>>> result = new ArrayDeque<>(data.length);
-        DataInputDeserializer input = null;
-
-        for (SerializedCheckpointData entry : data) {
-            final byte[] bytes = entry.getSerializedData();
-
-            // one deserializer is created lazily and then re-pointed at each checkpoint's bytes
-            if (input != null) {
-                input.setBuffer(bytes, 0, bytes.length);
-            }
-            else {
-                input = new DataInputDeserializer(bytes, 0, bytes.length);
-            }
-
-            final int count = entry.getNumIds();
-            final List<T> elements = new ArrayList<>(count);
-            for (int remaining = count; remaining > 0; remaining--) {
-                elements.add(serializer.deserialize(input));
-            }
-
-            result.addLast(new Tuple2<Long, List<T>>(entry.checkpointId, elements));
-        }
-
-        return result;
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackend.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackend.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackend.java
deleted file mode 100644
index f4391ad..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackend.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.StateHandle;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.Serializable;
-
-/**
- * A state backend defines how state is stored and snapshotted during checkpoints.
- *
- * <p>Besides the abstract operations, this class declares the stream types that
- * checkpoint state is written through: {@link CheckpointStateOutputStream} and
- * {@link CheckpointStateOutputView}.
- *
- * @param <Backend> The type of backend itself. This generic parameter is used to refer to the
- * type of backend when creating state backed by this backend.
- */
-public abstract class StateBackend<Backend extends StateBackend<Backend>> implements java.io.Serializable {
-
- private static final long serialVersionUID = 4620413814639220247L;
-
- // ------------------------------------------------------------------------
- // initialization and cleanup
- // ------------------------------------------------------------------------
-
- /**
- * This method is called by the task upon deployment to initialize the state backend for
- * data for a specific job.
- *
- * @param job The ID of the job for which the state backend instance checkpoints data.
- * @throws Exception Overwritten versions of this method may throw exceptions, in which
- * case the job that uses the state backend is considered failed during
- * deployment.
- */
- public abstract void initializeForJob(JobID job) throws Exception;
-
- /**
- * Disposes all state associated with the current job.
- *
- * @throws Exception Exceptions may occur during disposal of the state and should be forwarded.
- */
- public abstract void disposeAllStateForCurrentJob() throws Exception;
-
- /**
- * Closes the state backend, releasing all internal resources, but does not delete any persistent
- * checkpoint data.
- *
- * @throws Exception Exceptions can be forwarded and will be logged by the system
- */
- public abstract void close() throws Exception;
-
- // ------------------------------------------------------------------------
- // key/value state
- // ------------------------------------------------------------------------
-
- /**
- * Creates a key/value state backed by this state backend.
- *
- * @param keySerializer The serializer for the key.
- * @param valueSerializer The serializer for the value.
- * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
- * @param <K> The type of the key.
- * @param <V> The type of the value.
- *
- * @return A new key/value state backed by this backend.
- *
- * @throws Exception Exceptions may occur during initialization of the state and should be forwarded.
- */
- public abstract <K, V> KvState<K, V, Backend> createKvState(
- TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer,
- V defaultValue) throws Exception;
-
-
- // ------------------------------------------------------------------------
- // storing state for a checkpoint
- // ------------------------------------------------------------------------
-
- /**
- * Creates an output stream that writes into the state of the given checkpoint. When the stream
- * is closed via {@link CheckpointStateOutputStream#closeAndGetHandle()}, it returns a state
- * handle that can retrieve the state back.
- *
- * @param checkpointID The ID of the checkpoint.
- * @param timestamp The timestamp of the checkpoint.
- * @return An output stream that writes state for the given checkpoint.
- *
- * @throws Exception Exceptions may occur while creating the stream and should be forwarded.
- */
- public abstract CheckpointStateOutputStream createCheckpointStateOutputStream(
- long checkpointID, long timestamp) throws Exception;
-
- /**
- * Creates a {@link DataOutputView} stream that writes into the state of the given checkpoint.
- * When the stream is closed via {@link CheckpointStateOutputView#closeAndGetHandle()}, it
- * returns a state handle that can retrieve the state back.
- *
- * <p>This implementation wraps the stream obtained from
- * {@link #createCheckpointStateOutputStream(long, long)}.
- *
- * @param checkpointID The ID of the checkpoint.
- * @param timestamp The timestamp of the checkpoint.
- * @return A DataOutputView stream that writes state for the given checkpoint.
- *
- * @throws Exception Exceptions may occur while creating the stream and should be forwarded.
- */
- public CheckpointStateOutputView createCheckpointStateOutputView(
- long checkpointID, long timestamp) throws Exception {
- return new CheckpointStateOutputView(createCheckpointStateOutputStream(checkpointID, timestamp));
- }
-
- /**
- * Writes the given state into the checkpoint, and returns a handle that can retrieve the state back.
- *
- * @param state The state to be checkpointed.
- * @param checkpointID The ID of the checkpoint.
- * @param timestamp The timestamp of the checkpoint.
- * @param <S> The type of the state.
- *
- * @return A state handle that can retrieve the checkpointed state.
- *
- * @throws Exception Exceptions may occur during serialization / storing the state and should be forwarded.
- */
- public abstract <S extends Serializable> StateHandle<S> checkpointStateSerializable(
- S state, long checkpointID, long timestamp) throws Exception;
-
-
- // ------------------------------------------------------------------------
- // Checkpoint state output stream
- // ------------------------------------------------------------------------
-
- /**
- * A dedicated output stream that produces a {@link StreamStateHandle} when closed
- * through {@link #closeAndGetHandle()}.
- */
- public static abstract class CheckpointStateOutputStream extends OutputStream {
-
- /**
- * Closes the stream and gets a state handle that can create an input stream
- * producing the data written to this stream.
- *
- * @return A state handle that can create an input stream producing the data written to this stream.
- * @throws IOException Thrown, if the stream cannot be closed.
- */
- public abstract StreamStateHandle closeAndGetHandle() throws IOException;
- }
-
- /**
- * A dedicated DataOutputView stream that produces a {@code StateHandle<DataInputView>} when closed
- * through {@link #closeAndGetHandle()}.
- */
- public static final class CheckpointStateOutputView extends DataOutputViewStreamWrapper {
-
- /** The checkpoint stream that this view writes to and obtains the state handle from */
- private final CheckpointStateOutputStream out;
-
- /**
- * Creates a view that writes into the given checkpoint output stream.
- *
- * @param out The checkpoint output stream to wrap.
- */
- public CheckpointStateOutputView(CheckpointStateOutputStream out) {
- super(out);
- this.out = out;
- }
-
- /**
- * Closes the stream and gets a state handle that can create a DataInputView
- * producing the data written to this stream.
- *
- * @return A state handle that can create a DataInputView producing the data written to this stream.
- * @throws IOException Thrown, if the stream cannot be closed.
- */
- public StateHandle<DataInputView> closeAndGetHandle() throws IOException {
- return new DataInputViewHandle(out.closeAndGetHandle());
- }
-
- /**
- * Closes the wrapped checkpoint stream directly; no state handle is produced by this call.
- *
- * @throws IOException Thrown, if the wrapped stream cannot be closed.
- */
- @Override
- public void close() throws IOException {
- out.close();
- }
- }
-
- /**
- * Simple state handle that resolves a {@link DataInputView} from a StreamStateHandle.
- */
- private static final class DataInputViewHandle implements StateHandle<DataInputView> {
-
- private static final long serialVersionUID = 2891559813513532079L;
-
- /** The stream handle that state retrieval and disposal are delegated to */
- private final StreamStateHandle stream;
-
- private DataInputViewHandle(StreamStateHandle stream) {
- this.stream = stream;
- }
-
- /**
- * Resolves the wrapped stream handle and wraps the resulting input stream
- * in a {@link DataInputViewStreamWrapper}.
- */
- @Override
- public DataInputView getState(ClassLoader userCodeClassLoader) throws Exception {
- return new DataInputViewStreamWrapper(stream.getState(userCodeClassLoader));
- }
-
- /** Discards the state by delegating to the wrapped stream handle. */
- @Override
- public void discardState() throws Exception {
- stream.discardState();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackendFactory.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackendFactory.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackendFactory.java
deleted file mode 100644
index ad87eae..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackendFactory.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import org.apache.flink.configuration.Configuration;
-
-/**
- * A factory to create a specific state backend. The state backend creation gets a Configuration
- * object that can be used to read further config values.
- *
- * @param <T> The type of the state backend created.
- */
-public interface StateBackendFactory<T extends StateBackend<T>> {
-
- /**
- * Creates the state backend, optionally using the given configuration.
- *
- * <p>Note that the declared return type is {@code StateBackend<T>}, not {@code T} itself.
- *
- * @param config The Flink configuration (loaded by the TaskManager).
- * @return The created state backend.
- *
- * @throws Exception Exceptions during instantiation can be forwarded.
- */
- StateBackend<T> createFromConfig(Configuration config) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamStateHandle.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamStateHandle.java
deleted file mode 100644
index 0fa5952..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamStateHandle.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import org.apache.flink.runtime.state.StateHandle;
-
-import java.io.InputStream;
-
-/**
- * A state handle that produces an input stream when resolved.
- *
- * <p>This interface declares no methods of its own; it only fixes the state type of
- * {@link StateHandle} to {@link InputStream}.
- */
-public interface StreamStateHandle extends StateHandle<InputStream> {}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/AbstractFileState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/AbstractFileState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/AbstractFileState.java
deleted file mode 100644
index c4a376e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/AbstractFileState.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state.filesystem;
-
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-
-import java.io.IOException;
-
-/**
- * Common superclass for state that is kept in a single file. It holds the path of
- * that file and resolves the file system for the path on demand.
- */
-public abstract class AbstractFileState implements java.io.Serializable {
-
-    private static final long serialVersionUID = 350284443258002355L;
-
-    /** Path of the state file; its URI is used to look up the file system */
-    private final Path filePath;
-
-    /** File system handle, resolved on first use; transient, so it is not serialized with the state */
-    private transient FileSystem fs;
-
-    /**
-     * Creates a new file state that refers to the given file.
-     *
-     * @param filePath The path to the file that stores the state.
-     */
-    protected AbstractFileState(Path filePath) {
-        this.filePath = filePath;
-    }
-
-    /**
-     * Returns the path of the file that holds this handle's state.
-     * @return The path where this handle's state is stored.
-     */
-    public Path getFilePath() {
-        return filePath;
-    }
-
-    /**
-     * Discards the state by deleting the state file. Afterwards, a non-recursive delete
-     * of the file's parent directory is attempted as well.
-     *
-     * @throws Exception Thrown, if the file deletion (not the directory deletion) fails.
-     */
-    public void discardState() throws Exception {
-        getFileSystem().delete(filePath, false);
-
-        // Best-effort cleanup of the containing directory: the delete is non-recursive,
-        // so it is expected to fail while other files still exist. That failure is
-        // deliberately ignored.
-        final Path parentDir = filePath.getParent();
-        try {
-            getFileSystem().delete(parentDir, false);
-        } catch (IOException ignored) {}
-    }
-
-    /**
-     * Returns the file system for the state file, looking it up on the first call
-     * and returning the cached handle afterwards.
-     * @return The file system that stores the file state.
-     * @throws IOException Thrown if the file system cannot be accessed.
-     */
-    protected FileSystem getFileSystem() throws IOException {
-        if (fs != null) {
-            return fs;
-        }
-        fs = FileSystem.get(filePath.toUri());
-        return fs;
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileSerializableStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileSerializableStateHandle.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileSerializableStateHandle.java
deleted file mode 100644
index 9bf5ec1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileSerializableStateHandle.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state.filesystem;
-
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.util.InstantiationUtil;
-
-import java.io.ObjectInputStream;
-
-/**
- * A state handle that points to state stored in a file via Java Serialization.
- *
- * @param <T> The type of state pointed to by the state handle.
- */
-public class FileSerializableStateHandle<T> extends AbstractFileState implements StateHandle<T> {
-
-    private static final long serialVersionUID = -657631394290213622L;
-
-    /**
-     * Creates a new FileSerializableStateHandle pointing to state at the given file path.
-     *
-     * @param filePath The path to the file containing the checkpointed state.
-     */
-    public FileSerializableStateHandle(Path filePath) {
-        super(filePath);
-    }
-
-    /**
-     * Opens the state file and reads a single object from it via Java Serialization.
-     * The file stream is closed before this method returns, both on success and on failure.
-     *
-     * @param classLoader The class loader used to resolve the classes of the deserialized object.
-     * @return The object read from the state file, cast (unchecked) to the state type.
-     *
-     * @throws Exception Thrown, if the file cannot be opened or the object cannot be deserialized.
-     */
-    @Override
-    @SuppressWarnings("unchecked")
-    public T getState(ClassLoader classLoader) throws Exception {
-        // Both streams are declared as resources: if constructing the ObjectInputStream
-        // fails (it reads the stream header), the underlying file stream is still closed.
-        try (FSDataInputStream inStream = getFileSystem().open(getFilePath());
-                ObjectInputStream ois = new InstantiationUtil.ClassLoaderObjectInputStream(inStream, classLoader)) {
-            return (T) ois.readObject();
-        }
-    }
-}