You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/10/20 09:59:21 UTC
[30/47] flink git commit: [FLINK-2354] [runtime] Replace old
StateHandleProvider by StateStorageHelper in ZooKeeperStateHandleStore
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileStreamStateHandle.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileStreamStateHandle.java
deleted file mode 100644
index 79512d7..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileStreamStateHandle.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state.filesystem;
-
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.streaming.api.state.StreamStateHandle;
-
-import java.io.InputStream;
-
-/**
- * A state handle that points to state in a file system, accessible as an input stream.
- */
-public class FileStreamStateHandle extends AbstractFileState implements StreamStateHandle {
-
-    private static final long serialVersionUID = -6826990484549987311L;
-
-    /**
-     * Constructs a handle that refers to checkpointed state stored in a single file.
-     *
-     * @param filePath Location of the file that holds the checkpointed state.
-     */
-    public FileStreamStateHandle(Path filePath) {
-        super(filePath);
-    }
-
-    /**
-     * Opens the file this handle points to.
-     *
-     * @param userCodeClassLoader Not used by this implementation.
-     * @return An input stream reading from the state file.
-     * @throws Exception Propagated from obtaining the file system or opening the file.
-     */
-    @Override
-    public InputStream getState(ClassLoader userCodeClassLoader) throws Exception {
-        final InputStream stateStream = getFileSystem().open(getFilePath());
-        return stateStream;
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvState.java
deleted file mode 100644
index 107a3be..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvState.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state.filesystem;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
-import org.apache.flink.streaming.api.state.AbstractHeapKvState;
-
-import java.io.DataOutputStream;
-import java.util.HashMap;
-
-/**
- * Heap-backed key/value state that is snapshotted into files.
- *
- * @param <K> The type of the key.
- * @param <V> The type of the value.
- */
-public class FsHeapKvState<K, V> extends AbstractHeapKvState<K, V, FsStateBackend> {
-
-    /** Backend that supplies the checkpoint output streams for snapshots of this state. */
-    private final FsStateBackend backend;
-
-    /**
-     * Builds an empty key/value state.
-     *
-     * @param keySerializer Serializer used for keys.
-     * @param valueSerializer Serializer used for values.
-     * @param defaultValue Value handed to the parent state as the default value.
-     * @param backend File system state backend that snapshots are written through.
-     */
-    public FsHeapKvState(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer,
-            V defaultValue, FsStateBackend backend) {
-        super(keySerializer, valueSerializer, defaultValue);
-        this.backend = backend;
-    }
-
-    /**
-     * Builds a key/value state that starts out with the given contents, for example when
-     * re-creating state from a snapshot.
-     *
-     * @param keySerializer Serializer used for keys.
-     * @param valueSerializer Serializer used for values.
-     * @param defaultValue Value handed to the parent state as the default value.
-     * @param state Initial key/value pairs.
-     * @param backend File system state backend that snapshots are written through.
-     */
-    public FsHeapKvState(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer,
-            V defaultValue, HashMap<K, V> state, FsStateBackend backend) {
-        super(keySerializer, valueSerializer, defaultValue, state);
-        this.backend = backend;
-    }
-
-    /**
-     * Writes the current state into a new checkpoint file and returns a snapshot pointing to it.
-     *
-     * @param checkpointId Id of the checkpoint the snapshot belongs to.
-     * @param timestamp Timestamp of the checkpoint.
-     * @return A snapshot referring to the written file.
-     * @throws Exception If the stream cannot be created or the state cannot be written.
-     */
-    @Override
-    public FsHeapKvStateSnapshot<K, V> shapshot(long checkpointId, long timestamp) throws Exception {
-        try (FsStateBackend.FsCheckpointStateOutputStream checkpointStream =
-                backend.createCheckpointStateOutputStream(checkpointId, timestamp)) {
-
-            final DataOutputStream dataStream = new DataOutputStream(checkpointStream);
-            final OutputViewDataOutputStreamWrapper view = new OutputViewDataOutputStreamWrapper(dataStream);
-
-            // write the number of entries, followed by the state contents
-            view.writeInt(size());
-            writeStateToOutputView(view);
-            view.flush();
-
-            // closeAndGetPath() marks the stream as closed, so the implicit close() at the end
-            // of this try block no longer discards the file
-            final TypeSerializer<K> keys = getKeySerializer();
-            final TypeSerializer<V> values = getValueSerializer();
-            return new FsHeapKvStateSnapshot<>(keys, values, checkpointStream.closeAndGetPath());
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvStateSnapshot.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvStateSnapshot.java
deleted file mode 100644
index c7117f8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvStateSnapshot.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state.filesystem;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
-import org.apache.flink.streaming.api.state.KvStateSnapshot;
-
-import java.io.DataInputStream;
-import java.util.HashMap;
-
-/**
- * A snapshot of a heap key/value state stored in a file.
- *
- * @param <K> The type of the key in the snapshot state.
- * @param <V> The type of the value in the snapshot state.
- */
-public class FsHeapKvStateSnapshot<K, V> extends AbstractFileState implements KvStateSnapshot<K, V, FsStateBackend> {
-
-    private static final long serialVersionUID = 1L;
-
-    /** Name of the key serializer class */
-    private final String keySerializerClassName;
-
-    /** Name of the value serializer class */
-    private final String valueSerializerClassName;
-
-    /**
-     * Creates a new state snapshot with data in the file system. Only the class names of the
-     * serializers are remembered; they are used to validate the serializers passed on restore.
-     *
-     * @param keySerializer The serializer for the keys.
-     * @param valueSerializer The serializer for the values.
-     * @param filePath The path where the snapshot data is stored.
-     */
-    public FsHeapKvStateSnapshot(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, Path filePath) {
-        super(filePath);
-        this.keySerializerClassName = keySerializer.getClass().getName();
-        this.valueSerializerClassName = valueSerializer.getClass().getName();
-    }
-
-    /**
-     * Reads the snapshot file and re-creates the key/value state from it.
-     *
-     * <p>The file is expected to start with the number of entries, followed by that many
-     * key/value pairs written with the given serializers.
-     *
-     * @param stateBackend The backend whose file system is used to open the snapshot file.
-     * @param keySerializer The serializer for the keys; its class must match the one used for the snapshot.
-     * @param valueSerializer The serializer for the values; its class must match the one used for the snapshot.
-     * @param defaultValue The default value passed on to the restored state.
-     * @param classLoader Not used by this implementation.
-     * @return The restored key/value state.
-     * @throws IllegalArgumentException If the serializer classes differ from those recorded in the snapshot.
-     * @throws Exception If the snapshot file cannot be opened or read.
-     */
-    @Override
-    public FsHeapKvState<K, V> restoreState(
-            FsStateBackend stateBackend,
-            final TypeSerializer<K> keySerializer,
-            final TypeSerializer<V> valueSerializer,
-            V defaultValue,
-            ClassLoader classLoader) throws Exception {
-
-        // validity checks
-        if (!keySerializer.getClass().getName().equals(keySerializerClassName) ||
-                !valueSerializer.getClass().getName().equals(valueSerializerClassName)) {
-            // report the recorded serializers in the same key/value order as the "(K/V)" label
-            throw new IllegalArgumentException(
-                    "Cannot restore the state from the snapshot with the given serializers. " +
-                    "State (K/V) was serialized with (" + keySerializerClassName +
-                    "/" + valueSerializerClassName + ")");
-        }
-
-        // state restore
-        try (FSDataInputStream inStream = stateBackend.getFileSystem().open(getFilePath())) {
-            InputViewDataInputStreamWrapper inView = new InputViewDataInputStreamWrapper(new DataInputStream(inStream));
-
-            final int numEntries = inView.readInt();
-            HashMap<K, V> stateMap = new HashMap<>(numEntries);
-
-            for (int i = 0; i < numEntries; i++) {
-                K key = keySerializer.deserialize(inView);
-                V value = valueSerializer.deserialize(inView);
-                stateMap.put(key, value);
-            }
-
-            return new FsHeapKvState<>(keySerializer, valueSerializer, defaultValue, stateMap, stateBackend);
-        }
-        catch (Exception e) {
-            throw new Exception("Failed to restore state from file system", e);
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackend.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackend.java
deleted file mode 100644
index 3cbd227..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackend.java
+++ /dev/null
@@ -1,412 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state.filesystem;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.state.StateBackend;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.UUID;
-
-/**
- * The file state backend is a state backend that stores the state of streaming jobs in a file system.
- *
- * <p>The state backend has one core directory into which it puts all checkpoint data. Inside that
- * directory, it creates a directory per job, inside which each checkpoint gets a directory, with
- * files for each state, for example:
- *
- * {@code hdfs://namenode:port/flink-checkpoints/<job-id>/chk-17/6ba7b810-9dad-11d1-80b4-00c04fd430c8 }
- */
-public class FsStateBackend extends StateBackend<FsStateBackend> {
-
- private static final long serialVersionUID = -8191916350224044011L;
-
- private static final Logger LOG = LoggerFactory.getLogger(FsStateBackend.class);
-
-
- /** The path to the directory for the checkpoint data, including the file system
- * description via scheme and optional authority */
- private final Path basePath;
-
- /** The directory (job specific) into which this initialized instance of the backend stores its data */
- private transient Path checkpointDirectory;
-
- /** Cached handle to the file system for file operations */
- private transient FileSystem filesystem;
-
-
- /**
- * Creates a new state backend that stores its checkpoint data in the file system and location
- * defined by the given URI.
- *
- * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
- * must be accessible via {@link FileSystem#get(URI)}.
- *
- * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
- * (host and port), or that the Hadoop configuration that describes that information must be in the
- * classpath.
- *
- * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
- * and the path to the checkpoint data directory.
- * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
- */
- public FsStateBackend(String checkpointDataUri) throws IOException {
- // wrap the string in a Path and delegate to the Path-based constructor
- this(new Path(checkpointDataUri));
- }
-
- /**
- * Creates a new state backend that stores its checkpoint data in the file system and location
- * defined by the given URI.
- *
- * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
- * must be accessible via {@link FileSystem#get(URI)}.
- *
- * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
- * (host and port), or that the Hadoop configuration that describes that information must be in the
- * classpath.
- *
- * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
- * and the path to the checkpoint data directory.
- * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
- */
- public FsStateBackend(Path checkpointDataUri) throws IOException {
- // convert to a URI and delegate to the URI-based constructor, which performs the validity checks
- this(checkpointDataUri.toUri());
- }
-
- /**
- * Creates a new state backend that stores its checkpoint data in the file system and location
- * defined by the given URI.
- *
- * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
- * must be accessible via {@link FileSystem#get(URI)}.
- *
- * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
- * (host and port), or that the Hadoop configuration that describes that information must be in the
- * classpath.
- *
- * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
- * and the path to the checkpoint data directory.
- * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
- */
- public FsStateBackend(URI checkpointDataUri) throws IOException {
-     final String uriScheme = checkpointDataUri.getScheme();
-     final String uriPath = checkpointDataUri.getPath();
-
-     // reject URIs that cannot name a usable checkpoint directory
-     if (uriScheme == null) {
-         throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
-                 "Please specify the file system scheme explicitly in the URI.");
-     }
-     if (uriPath == null) {
-         throw new IllegalArgumentException("The path to store the checkpoint data in is null. " +
-                 "Please specify a directory path for the checkpoint data.");
-     }
-     if (uriPath.isEmpty() || "/".equals(uriPath)) {
-         throw new IllegalArgumentException("Cannot use the root directory for checkpoints.");
-     }
-
-     // The base path is rebuilt from the URI reported by the resolved file system, so that it
-     // carries that file system's scheme and authority even when the given URI omitted them.
-     final FileSystem resolvedFs = FileSystem.get(checkpointDataUri);
-     this.filesystem = resolvedFs;
-     if (resolvedFs == null) {
-         throw new IOException("Could not find a file system for the given scheme in the available configurations.");
-     }
-
-     final URI fsURI = resolvedFs.getUri();
-     final URI baseURI;
-     try {
-         baseURI = new URI(fsURI.getScheme(), fsURI.getAuthority(), uriPath, null, null);
-     }
-     catch (URISyntaxException e) {
-         throw new IOException(
-                 String.format("Cannot create file system URI for checkpointDataUri %s and filesystem URI %s",
-                         checkpointDataUri, fsURI), e);
-     }
-     this.basePath = new Path(baseURI);
- }
-
- /**
- * Gets the base directory where all state-containing files are stored.
- * The job specific directory is created inside this directory.
- *
- * @return The base directory.
- */
- public Path getBasePath() {
-     // set once in the constructor and never changed afterwards
-     return this.basePath;
- }
-
- /**
- * Gets the directory where this state backend stores its checkpoint data. Will be null if
- * the state backend has not been initialized.
- *
- * @return The directory where this state backend stores its checkpoint data.
- */
- public Path getCheckpointDirectory() {
-     // null until initializeForJob() has run, and again after disposeAllStateForCurrentJob()
-     return this.checkpointDirectory;
- }
-
- /**
- * Checks whether this state backend is initialized. Note that initialization does not carry
- * across serialization. After each serialization, the state backend needs to be initialized.
- *
- * @return True, if the file state backend has been initialized, false otherwise.
- */
- public boolean isInitialized() {
-     // both transient members must be present for the backend to count as initialized
-     return !(filesystem == null || checkpointDirectory == null);
- }
-
- /**
- * Gets the file system handle for the file system that stores the state for this backend.
- *
- * @return This backend's file system handle.
- */
- public FileSystem getFileSystem() {
- if (filesystem != null) {
- return filesystem;
- }
- else {
- throw new IllegalStateException("State backend has not been initialized.");
- }
- }
-
- // ------------------------------------------------------------------------
- // initialization and cleanup
- // ------------------------------------------------------------------------
-
- @Override
- public void initializeForJob(JobID jobId) throws Exception {
-     // each job gets its own directory, named after the job id, below the base path
-     final Path jobDirectory = new Path(basePath, jobId.toString());
-
-     LOG.info("Initializing file state backend to URI " + jobDirectory);
-
-     // (re-)acquire the file system handle and make sure the job directory exists
-     final FileSystem fs = basePath.getFileSystem();
-     this.filesystem = fs;
-     fs.mkdirs(jobDirectory);
-
-     this.checkpointDirectory = jobDirectory;
- }
-
- @Override
- public void disposeAllStateForCurrentJob() throws Exception {
- FileSystem fs = this.filesystem;
- Path dir = this.checkpointDirectory;
-
- if (fs != null && dir != null) {
- this.filesystem = null;
- this.checkpointDirectory = null;
- fs.delete(dir, true);
- }
- else {
- throw new IllegalStateException("state backend has not been initialized");
- }
- }
-
- /** Intentionally empty: this implementation performs no work on close. */
- @Override
- public void close() throws Exception {}
-
- // ------------------------------------------------------------------------
- // state backend operations
- // ------------------------------------------------------------------------
-
- @Override
- public <K, V> FsHeapKvState<K, V> createKvState(
-         TypeSerializer<K> keySerializer,
-         TypeSerializer<V> valueSerializer,
-         V defaultValue) throws Exception {
-
-     // the new state starts out empty and snapshots itself through this backend
-     return new FsHeapKvState<>(keySerializer, valueSerializer, defaultValue, this);
- }
-
- /**
-  * Serializes the given state object with Java serialization into a new, uniquely named file
-  * inside the directory of the given checkpoint.
-  *
-  * <p>Creating the file is attempted up to 10 times with a fresh random name each time. Once a
-  * file has been created, a failure while writing is not retried: the stream is closed, the
-  * partially written file is removed (best effort), and the failure is propagated.
-  *
-  * @param state The state object to write.
-  * @param checkpointID The id of the checkpoint, used to determine the target directory.
-  * @param timestamp The timestamp of the checkpoint (not used by this implementation).
-  * @return A handle pointing to the file that contains the serialized state.
-  * @throws IllegalStateException If the backend has not been initialized.
-  * @throws Exception If no file could be created, or if writing the state failed.
-  */
- @Override
- public <S extends Serializable> StateHandle<S> checkpointStateSerializable(
-         S state, long checkpointID, long timestamp) throws Exception
- {
-     checkFileSystemInitialized();
-
-     // make sure the directory for that specific checkpoint exists
-     final Path checkpointDir = createCheckpointDirPath(checkpointID);
-     filesystem.mkdirs(checkpointDir);
-
-     Exception latestException = null;
-
-     for (int attempt = 0; attempt < 10; attempt++) {
-         Path targetPath = new Path(checkpointDir, UUID.randomUUID().toString());
-         FSDataOutputStream outStream;
-         try {
-             // 'false' = do not overwrite an existing file
-             outStream = filesystem.create(targetPath, false);
-         }
-         catch (Exception e) {
-             latestException = e;
-             continue;
-         }
-
-         boolean written = false;
-         try {
-             ObjectOutputStream os = new ObjectOutputStream(outStream);
-             os.writeObject(state);
-             os.close();
-             written = true;
-         }
-         finally {
-             if (!written) {
-                 // do not leak the open stream or leave a partial file behind; the original
-                 // failure is what the caller should see, so cleanup problems are ignored
-                 try {
-                     outStream.close();
-                 } catch (Exception ignored) {}
-                 try {
-                     filesystem.delete(targetPath, false);
-                 } catch (Exception ignored) {}
-             }
-         }
-         return new FileSerializableStateHandle<S>(targetPath);
-     }
-
-     throw new Exception("Could not open output stream for state backend", latestException);
- }
-
- @Override
- public FsCheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
-     checkFileSystemInitialized();
-
-     // the checkpoint-specific directory must exist before files are created in it
-     final Path checkpointDir = createCheckpointDirPath(checkpointID);
-     filesystem.mkdirs(checkpointDir);
-
-     Exception lastFailure = null;
-     int attemptsLeft = 10;
-
-     // every attempt uses a fresh random file name; the failure of the last attempt is reported
-     while (attemptsLeft-- > 0) {
-         final Path candidate = new Path(checkpointDir, UUID.randomUUID().toString());
-         try {
-             final FSDataOutputStream fileStream = filesystem.create(candidate, false);
-             return new FsCheckpointStateOutputStream(fileStream, candidate, filesystem);
-         }
-         catch (Exception e) {
-             lastFailure = e;
-         }
-     }
-
-     throw new Exception("Could not open output stream for state backend", lastFailure);
- }
-
- // ------------------------------------------------------------------------
- // utilities
- // ------------------------------------------------------------------------
-
- private void checkFileSystemInitialized() throws IllegalStateException {
- if (filesystem == null || checkpointDirectory == null) {
- throw new IllegalStateException("filesystem has not been re-initialized after deserialization");
- }
- }
-
- private Path createCheckpointDirPath(long checkpointID) {
- return new Path(checkpointDirectory, "chk-" + checkpointID);
- }
-
- @Override
- public String toString() {
-     if (checkpointDirectory != null) {
-         return "File State Backend (initialized) @ " + checkpointDirectory;
-     }
-     return "File State Backend @ " + basePath;
- }
-
- // ------------------------------------------------------------------------
- // Output stream for state checkpointing
- // ------------------------------------------------------------------------
-
- /**
- * A CheckpointStateOutputStream that writes into a file and returns the path to that file upon
- * closing.
- */
- public static final class FsCheckpointStateOutputStream extends CheckpointStateOutputStream {
-
- private final FSDataOutputStream outStream;
-
- private final Path filePath;
-
- private final FileSystem fs;
-
- private boolean closed;
-
- FsCheckpointStateOutputStream(FSDataOutputStream outStream, Path filePath, FileSystem fs) {
- this.outStream = outStream;
- this.filePath = filePath;
- this.fs = fs;
- }
-
-
- @Override
- public void write(int b) throws IOException {
- outStream.write(b);
- }
-
- @Override
- public void write(byte[] b, int off, int len) throws IOException {
- outStream.write(b, off, len);
- }
-
- @Override
- public void flush() throws IOException {
- outStream.flush();
- }
-
- /**
- * If the stream is only closed, we remove the produced file (cleanup through the auto close
- * feature, for example). This method throws no exception if the deletion fails, but only
- * logs the error.
- */
- @Override
- public void close() {
- synchronized (this) {
- if (!closed) {
- closed = true;
- try {
- outStream.close();
- fs.delete(filePath, false);
-
- // attempt to delete the parent (will fail and be ignored if the parent has more files)
- try {
- fs.delete(filePath.getParent(), false);
- } catch (IOException ignored) {}
- }
- catch (Exception e) {
- LOG.warn("Cannot delete closed and discarded state stream to " + filePath, e);
- }
- }
- }
- }
-
- @Override
- public FileStreamStateHandle closeAndGetHandle() throws IOException {
- return new FileStreamStateHandle(closeAndGetPath());
- }
-
- /**
- * Closes the stream and returns the path to the file that contains the stream's data.
- * @return The path to the file that contains the stream's data.
- * @throws IOException Thrown if the stream cannot be successfully closed.
- */
- public Path closeAndGetPath() throws IOException {
- synchronized (this) {
- if (!closed) {
- closed = true;
- outStream.close();
- return filePath;
- }
- else {
- throw new IOException("Stream has already been closed and discarded.");
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackendFactory.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackendFactory.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackendFactory.java
deleted file mode 100644
index f0ad6bd..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackendFactory.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state.filesystem;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.streaming.api.state.StateBackendFactory;
-
-/**
- * A factory that creates an {@link org.apache.flink.streaming.api.state.filesystem.FsStateBackend}
- * from a configuration.
- */
-public class FsStateBackendFactory implements StateBackendFactory<FsStateBackend> {
-
-    /** The key under which the config stores the directory where checkpoints should be stored */
-    public static final String CHECKPOINT_DIRECTORY_URI_CONF_KEY = "state.backend.fs.checkpointdir";
-
-
-    /**
-     * Creates a file system state backend for the checkpoint directory named in the configuration
-     * under {@link #CHECKPOINT_DIRECTORY_URI_CONF_KEY}.
-     *
-     * @param config The configuration to read the checkpoint directory URI from.
-     * @return The state backend for the configured checkpoint directory.
-     * @throws IllegalConfigurationException If the configuration has no entry for the checkpoint directory.
-     * @throws Exception If the configured URI is rejected as an illegal argument, or if creating
-     *                   the backend fails.
-     */
-    @Override
-    public FsStateBackend createFromConfig(Configuration config) throws Exception {
-        String checkpointDirURI = config.getString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, null);
-
-        if (checkpointDirURI == null) {
-            throw new IllegalConfigurationException(
-                    "Cannot create the file system state backend: The configuration does not specify the " +
-                    "checkpoint directory '" + CHECKPOINT_DIRECTORY_URI_CONF_KEY + '\'');
-        }
-
-        try {
-            Path path = new Path(checkpointDirURI);
-            return new FsStateBackend(path);
-        }
-        catch (IllegalArgumentException e) {
-            // close the quote that is opened around the URI
-            throw new Exception("Cannot initialize File System State Backend with URI '"
-                    + checkpointDirURI + "'.", e);
-        }
-
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/ByteStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/ByteStreamStateHandle.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/ByteStreamStateHandle.java
deleted file mode 100644
index 7952e58..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/ByteStreamStateHandle.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state.memory;
-
-import org.apache.flink.streaming.api.state.StreamStateHandle;
-
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-
-/**
- * A state handle that contains stream state in a byte array.
- */
-public final class ByteStreamStateHandle implements StreamStateHandle {
-
-    private static final long serialVersionUID = -5280226231200217594L;
-
-    /** The bytes of the state; the array is stored as given, without copying. */
-    private final byte[] data;
-
-    /**
-     * Wraps the given bytes in a state handle.
-     *
-     * @param data The bytes that make up the state.
-     */
-    public ByteStreamStateHandle(byte[] data) {
-        this.data = data;
-    }
-
-    /**
-     * Returns a new stream that reads the held bytes.
-     *
-     * @param userCodeClassLoader Not used by this implementation.
-     * @return An input stream over the state bytes.
-     */
-    @Override
-    public InputStream getState(ClassLoader userCodeClassLoader) {
-        final InputStream stateStream = new ByteArrayInputStream(data);
-        return stateStream;
-    }
-
-    /** Does nothing: this implementation has no state to discard. */
-    @Override
-    public void discardState() {}
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemHeapKvState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemHeapKvState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemHeapKvState.java
deleted file mode 100644
index e611887..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemHeapKvState.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state.memory;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.util.DataOutputSerializer;
-import org.apache.flink.streaming.api.state.AbstractHeapKvState;
-
-import java.util.HashMap;
-
-/**
- * Heap-backed key/value state that is snapshotted into a serialized memory copy.
- *
- * @param <K> The type of the key.
- * @param <V> The type of the value.
- */
-public class MemHeapKvState<K, V> extends AbstractHeapKvState<K, V, MemoryStateBackend> {
-
- public MemHeapKvState(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue) {
- super(keySerializer, valueSerializer, defaultValue);
- }
-
- public MemHeapKvState(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer,
- V defaultValue, HashMap<K, V> state) {
- super(keySerializer, valueSerializer, defaultValue, state);
- }
-
- @Override
- public MemoryHeapKvStateSnapshot<K, V> shapshot(long checkpointId, long timestamp) throws Exception {
- DataOutputSerializer ser = new DataOutputSerializer(Math.max(size() * 16, 16));
- writeStateToOutputView(ser);
- byte[] bytes = ser.getCopyOfBuffer();
-
- return new MemoryHeapKvStateSnapshot<K, V>(getKeySerializer(), getValueSerializer(), bytes, size());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryHeapKvStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryHeapKvStateSnapshot.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryHeapKvStateSnapshot.java
deleted file mode 100644
index 7f50379..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryHeapKvStateSnapshot.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state.memory;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.util.DataInputDeserializer;
-import org.apache.flink.streaming.api.state.KvStateSnapshot;
-
-import java.util.HashMap;
-
-/**
- * A snapshot of a {@link MemHeapKvState} for a checkpoint. The data is stored in a heap byte
- * array, in serialized form.
- *
- * @param <K> The type of the key in the snapshot state.
- * @param <V> The type of the value in the snapshot state.
- */
-public class MemoryHeapKvStateSnapshot<K, V> implements KvStateSnapshot<K, V, MemoryStateBackend> {
-
- private static final long serialVersionUID = 1L;
-
- /** Name of the key serializer class */
- private final String keySerializerClassName;
-
- /** Name of the value serializer class */
- private final String valueSerializerClassName;
-
- /** The serialized data of the state key/value pairs */
- private final byte[] data;
-
- /** The number of key/value pairs */
- private final int numEntries;
-
- /**
- * Creates a new heap memory state snapshot.
- *
- * @param keySerializer The serializer for the keys.
- * @param valueSerializer The serializer for the values.
- * @param data The serialized data of the state key/value pairs
- * @param numEntries The number of key/value pairs
- */
- public MemoryHeapKvStateSnapshot(TypeSerializer<K> keySerializer,
- TypeSerializer<V> valueSerializer, byte[] data, int numEntries) {
- this.keySerializerClassName = keySerializer.getClass().getName();
- this.valueSerializerClassName = valueSerializer.getClass().getName();
- this.data = data;
- this.numEntries = numEntries;
- }
-
-
- @Override
- public MemHeapKvState<K, V> restoreState(
- MemoryStateBackend stateBackend,
- final TypeSerializer<K> keySerializer,
- final TypeSerializer<V> valueSerializer,
- V defaultValue,
- ClassLoader classLoader) throws Exception {
-
- // validity checks
- if (!keySerializer.getClass().getName().equals(keySerializerClassName) ||
- !valueSerializer.getClass().getName().equals(valueSerializerClassName)) {
- throw new IllegalArgumentException(
- "Cannot restore the state from the snapshot with the given serializers. " +
- "State (K/V) was serialized with (" + valueSerializerClassName +
- "/" + keySerializerClassName + ")");
- }
-
- // restore state
- HashMap<K, V> stateMap = new HashMap<>(numEntries);
- DataInputDeserializer in = new DataInputDeserializer(data, 0, data.length);
-
- for (int i = 0; i < numEntries; i++) {
- K key = keySerializer.deserialize(in);
- V value = valueSerializer.deserialize(in);
- stateMap.put(key, value);
- }
-
- return new MemHeapKvState<K, V>(keySerializer, valueSerializer, defaultValue, stateMap);
- }
-
- /**
- * Discarding the heap state is a no-op.
- */
- @Override
- public void discardState() {}
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryStateBackend.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryStateBackend.java
deleted file mode 100644
index 05368bd..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryStateBackend.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state.memory;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.state.StateBackend;
-import org.apache.flink.streaming.api.state.StreamStateHandle;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * A {@link StateBackend} that stores all its data and checkpoints in memory and has no
- * capabilities to spill to disk. Checkpoints are serialized and the serialized data is
- * transferred
- */
-public class MemoryStateBackend extends StateBackend<MemoryStateBackend> {
-
- private static final long serialVersionUID = 4109305377809414635L;
-
- /** The default maximal size that the snapshotted memory state may have (5 MiBytes) */
- private static final int DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024;
-
- /** The maximal size that the snapshotted memory state may have */
- private final int maxStateSize;
-
- /**
- * Creates a new memory state backend that accepts states whose serialized forms are
- * up to the default state size (5 MB).
- */
- public MemoryStateBackend() {
- this(DEFAULT_MAX_STATE_SIZE);
- }
-
- /**
- * Creates a new memory state backend that accepts states whose serialized forms are
- * up to the given number of bytes.
- *
- * @param maxStateSize The maximal size of the serialized state
- */
- public MemoryStateBackend(int maxStateSize) {
- this.maxStateSize = maxStateSize;
- }
-
- // ------------------------------------------------------------------------
- // initialization and cleanup
- // ------------------------------------------------------------------------
-
- @Override
- public void initializeForJob(JobID job) {
- // nothing to do here
- }
-
- @Override
- public void disposeAllStateForCurrentJob() {
- // nothing to do here, GC will do it
- }
-
- @Override
- public void close() throws Exception {}
-
- // ------------------------------------------------------------------------
- // State backend operations
- // ------------------------------------------------------------------------
-
- @Override
- public <K, V> MemHeapKvState<K, V> createKvState(
- TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue) {
- return new MemHeapKvState<K, V>(keySerializer, valueSerializer, defaultValue);
- }
-
- /**
- * Serialized the given state into bytes using Java serialization and creates a state handle that
- * can re-create that state.
- *
- * @param state The state to checkpoint.
- * @param checkpointID The ID of the checkpoint.
- * @param timestamp The timestamp of the checkpoint.
- * @param <S> The type of the state.
- *
- * @return A state handle that contains the given state serialized as bytes.
- * @throws Exception Thrown, if the serialization fails.
- */
- @Override
- public <S extends Serializable> StateHandle<S> checkpointStateSerializable(
- S state, long checkpointID, long timestamp) throws Exception
- {
- SerializedStateHandle<S> handle = new SerializedStateHandle<>(state);
- checkSize(handle.getSizeOfSerializedState(), maxStateSize);
- return new SerializedStateHandle<S>(state);
- }
-
- @Override
- public CheckpointStateOutputStream createCheckpointStateOutputStream(
- long checkpointID, long timestamp) throws Exception
- {
- return new MemoryCheckpointOutputStream(maxStateSize);
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- @Override
- public String toString() {
- return "MemoryStateBackend (data in heap memory / checkpoints to JobManager)";
- }
-
- static void checkSize(int size, int maxSize) throws IOException {
- if (size > maxSize) {
- throw new IOException(
- "Size of the state is larger than the maximum permitted memory-backed state. Size="
- + size + " , maxSize=" + maxSize
- + " . Consider using a different state backend, like the File System State backend.");
- }
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * A CheckpointStateOutputStream that writes into a byte array.
- */
- public static final class MemoryCheckpointOutputStream extends CheckpointStateOutputStream {
-
- private final ByteArrayOutputStream os = new ByteArrayOutputStream();
-
- private final int maxSize;
-
- private boolean closed;
-
- public MemoryCheckpointOutputStream(int maxSize) {
- this.maxSize = maxSize;
- }
-
- @Override
- public void write(int b) {
- os.write(b);
- }
-
- @Override
- public void write(byte[] b, int off, int len) {
- os.write(b, off, len);
- }
-
- // --------------------------------------------------------------------
-
- @Override
- public void close() {
- closed = true;
- os.reset();
- }
-
- @Override
- public StreamStateHandle closeAndGetHandle() throws IOException {
- return new ByteStreamStateHandle(closeAndGetBytes());
- }
-
- /**
- * Closes the stream and returns the byte array containing the stream's data.
- * @return The byte array containing the stream's data.
- * @throws IOException Thrown if the size of the data exceeds the maximal
- */
- public byte[] closeAndGetBytes() throws IOException {
- if (!closed) {
- checkSize(os.size(), maxSize);
- byte[] bytes = os.toByteArray();
- close();
- return bytes;
- }
- else {
- throw new IllegalStateException("stream has already been closed");
- }
- }
- }
-
- // ------------------------------------------------------------------------
- // Static default instance
- // ------------------------------------------------------------------------
-
- /** The default instance of this state backend, using the default maximal state size */
- private static final MemoryStateBackend DEFAULT_INSTANCE = new MemoryStateBackend();
-
- /**
- * Gets the default instance of this state backend, using the default maximal state size.
- * @return The default instance of this state backend.
- */
- public static MemoryStateBackend defaultInstance() {
- return DEFAULT_INSTANCE;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/SerializedStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/SerializedStateHandle.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/SerializedStateHandle.java
deleted file mode 100644
index 163cadd..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/SerializedStateHandle.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state.memory;
-
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.util.SerializedValue;
-
-import java.io.IOException;
-
-/**
- * A state handle that represents its state in serialized form as bytes.
- *
- * @param <T> The type of state represented by this state handle.
- */
-public class SerializedStateHandle<T> extends SerializedValue<T> implements StateHandle<T> {
-
- private static final long serialVersionUID = 4145685722538475769L;
-
- public SerializedStateHandle(T value) throws IOException {
- super(value);
- }
-
- @Override
- public T getState(ClassLoader classLoader) throws Exception {
- return deserializeValue(classLoader);
- }
-
- /**
- * Discarding heap-memory backed state is a no-op, so this method does nothing.
- */
- @Override
- public void discardState() {}
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index cf8575e..9964760 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -29,7 +29,7 @@ import org.apache.flink.runtime.util.MathUtils;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.state.StateBackend;
+import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.Triggerable;
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 72a8c25..8c58e29 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -30,17 +30,16 @@ import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
-import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.state.StateBackend;
-import org.apache.flink.streaming.api.state.StateBackendFactory;
-import org.apache.flink.streaming.api.state.filesystem.FsStateBackend;
-import org.apache.flink.streaming.api.state.filesystem.FsStateBackendFactory;
-import org.apache.flink.streaming.api.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateBackendFactory;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -493,55 +492,52 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
private StateBackend<?> createStateBackend() throws Exception {
StateBackend<?> configuredBackend = configuration.getStateBackend(userClassLoader);
-
+
if (configuredBackend != null) {
// backend has been configured on the environment
LOG.info("Using user-defined state backend: " + configuredBackend);
return configuredBackend;
- }
- else {
+ } else {
// see if we have a backend specified in the configuration
Configuration flinkConfig = getEnvironment().getTaskManagerInfo().getConfiguration();
String backendName = flinkConfig.getString(ConfigConstants.STATE_BACKEND, null);
-
+
if (backendName == null) {
LOG.warn("No state backend has been specified, using default state backend (Memory / JobManager)");
backendName = "jobmanager";
}
-
+
backendName = backendName.toLowerCase();
switch (backendName) {
case "jobmanager":
LOG.info("State backend is set to heap memory (checkpoint to jobmanager)");
return MemoryStateBackend.defaultInstance();
-
+
case "filesystem":
FsStateBackend backend = new FsStateBackendFactory().createFromConfig(flinkConfig);
LOG.info("State backend is set to heap memory (checkpoints to filesystem \""
- + backend.getBasePath() + "\")");
+ + backend.getBasePath() + "\")");
return backend;
-
+
default:
try {
@SuppressWarnings("rawtypes")
Class<? extends StateBackendFactory> clazz =
- Class.forName(backendName, false, userClassLoader).asSubclass(StateBackendFactory.class);
+ Class.forName(backendName, false, userClassLoader).asSubclass(StateBackendFactory.class);
return (StateBackend<?>) clazz.newInstance();
- }
- catch (ClassNotFoundException e) {
+ } catch (ClassNotFoundException e) {
throw new IllegalConfigurationException("Cannot find configured state backend: " + backendName);
- }
- catch (ClassCastException e) {
+ } catch (ClassCastException e) {
throw new IllegalConfigurationException("The class configured under '" +
- ConfigConstants.STATE_BACKEND + "' is not a valid state backend factory (" +
- backendName + ')');
- }
- catch (Throwable t) {
+ ConfigConstants.STATE_BACKEND + "' is not a valid state backend factory (" +
+ backendName + ')');
+ } catch (Throwable t) {
throw new IllegalConfigurationException("Cannot create configured state backend", t);
}
}
}
+ }
/**
* Registers a timer.
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
index 334fd44..afeabd9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
@@ -19,7 +19,7 @@
package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.state.KvStateSnapshot;
+import org.apache.flink.runtime.state.KvStateSnapshot;
import java.io.Serializable;
import java.util.ConcurrentModificationException;
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/FileStateBackendTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/FileStateBackendTest.java
deleted file mode 100644
index 73100d1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/FileStateBackendTest.java
+++ /dev/null
@@ -1,419 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.FloatSerializer;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.typeutils.runtime.ValueSerializer;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.testutils.CommonTestUtils;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.state.filesystem.FileStreamStateHandle;
-import org.apache.flink.streaming.api.state.filesystem.FsStateBackend;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.OperatingSystem;
-
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.util.Random;
-import java.util.UUID;
-
-import static org.junit.Assert.*;
-
-public class FileStateBackendTest {
-
- @Test
- public void testSetupAndSerialization() {
- File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
- try {
- final String backendDir = localFileUri(tempDir);
- FsStateBackend originalBackend = new FsStateBackend(backendDir);
-
- assertFalse(originalBackend.isInitialized());
- assertEquals(new URI(backendDir), originalBackend.getBasePath().toUri());
- assertNull(originalBackend.getCheckpointDirectory());
-
- // serialize / copy the backend
- FsStateBackend backend = CommonTestUtils.createCopySerializable(originalBackend);
- assertFalse(backend.isInitialized());
- assertEquals(new URI(backendDir), backend.getBasePath().toUri());
- assertNull(backend.getCheckpointDirectory());
-
- // no file operations should be possible right now
- try {
- backend.checkpointStateSerializable("exception train rolling in", 2L, System.currentTimeMillis());
- fail("should fail with an exception");
- } catch (IllegalStateException e) {
- // supreme!
- }
-
- backend.initializeForJob(new JobID());
- assertNotNull(backend.getCheckpointDirectory());
-
- File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
- assertTrue(checkpointDir.exists());
- assertTrue(isDirectoryEmpty(checkpointDir));
-
- backend.disposeAllStateForCurrentJob();
- assertNull(backend.getCheckpointDirectory());
-
- assertTrue(isDirectoryEmpty(tempDir));
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
- deleteDirectorySilently(tempDir);
- }
- }
-
- @Test
- public void testSerializableState() {
- File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
- try {
- FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
- backend.initializeForJob(new JobID());
-
- File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
-
- String state1 = "dummy state";
- String state2 = "row row row your boat";
- Integer state3 = 42;
-
- StateHandle<String> handle1 = backend.checkpointStateSerializable(state1, 439568923746L, System.currentTimeMillis());
- StateHandle<String> handle2 = backend.checkpointStateSerializable(state2, 439568923746L, System.currentTimeMillis());
- StateHandle<Integer> handle3 = backend.checkpointStateSerializable(state3, 439568923746L, System.currentTimeMillis());
-
- assertFalse(isDirectoryEmpty(checkpointDir));
- assertEquals(state1, handle1.getState(getClass().getClassLoader()));
- handle1.discardState();
-
- assertFalse(isDirectoryEmpty(checkpointDir));
- assertEquals(state2, handle2.getState(getClass().getClassLoader()));
- handle2.discardState();
-
- assertFalse(isDirectoryEmpty(checkpointDir));
- assertEquals(state3, handle3.getState(getClass().getClassLoader()));
- handle3.discardState();
-
- assertTrue(isDirectoryEmpty(checkpointDir));
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
- deleteDirectorySilently(tempDir);
- }
- }
-
- @Test
- public void testStateOutputStream() {
- File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
- try {
- FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
- backend.initializeForJob(new JobID());
-
- File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
-
- byte[] state1 = new byte[1274673];
- byte[] state2 = new byte[1];
- byte[] state3 = new byte[0];
- byte[] state4 = new byte[177];
-
- Random rnd = new Random();
- rnd.nextBytes(state1);
- rnd.nextBytes(state2);
- rnd.nextBytes(state3);
- rnd.nextBytes(state4);
-
- long checkpointId = 97231523452L;
-
- FsStateBackend.FsCheckpointStateOutputStream stream1 =
- backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
- FsStateBackend.FsCheckpointStateOutputStream stream2 =
- backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
- FsStateBackend.FsCheckpointStateOutputStream stream3 =
- backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
-
- stream1.write(state1);
- stream2.write(state2);
- stream3.write(state3);
-
- FileStreamStateHandle handle1 = stream1.closeAndGetHandle();
- FileStreamStateHandle handle2 = stream2.closeAndGetHandle();
- FileStreamStateHandle handle3 = stream3.closeAndGetHandle();
-
- // use with try-with-resources
- StreamStateHandle handle4;
- try (StateBackend.CheckpointStateOutputStream stream4 =
- backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis())) {
- stream4.write(state4);
- handle4 = stream4.closeAndGetHandle();
- }
-
- // close before accessing handle
- StateBackend.CheckpointStateOutputStream stream5 =
- backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
- stream5.write(state4);
- stream5.close();
- try {
- stream5.closeAndGetHandle();
- fail();
- } catch (IOException e) {
- // uh-huh
- }
-
- validateBytesInStream(handle1.getState(getClass().getClassLoader()), state1);
- handle1.discardState();
- assertFalse(isDirectoryEmpty(checkpointDir));
- ensureLocalFileDeleted(handle1.getFilePath());
-
- validateBytesInStream(handle2.getState(getClass().getClassLoader()), state2);
- handle2.discardState();
- assertFalse(isDirectoryEmpty(checkpointDir));
- ensureLocalFileDeleted(handle2.getFilePath());
-
- validateBytesInStream(handle3.getState(getClass().getClassLoader()), state3);
- handle3.discardState();
- assertFalse(isDirectoryEmpty(checkpointDir));
- ensureLocalFileDeleted(handle3.getFilePath());
-
- validateBytesInStream(handle4.getState(getClass().getClassLoader()), state4);
- handle4.discardState();
- assertTrue(isDirectoryEmpty(checkpointDir));
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
- deleteDirectorySilently(tempDir);
- }
- }
-
- @Test
- public void testKeyValueState() {
- File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
- try {
- FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
- backend.initializeForJob(new JobID());
-
- File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
-
- KvState<Integer, String, FsStateBackend> kv =
- backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
-
- assertEquals(0, kv.size());
-
- // some modifications to the state
- kv.setCurrentKey(1);
- assertNull(kv.value());
- kv.update("1");
- assertEquals(1, kv.size());
- kv.setCurrentKey(2);
- assertNull(kv.value());
- kv.update("2");
- assertEquals(2, kv.size());
- kv.setCurrentKey(1);
- assertEquals("1", kv.value());
- assertEquals(2, kv.size());
-
- // draw a snapshot
- KvStateSnapshot<Integer, String, FsStateBackend> snapshot1 =
- kv.shapshot(682375462378L, System.currentTimeMillis());
-
- // make some more modifications
- kv.setCurrentKey(1);
- kv.update("u1");
- kv.setCurrentKey(2);
- kv.update("u2");
- kv.setCurrentKey(3);
- kv.update("u3");
-
- // draw another snapshot
- KvStateSnapshot<Integer, String, FsStateBackend> snapshot2 =
- kv.shapshot(682375462379L, System.currentTimeMillis());
-
- // validate the original state
- assertEquals(3, kv.size());
- kv.setCurrentKey(1);
- assertEquals("u1", kv.value());
- kv.setCurrentKey(2);
- assertEquals("u2", kv.value());
- kv.setCurrentKey(3);
- assertEquals("u3", kv.value());
-
- // restore the first snapshot and validate it
- KvState<Integer, String, FsStateBackend> restored1 = snapshot1.restoreState(backend,
- IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader());
-
- assertEquals(2, restored1.size());
- restored1.setCurrentKey(1);
- assertEquals("1", restored1.value());
- restored1.setCurrentKey(2);
- assertEquals("2", restored1.value());
-
- // restore the first snapshot and validate it
- KvState<Integer, String, FsStateBackend> restored2 = snapshot2.restoreState(backend,
- IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader());
-
- assertEquals(3, restored2.size());
- restored2.setCurrentKey(1);
- assertEquals("u1", restored2.value());
- restored2.setCurrentKey(2);
- assertEquals("u2", restored2.value());
- restored2.setCurrentKey(3);
- assertEquals("u3", restored2.value());
-
- snapshot1.discardState();
- assertFalse(isDirectoryEmpty(checkpointDir));
-
- snapshot2.discardState();
- assertTrue(isDirectoryEmpty(checkpointDir));
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
- deleteDirectorySilently(tempDir);
- }
- }
-
- @Test
- public void testRestoreWithWrongSerializers() {
- File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
- try {
- FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
- backend.initializeForJob(new JobID());
-
- File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
-
- KvState<Integer, String, FsStateBackend> kv =
- backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
-
- kv.setCurrentKey(1);
- kv.update("1");
- kv.setCurrentKey(2);
- kv.update("2");
-
- KvStateSnapshot<Integer, String, FsStateBackend> snapshot =
- kv.shapshot(682375462378L, System.currentTimeMillis());
-
-
- @SuppressWarnings("unchecked")
- TypeSerializer<Integer> fakeIntSerializer =
- (TypeSerializer<Integer>) (TypeSerializer<?>) FloatSerializer.INSTANCE;
-
- @SuppressWarnings("unchecked")
- TypeSerializer<String> fakeStringSerializer =
- (TypeSerializer<String>) (TypeSerializer<?>) new ValueSerializer<StringValue>(StringValue.class);
-
- try {
- snapshot.restoreState(backend, fakeIntSerializer,
- StringSerializer.INSTANCE, null, getClass().getClassLoader());
- fail("should recognize wrong serializers");
- } catch (IllegalArgumentException e) {
- // expected
- } catch (Exception e) {
- fail("wrong exception");
- }
-
- try {
- snapshot.restoreState(backend, IntSerializer.INSTANCE,
- fakeStringSerializer, null, getClass().getClassLoader());
- fail("should recognize wrong serializers");
- } catch (IllegalArgumentException e) {
- // expected
- } catch (Exception e) {
- fail("wrong exception");
- }
-
- try {
- snapshot.restoreState(backend, fakeIntSerializer,
- fakeStringSerializer, null, getClass().getClassLoader());
- fail("should recognize wrong serializers");
- } catch (IllegalArgumentException e) {
- // expected
- } catch (Exception e) {
- fail("wrong exception");
- }
-
- snapshot.discardState();
-
- assertTrue(isDirectoryEmpty(checkpointDir));
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
- deleteDirectorySilently(tempDir);
- }
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- private static void ensureLocalFileDeleted(Path path) {
- URI uri = path.toUri();
- if ("file".equals(uri.getScheme())) {
- File file = new File(uri.getPath());
- assertFalse("file not properly deleted", file.exists());
- }
- else {
- throw new IllegalArgumentException("not a local path");
- }
- }
-
- private static void deleteDirectorySilently(File dir) {
- try {
- FileUtils.deleteDirectory(dir);
- }
- catch (IOException ignored) {}
- }
-
- private static boolean isDirectoryEmpty(File directory) {
- String[] nested = directory.list();
- return nested == null || nested.length == 0;
- }
-
- private static String localFileUri(File path) {
- return (OperatingSystem.isWindows() ? "file:/" : "file://") + path.getAbsolutePath();
- }
-
- private static void validateBytesInStream(InputStream is, byte[] data) throws IOException {
- byte[] holder = new byte[data.length];
- assertEquals("not enough data", holder.length, is.read(holder));
- assertEquals("too much data", -1, is.read());
- assertArrayEquals("wrong data", data, holder);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/MemoryStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/MemoryStateBackendTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/MemoryStateBackendTest.java
deleted file mode 100644
index 3410d09..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/MemoryStateBackendTest.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.FloatSerializer;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.typeutils.runtime.ValueSerializer;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.state.memory.MemoryStateBackend;
-import org.apache.flink.types.StringValue;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.HashMap;
-
-import static org.junit.Assert.*;
-
-/**
- * Tests for the {@link org.apache.flink.streaming.api.state.memory.MemoryStateBackend}.
- */
-public class MemoryStateBackendTest {
-
- @Test
- public void testSerializableState() {
- try {
- MemoryStateBackend backend = new MemoryStateBackend();
-
- HashMap<String, Integer> state = new HashMap<>();
- state.put("hey there", 2);
- state.put("the crazy brown fox stumbles over a sentence that does not contain every letter", 77);
-
- StateHandle<HashMap<String, Integer>> handle = backend.checkpointStateSerializable(state, 12, 459);
- assertNotNull(handle);
-
- HashMap<String, Integer> restored = handle.getState(getClass().getClassLoader());
- assertEquals(state, restored);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testOversizedState() {
- try {
- MemoryStateBackend backend = new MemoryStateBackend(10);
-
- HashMap<String, Integer> state = new HashMap<>();
- state.put("hey there", 2);
- state.put("the crazy brown fox stumbles over a sentence that does not contain every letter", 77);
-
- try {
- backend.checkpointStateSerializable(state, 12, 459);
- fail("this should cause an exception");
- }
- catch (IOException e) {
- // now darling, isn't that exactly what we wanted?
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testStateStream() {
- try {
- MemoryStateBackend backend = new MemoryStateBackend();
-
- HashMap<String, Integer> state = new HashMap<>();
- state.put("hey there", 2);
- state.put("the crazy brown fox stumbles over a sentence that does not contain every letter", 77);
-
- StateBackend.CheckpointStateOutputStream os = backend.createCheckpointStateOutputStream(1, 2);
- ObjectOutputStream oos = new ObjectOutputStream(os);
- oos.writeObject(state);
- oos.flush();
- StreamStateHandle handle = os.closeAndGetHandle();
-
- assertNotNull(handle);
-
- ObjectInputStream ois = new ObjectInputStream(handle.getState(getClass().getClassLoader()));
- assertEquals(state, ois.readObject());
- assertTrue(ois.available() <= 0);
- ois.close();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testOversizedStateStream() {
- try {
- MemoryStateBackend backend = new MemoryStateBackend(10);
-
- HashMap<String, Integer> state = new HashMap<>();
- state.put("hey there", 2);
- state.put("the crazy brown fox stumbles over a sentence that does not contain every letter", 77);
-
- StateBackend.CheckpointStateOutputStream os = backend.createCheckpointStateOutputStream(1, 2);
- ObjectOutputStream oos = new ObjectOutputStream(os);
-
- try {
- oos.writeObject(state);
- oos.flush();
- os.closeAndGetHandle();
- fail("this should cause an exception");
- }
- catch (IOException e) {
- // oh boy! what an exception!
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testKeyValueState() {
- try {
- MemoryStateBackend backend = new MemoryStateBackend();
-
- KvState<Integer, String, MemoryStateBackend> kv =
- backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
-
- assertEquals(0, kv.size());
-
- // some modifications to the state
- kv.setCurrentKey(1);
- assertNull(kv.value());
- kv.update("1");
- assertEquals(1, kv.size());
- kv.setCurrentKey(2);
- assertNull(kv.value());
- kv.update("2");
- assertEquals(2, kv.size());
- kv.setCurrentKey(1);
- assertEquals("1", kv.value());
- assertEquals(2, kv.size());
-
- // draw a snapshot
- KvStateSnapshot<Integer, String, MemoryStateBackend> snapshot1 =
- kv.shapshot(682375462378L, System.currentTimeMillis());
-
- // make some more modifications
- kv.setCurrentKey(1);
- kv.update("u1");
- kv.setCurrentKey(2);
- kv.update("u2");
- kv.setCurrentKey(3);
- kv.update("u3");
-
- // draw another snapshot
- KvStateSnapshot<Integer, String, MemoryStateBackend> snapshot2 =
- kv.shapshot(682375462379L, System.currentTimeMillis());
-
- // validate the original state
- assertEquals(3, kv.size());
- kv.setCurrentKey(1);
- assertEquals("u1", kv.value());
- kv.setCurrentKey(2);
- assertEquals("u2", kv.value());
- kv.setCurrentKey(3);
- assertEquals("u3", kv.value());
-
- // restore the first snapshot and validate it
- KvState<Integer, String, MemoryStateBackend> restored1 = snapshot1.restoreState(backend,
- IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader());
-
- assertEquals(2, restored1.size());
- restored1.setCurrentKey(1);
- assertEquals("1", restored1.value());
- restored1.setCurrentKey(2);
- assertEquals("2", restored1.value());
-
- // restore the first snapshot and validate it
- KvState<Integer, String, MemoryStateBackend> restored2 = snapshot2.restoreState(backend,
- IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader());
-
- assertEquals(3, restored2.size());
- restored2.setCurrentKey(1);
- assertEquals("u1", restored2.value());
- restored2.setCurrentKey(2);
- assertEquals("u2", restored2.value());
- restored2.setCurrentKey(3);
- assertEquals("u3", restored2.value());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testRestoreWithWrongSerializers() {
- try {
- MemoryStateBackend backend = new MemoryStateBackend();
- KvState<Integer, String, MemoryStateBackend> kv =
- backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
-
- kv.setCurrentKey(1);
- kv.update("1");
- kv.setCurrentKey(2);
- kv.update("2");
-
- KvStateSnapshot<Integer, String, MemoryStateBackend> snapshot =
- kv.shapshot(682375462378L, System.currentTimeMillis());
-
-
- @SuppressWarnings("unchecked")
- TypeSerializer<Integer> fakeIntSerializer =
- (TypeSerializer<Integer>) (TypeSerializer<?>) FloatSerializer.INSTANCE;
-
- @SuppressWarnings("unchecked")
- TypeSerializer<String> fakeStringSerializer =
- (TypeSerializer<String>) (TypeSerializer<?>) new ValueSerializer<StringValue>(StringValue.class);
-
- try {
- snapshot.restoreState(backend, fakeIntSerializer,
- StringSerializer.INSTANCE, null, getClass().getClassLoader());
- fail("should recognize wrong serializers");
- } catch (IllegalArgumentException e) {
- // expected
- } catch (Exception e) {
- fail("wrong exception");
- }
-
- try {
- snapshot.restoreState(backend, IntSerializer.INSTANCE,
- fakeStringSerializer, null, getClass().getClassLoader());
- fail("should recognize wrong serializers");
- } catch (IllegalArgumentException e) {
- // expected
- } catch (Exception e) {
- fail("wrong exception");
- }
-
- try {
- snapshot.restoreState(backend, fakeIntSerializer,
- fakeStringSerializer, null, getClass().getClassLoader());
- fail("should recognize wrong serializers");
- } catch (IllegalArgumentException e) {
- // expected
- } catch (Exception e) {
- fail("wrong exception");
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index dd76a67..ad3c838 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -28,8 +28,8 @@ import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.state.StateBackend;
-import org.apache.flink.streaming.api.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index ab8e551..4bd260f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -28,8 +28,8 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.state.StateBackend;
-import org.apache.flink.streaming.api.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
index 81d3a69..0c708c6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
@@ -36,8 +36,8 @@ import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.state.StateBackend;
-import org.apache.flink.streaming.api.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index b83feca..01f95bc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -30,8 +30,8 @@ import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.state.StateBackend;
-import org.apache.flink.streaming.api.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;