You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/10/20 09:59:23 UTC
[32/47] flink git commit: [FLINK-2354] [runtime] Replace old
StateHandleProvider by StateStorageHelper in ZooKeeperStateHandleStore
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProviderFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProviderFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProviderFactory.java
deleted file mode 100644
index 0086ac6..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProviderFactory.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
-
-import java.io.Serializable;
-
-/**
- * State handler provider factory.
- *
- * <p>This is going to be superseded soon.
- */
-public class StateHandleProviderFactory {
-
- /**
- * Creates a {@link org.apache.flink.runtime.state.FileStateHandle.FileStateHandleProvider} at
- * the configured recovery path.
- */
- public static <T extends Serializable> StateHandleProvider<T> createRecoveryFileStateHandleProvider(
- Configuration config) {
-
- StateBackend stateBackend = StateBackend.fromConfig(config);
-
- if (stateBackend == StateBackend.FILESYSTEM) {
- String recoveryPath = config.getString(
- ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, "");
-
- if (recoveryPath.equals("")) {
- throw new IllegalConfigurationException("Missing recovery path. Specify via " +
- "configuration key '" + ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH + "'.");
- }
- else {
- return FileStateHandle.createProvider(recoveryPath);
- }
- }
- else {
- throw new IllegalConfigurationException("Unexpected state backend configuration " +
- stateBackend);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java
new file mode 100644
index 0000000..32c601e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.runtime.state.StateHandle;
+
+import java.io.InputStream;
+
+/**
+ * A state handle that produces an input stream when resolved.
+ */
+public interface StreamStateHandle extends StateHandle<InputStream> {}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileState.java
new file mode 100644
index 0000000..d64e2c4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileState.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+
+/**
+ * Base class for state that is stored in a file.
+ */
+public abstract class AbstractFileState implements java.io.Serializable {
+
+ private static final long serialVersionUID = 350284443258002355L;
+
+ /** The path to the file in the filesystem, fully describing the file system */
+ private final Path filePath;
+
+ /** Cached file system handle */
+ private transient FileSystem fs;
+
+ /**
+ * Creates a new file state for the given file path.
+ *
+ * @param filePath The path to the file that stores the state.
+ */
+ protected AbstractFileState(Path filePath) {
+ this.filePath = filePath;
+ }
+
+ /**
+ * Gets the path where this handle's state is stored.
+ * @return The path where this handle's state is stored.
+ */
+ public Path getFilePath() {
+ return filePath;
+ }
+
+ /**
+ * Discard the state by deleting the file that stores the state. If the parent directory
+ * of the state is empty after deleting the state file, it is also deleted.
+ *
+ * @throws Exception Thrown, if the file deletion (not the directory deletion) fails.
+ */
+ public void discardState() throws Exception {
+ getFileSystem().delete(filePath, false);
+
+ // send a call to delete the directory containing the file. this will
+ // fail (and be ignored) when some files still exist
+ try {
+ getFileSystem().delete(filePath.getParent(), false);
+ } catch (IOException ignored) {}
+ }
+
+ /**
+ * Gets the file system that stores the file state.
+ * @return The file system that stores the file state.
+ * @throws IOException Thrown if the file system cannot be accessed.
+ */
+ protected FileSystem getFileSystem() throws IOException {
+ if (fs == null) {
+ fs = FileSystem.get(filePath.toUri());
+ }
+ return fs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java
new file mode 100644
index 0000000..b7e7cd1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.ObjectInputStream;
+
+/**
+ * A state handle that points to state stored in a file via Java Serialization.
+ *
+ * @param <T> The type of state pointed to by the state handle.
+ */
+public class FileSerializableStateHandle<T> extends AbstractFileState implements StateHandle<T> {
+
+ private static final long serialVersionUID = -657631394290213622L;
+
+ /**
+ * Creates a new FileSerializableStateHandle pointing to state at the given file path.
+ *
+ * @param filePath The path to the file containing the checkpointed state.
+ */
+ public FileSerializableStateHandle(Path filePath) {
+ super(filePath);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public T getState(ClassLoader classLoader) throws Exception {
+ FSDataInputStream inStream = getFileSystem().open(getFilePath());
+ ObjectInputStream ois = new InstantiationUtil.ClassLoaderObjectInputStream(inStream, classLoader);
+ return (T) ois.readObject();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStreamStateHandle.java
new file mode 100644
index 0000000..f4681ea
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStreamStateHandle.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.InputStream;
+
+/**
+ * A state handle that points to state in a file system, accessible as an input stream.
+ */
+public class FileStreamStateHandle extends AbstractFileState implements StreamStateHandle {
+
+ private static final long serialVersionUID = -6826990484549987311L;
+
+ /**
+ * Creates a new FileStreamStateHandle pointing to state at the given file path.
+ *
+ * @param filePath The path to the file containing the checkpointed state.
+ */
+ public FileStreamStateHandle(Path filePath) {
+ super(filePath);
+ }
+
+ @Override
+ public InputStream getState(ClassLoader userCodeClassLoader) throws Exception {
+ return getFileSystem().open(getFilePath());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvState.java
new file mode 100644
index 0000000..e3116dd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvState.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+import org.apache.flink.runtime.state.AbstractHeapKvState;
+
+import java.io.DataOutputStream;
+import java.util.HashMap;
+
+/**
+ * Heap-backed key/value state that is snapshotted into files.
+ *
+ * @param <K> The type of the key.
+ * @param <V> The type of the value.
+ */
+public class FsHeapKvState<K, V> extends AbstractHeapKvState<K, V, FsStateBackend> {
+
+ /** The file system state backend backing snapshots of this state */
+ private final FsStateBackend backend;
+
+ /**
+ * Creates a new and empty key/value state.
+ *
+ * @param keySerializer The serializer for the key.
+ * @param valueSerializer The serializer for the value.
+ * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
+ * @param backend The file system state backend backing snapshots of this state
+ */
+ public FsHeapKvState(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer,
+ V defaultValue, FsStateBackend backend) {
+ super(keySerializer, valueSerializer, defaultValue);
+ this.backend = backend;
+ }
+
+ /**
+ * Creates a new key/value state with the given state contents.
+ * This method is used to re-create key/value state with existing data, for example from
+ * a snapshot.
+ *
+ * @param keySerializer The serializer for the key.
+ * @param valueSerializer The serializer for the value.
+ * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
+ * @param state The map of key/value pairs to initialize the state with.
+ * @param backend The file system state backend backing snapshots of this state
+ */
+ public FsHeapKvState(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer,
+ V defaultValue, HashMap<K, V> state, FsStateBackend backend) {
+ super(keySerializer, valueSerializer, defaultValue, state);
+ this.backend = backend;
+ }
+
+
+ @Override
+ public FsHeapKvStateSnapshot<K, V> shapshot(long checkpointId, long timestamp) throws Exception {
+ // first, create an output stream to write to
+ try (FsStateBackend.FsCheckpointStateOutputStream out =
+ backend.createCheckpointStateOutputStream(checkpointId, timestamp)) {
+
+ // serialize the state to the output stream
+ OutputViewDataOutputStreamWrapper outView =
+ new OutputViewDataOutputStreamWrapper(new DataOutputStream(out));
+ outView.writeInt(size());
+ writeStateToOutputView(outView);
+ outView.flush();
+
+ // create a handle to the state
+ return new FsHeapKvStateSnapshot<>(getKeySerializer(), getValueSerializer(), out.closeAndGetPath());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvStateSnapshot.java
new file mode 100644
index 0000000..781ee3d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvStateSnapshot.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+
+import java.io.DataInputStream;
+import java.util.HashMap;
+
+/**
+ * A snapshot of a heap key/value state stored in a file.
+ *
+ * @param <K> The type of the key in the snapshot state.
+ * @param <V> The type of the value in the snapshot state.
+ */
+public class FsHeapKvStateSnapshot<K, V> extends AbstractFileState implements KvStateSnapshot<K, V, FsStateBackend> {
+
+ private static final long serialVersionUID = 1L;
+
+ /** Name of the key serializer class */
+ private final String keySerializerClassName;
+
+ /** Name of the value serializer class */
+ private final String valueSerializerClassName;
+
+ /**
+ * Creates a new state snapshot with data in the file system.
+ *
+ * @param keySerializer The serializer for the keys.
+ * @param valueSerializer The serializer for the values.
+ * @param filePath The path where the snapshot data is stored.
+ */
+ public FsHeapKvStateSnapshot(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, Path filePath) {
+ super(filePath);
+ this.keySerializerClassName = keySerializer.getClass().getName();
+ this.valueSerializerClassName = valueSerializer.getClass().getName();
+ }
+
+ @Override
+ public FsHeapKvState<K, V> restoreState(
+ FsStateBackend stateBackend,
+ final TypeSerializer<K> keySerializer,
+ final TypeSerializer<V> valueSerializer,
+ V defaultValue,
+ ClassLoader classLoader) throws Exception {
+
+ // validity checks
+ if (!keySerializer.getClass().getName().equals(keySerializerClassName) ||
+ !valueSerializer.getClass().getName().equals(valueSerializerClassName)) {
+ throw new IllegalArgumentException(
+ "Cannot restore the state from the snapshot with the given serializers. " +
+ "State (K/V) was serialized with (" + valueSerializerClassName +
+ "/" + keySerializerClassName + ")");
+ }
+
+ // state restore
+ try (FSDataInputStream inStream = stateBackend.getFileSystem().open(getFilePath())) {
+ InputViewDataInputStreamWrapper inView = new InputViewDataInputStreamWrapper(new DataInputStream(inStream));
+
+ final int numEntries = inView.readInt();
+ HashMap<K, V> stateMap = new HashMap<>(numEntries);
+
+ for (int i = 0; i < numEntries; i++) {
+ K key = keySerializer.deserialize(inView);
+ V value = valueSerializer.deserialize(inView);
+ stateMap.put(key, value);
+ }
+
+ return new FsHeapKvState<K, V>(keySerializer, valueSerializer, defaultValue, stateMap, stateBackend);
+ }
+ catch (Exception e) {
+ throw new Exception("Failed to restore state from file system", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
new file mode 100644
index 0000000..045c411
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateBackend;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.UUID;
+
+/**
+ * The file state backend is a state backend that stores the state of streaming jobs in a file system.
+ *
+ * <p>The state backend has one core directory into which it puts all checkpoint data. Inside that
+ * directory, it creates a directory per job, inside which each checkpoint gets a directory, with
+ * files for each state, for example:
+ *
+ * {@code hdfs://namenode:port/flink-checkpoints/<job-id>/chk-17/6ba7b810-9dad-11d1-80b4-00c04fd430c8 }
+ */
+public class FsStateBackend extends StateBackend<FsStateBackend> {
+
+ private static final long serialVersionUID = -8191916350224044011L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(FsStateBackend.class);
+
+
+ /** The path to the directory for the checkpoint data, including the file system
+ * description via scheme and optional authority */
+ private final Path basePath;
+
+ /** The directory (job specific) into this initialized instance of the backend stores its data */
+ private transient Path checkpointDirectory;
+
+ /** Cached handle to the file system for file operations */
+ private transient FileSystem filesystem;
+
+
+ /**
+ * Creates a new state backend that stores its checkpoint data in the file system and location
+ * defined by the given URI.
+ *
+ * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+ * must be accessible via {@link FileSystem#get(URI)}.
+ *
+ * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+ * (host and port), or that the Hadoop configuration that describes that information must be in the
+ * classpath.
+ *
+ * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
+ * and the path to teh checkpoint data directory.
+ * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+ */
+ public FsStateBackend(String checkpointDataUri) throws IOException {
+ this(new Path(checkpointDataUri));
+ }
+
+ /**
+ * Creates a new state backend that stores its checkpoint data in the file system and location
+ * defined by the given URI.
+ *
+ * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+ * must be accessible via {@link FileSystem#get(URI)}.
+ *
+ * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+ * (host and port), or that the Hadoop configuration that describes that information must be in the
+ * classpath.
+ *
+ * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
+ * and the path to teh checkpoint data directory.
+ * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+ */
+ public FsStateBackend(Path checkpointDataUri) throws IOException {
+ this(checkpointDataUri.toUri());
+ }
+
+ /**
+ * Creates a new state backend that stores its checkpoint data in the file system and location
+ * defined by the given URI.
+ *
+ * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+ * must be accessible via {@link FileSystem#get(URI)}.
+ *
+ * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+ * (host and port), or that the Hadoop configuration that describes that information must be in the
+ * classpath.
+ *
+ * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
+ * and the path to teh checkpoint data directory.
+ * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+ */
+ public FsStateBackend(URI checkpointDataUri) throws IOException {
+ final String scheme = checkpointDataUri.getScheme();
+ final String path = checkpointDataUri.getPath();
+
+ // some validity checks
+ if (scheme == null) {
+ throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
+ "Please specify the file system scheme explicitly in the URI.");
+ }
+ if (path == null) {
+ throw new IllegalArgumentException("The path to store the checkpoint data in is null. " +
+ "Please specify a directory path for the checkpoint data.");
+ }
+ if (path.length() == 0 || path.equals("/")) {
+ throw new IllegalArgumentException("Cannot use the root directory for checkpoints.");
+ }
+
+ // we do a bit of work to make sure that the URI for the filesystem refers to exactly the same
+ // (distributed) filesystem on all hosts and includes full host/port information, even if the
+ // original URI did not include that. We count on the filesystem loading from the configuration
+ // to fill in the missing data.
+
+ // try to grab the file system for this path/URI
+ this.filesystem = FileSystem.get(checkpointDataUri);
+ if (this.filesystem == null) {
+ throw new IOException("Could not find a file system for the given scheme in the available configurations.");
+ }
+
+ URI fsURI = this.filesystem.getUri();
+ try {
+ URI baseURI = new URI(fsURI.getScheme(), fsURI.getAuthority(), path, null, null);
+ this.basePath = new Path(baseURI);
+ }
+ catch (URISyntaxException e) {
+ throw new IOException(
+ String.format("Cannot create file system URI for checkpointDataUri %s and filesystem URI %s",
+ checkpointDataUri, fsURI), e);
+ }
+ }
+
+ /**
+ * Gets the base directory where all state-containing files are stored.
+ * The job specific directory is created inside this directory.
+ *
+ * @return The base directory.
+ */
+ public Path getBasePath() {
+ return basePath;
+ }
+
+ /**
+ * Gets the directory where this state backend stores its checkpoint data. Will be null if
+ * the state backend has not been initialized.
+ *
+ * @return The directory where this state backend stores its checkpoint data.
+ */
+ public Path getCheckpointDirectory() {
+ return checkpointDirectory;
+ }
+
+ /**
+ * Checks whether this state backend is initialized. Note that initialization does not carry
+ * across serialization. After each serialization, the state backend needs to be initialized.
+ *
+ * @return True, if the file state backend has been initialized, false otherwise.
+ */
+ public boolean isInitialized() {
+ return filesystem != null && checkpointDirectory != null;
+ }
+
+ /**
+ * Gets the file system handle for the file system that stores the state for this backend.
+ *
+ * @return This backend's file system handle.
+ */
+ public FileSystem getFileSystem() {
+ if (filesystem != null) {
+ return filesystem;
+ }
+ else {
+ throw new IllegalStateException("State backend has not been initialized.");
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // initialization and cleanup
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void initializeForJob(JobID jobId) throws Exception {
+ Path dir = new Path(basePath, jobId.toString());
+
+ LOG.info("Initializing file state backend to URI " + dir);
+
+ filesystem = basePath.getFileSystem();
+ filesystem.mkdirs(dir);
+
+ checkpointDirectory = dir;
+ }
+
+ @Override
+ public void disposeAllStateForCurrentJob() throws Exception {
+ FileSystem fs = this.filesystem;
+ Path dir = this.checkpointDirectory;
+
+ if (fs != null && dir != null) {
+ this.filesystem = null;
+ this.checkpointDirectory = null;
+ fs.delete(dir, true);
+ }
+ else {
+ throw new IllegalStateException("state backend has not been initialized");
+ }
+ }
+
+ @Override
+ public void close() throws Exception {}
+
+ // ------------------------------------------------------------------------
+ // state backend operations
+ // ------------------------------------------------------------------------
+
+ @Override
+ public <K, V> FsHeapKvState<K, V> createKvState(
+ TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue) throws Exception {
+ return new FsHeapKvState<K, V>(keySerializer, valueSerializer, defaultValue, this);
+ }
+
+ @Override
+ public <S extends Serializable> StateHandle<S> checkpointStateSerializable(
+ S state, long checkpointID, long timestamp) throws Exception
+ {
+ checkFileSystemInitialized();
+
+ // make sure the directory for that specific checkpoint exists
+ final Path checkpointDir = createCheckpointDirPath(checkpointID);
+ filesystem.mkdirs(checkpointDir);
+
+
+ Exception latestException = null;
+
+ for (int attempt = 0; attempt < 10; attempt++) {
+ Path targetPath = new Path(checkpointDir, UUID.randomUUID().toString());
+ FSDataOutputStream outStream;
+ try {
+ outStream = filesystem.create(targetPath, false);
+ }
+ catch (Exception e) {
+ latestException = e;
+ continue;
+ }
+
+ ObjectOutputStream os = new ObjectOutputStream(outStream);
+ os.writeObject(state);
+ os.close();
+ return new FileSerializableStateHandle<S>(targetPath);
+ }
+
+ throw new Exception("Could not open output stream for state backend", latestException);
+ }
+
+ @Override
+ public FsCheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
+ checkFileSystemInitialized();
+
+ final Path checkpointDir = createCheckpointDirPath(checkpointID);
+ filesystem.mkdirs(checkpointDir);
+
+ Exception latestException = null;
+
+ for (int attempt = 0; attempt < 10; attempt++) {
+ Path targetPath = new Path(checkpointDir, UUID.randomUUID().toString());
+ try {
+ FSDataOutputStream outStream = filesystem.create(targetPath, false);
+ return new FsCheckpointStateOutputStream(outStream, targetPath, filesystem);
+ }
+ catch (Exception e) {
+ latestException = e;
+ }
+ }
+ throw new Exception("Could not open output stream for state backend", latestException);
+ }
+
+ // ------------------------------------------------------------------------
+ // utilities
+ // ------------------------------------------------------------------------
+
+ private void checkFileSystemInitialized() throws IllegalStateException {
+ if (filesystem == null || checkpointDirectory == null) {
+ throw new IllegalStateException("filesystem has not been re-initialized after deserialization");
+ }
+ }
+
+ private Path createCheckpointDirPath(long checkpointID) {
+ return new Path(checkpointDirectory, "chk-" + checkpointID);
+ }
+
+ @Override
+ public String toString() {
+ return checkpointDirectory == null ?
+ "File State Backend @ " + basePath :
+ "File State Backend (initialized) @ " + checkpointDirectory;
+ }
+
+ // ------------------------------------------------------------------------
+ // Output stream for state checkpointing
+ // ------------------------------------------------------------------------
+
+ /**
+ * A CheckpointStateOutputStream that writes into a file and returns the path to that file upon
+ * closing.
+ */
+ public static final class FsCheckpointStateOutputStream extends CheckpointStateOutputStream {
+
+ private final FSDataOutputStream outStream;
+
+ private final Path filePath;
+
+ private final FileSystem fs;
+
+ private boolean closed;
+
+ FsCheckpointStateOutputStream(FSDataOutputStream outStream, Path filePath, FileSystem fs) {
+ this.outStream = outStream;
+ this.filePath = filePath;
+ this.fs = fs;
+ }
+
+
+ @Override
+ public void write(int b) throws IOException {
+ outStream.write(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ outStream.write(b, off, len);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ outStream.flush();
+ }
+
+ /**
+ * If the stream is only closed, we remove the produced file (cleanup through the auto close
+ * feature, for example). This method throws no exception if the deletion fails, but only
+ * logs the error.
+ */
+ @Override
+ public void close() {
+ synchronized (this) {
+ if (!closed) {
+ closed = true;
+ try {
+ outStream.close();
+ fs.delete(filePath, false);
+
+ // attempt to delete the parent (will fail and be ignored if the parent has more files)
+ try {
+ fs.delete(filePath.getParent(), false);
+ } catch (IOException ignored) {}
+ }
+ catch (Exception e) {
+ LOG.warn("Cannot delete closed and discarded state stream to " + filePath, e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public FileStreamStateHandle closeAndGetHandle() throws IOException {
+ return new FileStreamStateHandle(closeAndGetPath());
+ }
+
+ /**
+ * Closes the stream and returns the path to the file that contains the stream's data.
+ * @return The path to the file that contains the stream's data.
+ * @throws IOException Thrown if the stream cannot be successfully closed.
+ */
+ public Path closeAndGetPath() throws IOException {
+ synchronized (this) {
+ if (!closed) {
+ closed = true;
+ outStream.close();
+ return filePath;
+ }
+ else {
+ throw new IOException("Stream has already been closed and discarded.");
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
new file mode 100644
index 0000000..e687f7f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.StateBackendFactory;
+
+/**
+ * A factory that creates an {@link org.apache.flink.runtime.state.filesystem.FsStateBackend}
+ * from a configuration.
+ */
+public class FsStateBackendFactory implements StateBackendFactory<FsStateBackend> {
+
+ /** The key under which the config stores the directory where checkpoints should be stored */
+ public static final String CHECKPOINT_DIRECTORY_URI_CONF_KEY = "state.backend.fs.checkpointdir";
+
+
+ @Override
+ public FsStateBackend createFromConfig(Configuration config) throws Exception {
+ String checkpointDirURI = config.getString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, null);
+
+ if (checkpointDirURI == null) {
+ throw new IllegalConfigurationException(
+ "Cannot create the file system state backend: The configuration does not specify the " +
+ "checkpoint directory '" + CHECKPOINT_DIRECTORY_URI_CONF_KEY + '\'');
+ }
+
+ try {
+ Path path = new Path(checkpointDirURI);
+ return new FsStateBackend(path);
+ }
+ catch (IllegalArgumentException e) {
+ throw new Exception("Cannot initialize File System State Backend with URI '"
+ + checkpointDirURI + '.', e);
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
new file mode 100644
index 0000000..29762f7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.memory;
+
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+
+/**
+ * A state handle that contains stream state in a byte array.
+ */
+public final class ByteStreamStateHandle implements StreamStateHandle {
+
+ private static final long serialVersionUID = -5280226231200217594L;
+
+ /** the state data */
+ private final byte[] data;
+
+ /**
+ * Creates a new ByteStreamStateHandle containing the given data.
+ *
+ * @param data The state data.
+ */
+ public ByteStreamStateHandle(byte[] data) {
+ this.data = data;
+ }
+
+ @Override
+ public InputStream getState(ClassLoader userCodeClassLoader) {
+ return new ByteArrayInputStream(data);
+ }
+
+ @Override
+ public void discardState() {}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemHeapKvState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemHeapKvState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemHeapKvState.java
new file mode 100644
index 0000000..96cb440
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemHeapKvState.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.memory;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.util.DataOutputSerializer;
+import org.apache.flink.runtime.state.AbstractHeapKvState;
+
+import java.util.HashMap;
+
+/**
+ * Heap-backed key/value state that is snapshotted into a serialized memory copy.
+ *
+ * @param <K> The type of the key.
+ * @param <V> The type of the value.
+ */
+public class MemHeapKvState<K, V> extends AbstractHeapKvState<K, V, MemoryStateBackend> {
+
+ public MemHeapKvState(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue) {
+ super(keySerializer, valueSerializer, defaultValue);
+ }
+
+ public MemHeapKvState(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer,
+ V defaultValue, HashMap<K, V> state) {
+ super(keySerializer, valueSerializer, defaultValue, state);
+ }
+
+ @Override
+ public MemoryHeapKvStateSnapshot<K, V> shapshot(long checkpointId, long timestamp) throws Exception {
+ DataOutputSerializer ser = new DataOutputSerializer(Math.max(size() * 16, 16));
+ writeStateToOutputView(ser);
+ byte[] bytes = ser.getCopyOfBuffer();
+
+ return new MemoryHeapKvStateSnapshot<K, V>(getKeySerializer(), getValueSerializer(), bytes, size());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryHeapKvStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryHeapKvStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryHeapKvStateSnapshot.java
new file mode 100644
index 0000000..1b03def
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryHeapKvStateSnapshot.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.memory;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.util.DataInputDeserializer;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+
+import java.util.HashMap;
+
+/**
+ * A snapshot of a {@link MemHeapKvState} for a checkpoint. The data is stored in a heap byte
+ * array, in serialized form.
+ *
+ * @param <K> The type of the key in the snapshot state.
+ * @param <V> The type of the value in the snapshot state.
+ */
+public class MemoryHeapKvStateSnapshot<K, V> implements KvStateSnapshot<K, V, MemoryStateBackend> {
+
+ private static final long serialVersionUID = 1L;
+
+ /** Name of the key serializer class */
+ private final String keySerializerClassName;
+
+ /** Name of the value serializer class */
+ private final String valueSerializerClassName;
+
+ /** The serialized data of the state key/value pairs */
+ private final byte[] data;
+
+ /** The number of key/value pairs */
+ private final int numEntries;
+
+ /**
+ * Creates a new heap memory state snapshot.
+ *
+ * @param keySerializer The serializer for the keys.
+ * @param valueSerializer The serializer for the values.
+ * @param data The serialized data of the state key/value pairs
+ * @param numEntries The number of key/value pairs
+ */
+ public MemoryHeapKvStateSnapshot(TypeSerializer<K> keySerializer,
+ TypeSerializer<V> valueSerializer, byte[] data, int numEntries) {
+ this.keySerializerClassName = keySerializer.getClass().getName();
+ this.valueSerializerClassName = valueSerializer.getClass().getName();
+ this.data = data;
+ this.numEntries = numEntries;
+ }
+
+
+ @Override
+ public MemHeapKvState<K, V> restoreState(
+ MemoryStateBackend stateBackend,
+ final TypeSerializer<K> keySerializer,
+ final TypeSerializer<V> valueSerializer,
+ V defaultValue,
+ ClassLoader classLoader) throws Exception {
+
+ // validity checks
+ if (!keySerializer.getClass().getName().equals(keySerializerClassName) ||
+ !valueSerializer.getClass().getName().equals(valueSerializerClassName)) {
+ throw new IllegalArgumentException(
+ "Cannot restore the state from the snapshot with the given serializers. " +
+ "State (K/V) was serialized with (" + valueSerializerClassName +
+ "/" + keySerializerClassName + ")");
+ }
+
+ // restore state
+ HashMap<K, V> stateMap = new HashMap<>(numEntries);
+ DataInputDeserializer in = new DataInputDeserializer(data, 0, data.length);
+
+ for (int i = 0; i < numEntries; i++) {
+ K key = keySerializer.deserialize(in);
+ V value = valueSerializer.deserialize(in);
+ stateMap.put(key, value);
+ }
+
+ return new MemHeapKvState<K, V>(keySerializer, valueSerializer, defaultValue, stateMap);
+ }
+
+ /**
+ * Discarding the heap state is a no-op.
+ */
+ @Override
+ public void discardState() {}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
new file mode 100644
index 0000000..8d297d4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.memory;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * A {@link StateBackend} that stores all its data and checkpoints in memory and has no
+ * capabilities to spill to disk. Checkpoints are serialized and the serialized data is
+ * transferred
+ */
+public class MemoryStateBackend extends StateBackend<MemoryStateBackend> {
+
+ private static final long serialVersionUID = 4109305377809414635L;
+
+ /** The default maximal size that the snapshotted memory state may have (5 MiBytes) */
+ private static final int DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024;
+
+ /** The maximal size that the snapshotted memory state may have */
+ private final int maxStateSize;
+
+ /**
+ * Creates a new memory state backend that accepts states whose serialized forms are
+ * up to the default state size (5 MB).
+ */
+ public MemoryStateBackend() {
+ this(DEFAULT_MAX_STATE_SIZE);
+ }
+
+ /**
+ * Creates a new memory state backend that accepts states whose serialized forms are
+ * up to the given number of bytes.
+ *
+ * @param maxStateSize The maximal size of the serialized state
+ */
+ public MemoryStateBackend(int maxStateSize) {
+ this.maxStateSize = maxStateSize;
+ }
+
+ // ------------------------------------------------------------------------
+ // initialization and cleanup
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void initializeForJob(JobID job) {
+ // nothing to do here
+ }
+
+ @Override
+ public void disposeAllStateForCurrentJob() {
+ // nothing to do here, GC will do it
+ }
+
+ @Override
+ public void close() throws Exception {}
+
+ // ------------------------------------------------------------------------
+ // State backend operations
+ // ------------------------------------------------------------------------
+
+ @Override
+ public <K, V> MemHeapKvState<K, V> createKvState(
+ TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue) {
+ return new MemHeapKvState<K, V>(keySerializer, valueSerializer, defaultValue);
+ }
+
+ /**
+ * Serialized the given state into bytes using Java serialization and creates a state handle that
+ * can re-create that state.
+ *
+ * @param state The state to checkpoint.
+ * @param checkpointID The ID of the checkpoint.
+ * @param timestamp The timestamp of the checkpoint.
+ * @param <S> The type of the state.
+ *
+ * @return A state handle that contains the given state serialized as bytes.
+ * @throws Exception Thrown, if the serialization fails.
+ */
+ @Override
+ public <S extends Serializable> StateHandle<S> checkpointStateSerializable(
+ S state, long checkpointID, long timestamp) throws Exception
+ {
+ SerializedStateHandle<S> handle = new SerializedStateHandle<>(state);
+ checkSize(handle.getSizeOfSerializedState(), maxStateSize);
+ return new SerializedStateHandle<S>(state);
+ }
+
+ @Override
+ public CheckpointStateOutputStream createCheckpointStateOutputStream(
+ long checkpointID, long timestamp) throws Exception
+ {
+ return new MemoryCheckpointOutputStream(maxStateSize);
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ @Override
+ public String toString() {
+ return "MemoryStateBackend (data in heap memory / checkpoints to JobManager)";
+ }
+
+ static void checkSize(int size, int maxSize) throws IOException {
+ if (size > maxSize) {
+ throw new IOException(
+ "Size of the state is larger than the maximum permitted memory-backed state. Size="
+ + size + " , maxSize=" + maxSize
+ + " . Consider using a different state backend, like the File System State backend.");
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * A CheckpointStateOutputStream that writes into a byte array.
+ */
+ public static final class MemoryCheckpointOutputStream extends CheckpointStateOutputStream {
+
+ private final ByteArrayOutputStream os = new ByteArrayOutputStream();
+
+ private final int maxSize;
+
+ private boolean closed;
+
+ public MemoryCheckpointOutputStream(int maxSize) {
+ this.maxSize = maxSize;
+ }
+
+ @Override
+ public void write(int b) {
+ os.write(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) {
+ os.write(b, off, len);
+ }
+
+ // --------------------------------------------------------------------
+
+ @Override
+ public void close() {
+ closed = true;
+ os.reset();
+ }
+
+ @Override
+ public StreamStateHandle closeAndGetHandle() throws IOException {
+ return new ByteStreamStateHandle(closeAndGetBytes());
+ }
+
+ /**
+ * Closes the stream and returns the byte array containing the stream's data.
+ * @return The byte array containing the stream's data.
+ * @throws IOException Thrown if the size of the data exceeds the maximal
+ */
+ public byte[] closeAndGetBytes() throws IOException {
+ if (!closed) {
+ checkSize(os.size(), maxSize);
+ byte[] bytes = os.toByteArray();
+ close();
+ return bytes;
+ }
+ else {
+ throw new IllegalStateException("stream has already been closed");
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Static default instance
+ // ------------------------------------------------------------------------
+
+ /** The default instance of this state backend, using the default maximal state size */
+ private static final MemoryStateBackend DEFAULT_INSTANCE = new MemoryStateBackend();
+
+ /**
+ * Gets the default instance of this state backend, using the default maximal state size.
+ * @return The default instance of this state backend.
+ */
+ public static MemoryStateBackend defaultInstance() {
+ return DEFAULT_INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/SerializedStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/SerializedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/SerializedStateHandle.java
new file mode 100644
index 0000000..c488dc9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/SerializedStateHandle.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.memory;
+
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.util.SerializedValue;
+
+import java.io.IOException;
+
+/**
+ * A state handle that represents its state in serialized form as bytes.
+ *
+ * @param <T> The type of state represented by this state handle.
+ */
+public class SerializedStateHandle<T> extends SerializedValue<T> implements StateHandle<T> {
+
+ private static final long serialVersionUID = 4145685722538475769L;
+
+ public SerializedStateHandle(T value) throws IOException {
+ super(value);
+ }
+
+ @Override
+ public T getState(ClassLoader classLoader) throws Exception {
+ return deserializeValue(classLoader);
+ }
+
+ /**
+ * Discarding heap-memory backed state is a no-op, so this method does nothing.
+ */
+ @Override
+ public void discardState() {}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index 79b9b7e..a32fc65 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -34,11 +34,14 @@ import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
import org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore;
import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
-import org.apache.flink.runtime.state.StateHandleProvider;
-import org.apache.flink.runtime.state.StateHandleProviderFactory;
+import org.apache.flink.runtime.zookeeper.StateStorageHelper;
+import org.apache.flink.runtime.zookeeper.filesystem.FileSystemStateStorageHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.io.Serializable;
+
import static com.google.common.base.Preconditions.checkNotNull;
public class ZooKeeperUtils {
@@ -170,7 +173,7 @@ public class ZooKeeperUtils {
String latchPath = configuration.getString(ConfigConstants.ZOOKEEPER_LATCH_PATH,
ConfigConstants.DEFAULT_ZOOKEEPER_LATCH_PATH);
String leaderPath = configuration.getString(ConfigConstants.ZOOKEEPER_LEADER_PATH,
- ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH);
+ ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH);
return new ZooKeeperLeaderElectionService(client, latchPath, leaderPath);
}
@@ -188,8 +191,7 @@ public class ZooKeeperUtils {
checkNotNull(configuration, "Configuration");
- StateHandleProvider<SubmittedJobGraph> stateHandleProvider =
- StateHandleProviderFactory.createRecoveryFileStateHandleProvider(configuration);
+ StateStorageHelper<SubmittedJobGraph> stateStorage = createFileSystemStateStorage(configuration, "submittedJobGraph");
// ZooKeeper submitted jobs root dir
String zooKeeperSubmittedJobsPath = configuration.getString(
@@ -197,7 +199,7 @@ public class ZooKeeperUtils {
ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
return new ZooKeeperSubmittedJobGraphStore(
- client, zooKeeperSubmittedJobsPath, stateHandleProvider);
+ client, zooKeeperSubmittedJobsPath, stateStorage);
}
/**
@@ -219,21 +221,23 @@ public class ZooKeeperUtils {
checkNotNull(configuration, "Configuration");
- StateHandleProvider<CompletedCheckpoint> stateHandleProvider =
- StateHandleProviderFactory.createRecoveryFileStateHandleProvider(configuration);
+ String checkpointsPath = configuration.getString(
+ ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH,
+ ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH);
- String completedCheckpointsPath = configuration.getString(
- ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH,
- ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH);
- completedCheckpointsPath += ZooKeeperSubmittedJobGraphStore.getPathForJob(jobId);
+ StateStorageHelper<CompletedCheckpoint> stateStorage = createFileSystemStateStorage(
+ configuration,
+ "completedCheckpoint");
+
+ checkpointsPath += ZooKeeperSubmittedJobGraphStore.getPathForJob(jobId);
return new ZooKeeperCompletedCheckpointStore(
maxNumberOfCheckpointsToRetain,
userClassLoader,
client,
- completedCheckpointsPath,
- stateHandleProvider);
+ checkpointsPath,
+ stateStorage);
}
/**
@@ -259,6 +263,30 @@ public class ZooKeeperUtils {
}
/**
+ * Creates a {@link FileSystemStateStorageHelper} instance.
+ *
+ * @param configuration {@link Configuration} object
+ * @param prefix Prefix for the created files
+ * @param <T> Type of the state objects
+ * @return {@link FileSystemStateStorageHelper} instance
+ * @throws IOException
+ */
+ private static <T extends Serializable> FileSystemStateStorageHelper<T> createFileSystemStateStorage(
+ Configuration configuration,
+ String prefix) throws IOException {
+
+ String rootPath = configuration.getString(
+ ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, "");
+
+ if (rootPath.equals("")) {
+ throw new IllegalConfigurationException("Missing recovery path. Specify via " +
+ "configuration key '" + ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH + "'.");
+ } else {
+ return new FileSystemStateStorageHelper<T>(rootPath, prefix);
+ }
+ }
+
+ /**
* Private constructor to prevent instantiation.
*/
private ZooKeeperUtils() {
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/StateStorageHelper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/StateStorageHelper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/StateStorageHelper.java
new file mode 100644
index 0000000..d18cace
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/StateStorageHelper.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.flink.runtime.state.StateHandle;
+
+import java.io.Serializable;
+
+/**
+ * State storage helper which is used by {@ZooKeeperStateHandleStore} to persiste state before
+ * the state handle is written to ZooKeeper.
+ *
+ * @param <T>
+ */
+public interface StateStorageHelper<T extends Serializable> {
+
+ /**
+ * Stores the given state and returns a state handle to it.
+ *
+ * @param state State to be stored
+ * @return State handle to the stored state
+ * @throws Exception
+ */
+ StateHandle<T> store(T state) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index 936fe1b..6073a39 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -23,12 +23,14 @@ import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.utils.ZKPaths;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.util.InstantiationUtil;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
@@ -65,11 +67,12 @@ import static com.google.common.base.Preconditions.checkNotNull;
*/
public class ZooKeeperStateHandleStore<T extends Serializable> {
+ public static Logger LOG = LoggerFactory.getLogger(ZooKeeperStateHandleStore.class);
+
/** Curator ZooKeeper client */
private final CuratorFramework client;
- /** State handle provider */
- private final StateHandleProvider<T> stateHandleProvider;
+ private final StateStorageHelper<T> storage;
/**
* Creates a {@link ZooKeeperStateHandleStore}.
@@ -78,14 +81,13 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
* expected that the client's namespace ensures that the root
* path is exclusive for all state handles managed by this
* instance, e.g. <code>client.usingNamespace("/stateHandles")</code>
- * @param stateHandleProvider The state handle provider for the state
*/
public ZooKeeperStateHandleStore(
- CuratorFramework client,
- StateHandleProvider<T> stateHandleProvider) {
+ CuratorFramework client,
+ StateStorageHelper storage) throws IOException {
this.client = checkNotNull(client, "Curator client");
- this.stateHandleProvider = checkNotNull(stateHandleProvider, "State handle provider");
+ this.storage = checkNotNull(storage, "State storage");
}
/**
@@ -112,12 +114,14 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
* @return Created {@link StateHandle}
* @throws Exception If a ZooKeeper or state handle operation fails
*/
- public StateHandle<T> add(String pathInZooKeeper, T state, CreateMode createMode) throws Exception {
+ public StateHandle<T> add(
+ String pathInZooKeeper,
+ T state,
+ CreateMode createMode) throws Exception {
checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
checkNotNull(state, "State");
- // Create the state handle. Nothing persisted yet.
- StateHandle<T> stateHandle = stateHandleProvider.createStateHandle(state);
+ StateHandle<T> stateHandle = storage.store(state);
boolean success = false;
@@ -159,7 +163,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
StateHandle<T> oldStateHandle = get(pathInZooKeeper);
- StateHandle<T> stateHandle = stateHandleProvider.createStateHandle(state);
+ StateHandle<T> stateHandle = storage.store(state);
boolean success = false;
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java
new file mode 100644
index 0000000..d6b69e4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper.filesystem;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle;
+import org.apache.flink.runtime.util.FileUtils;
+import org.apache.flink.runtime.zookeeper.StateStorageHelper;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * {@link StateStorageHelper} implementation which stores the state in the given filesystem path.
+ *
+ * @param <T>
+ */
+public class FileSystemStateStorageHelper<T extends Serializable> implements StateStorageHelper<T> {
+
+ private final Path rootPath;
+
+ private final String prefix;
+
+ private final FileSystem fs;
+
+ public FileSystemStateStorageHelper(String rootPath, String prefix) throws IOException {
+ this(new Path(rootPath), prefix);
+ }
+
+ public FileSystemStateStorageHelper(Path rootPath, String prefix) throws IOException {
+ this.rootPath = Preconditions.checkNotNull(rootPath, "Root path");
+ this.prefix = Preconditions.checkNotNull(prefix, "Prefix");
+
+ fs = FileSystem.get(rootPath.toUri());
+ }
+
+ @Override
+ public StateHandle<T> store(T state) throws Exception {
+ Exception latestException = null;
+
+ for (int attempt = 0; attempt < 10; attempt++) {
+ Path filePath = getNewFilePath();
+ FSDataOutputStream outStream;
+ try {
+ outStream = fs.create(filePath, false);
+ }
+ catch (Exception e) {
+ latestException = e;
+ continue;
+ }
+
+ try(ObjectOutputStream os = new ObjectOutputStream(outStream)) {
+ os.writeObject(state);
+ }
+
+ return new FileSerializableStateHandle<>(filePath);
+ }
+
+ throw new Exception("Could not open output stream for state backend", latestException);
+ }
+
+ private Path getNewFilePath() {
+ return new Path(rootPath, FileUtils.getRandomFilename(prefix));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index ebc0ea9..d9b69ad 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1542,30 +1542,25 @@ object JobManager {
}
}
- val webMonitor: Option[WebMonitor] =
- if (configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
- val address = AkkaUtils.getAddress(jobManagerSystem)
+ val address = AkkaUtils.getAddress(jobManagerSystem)
- configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.host.get)
- configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.port.get)
+ configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.host.get)
+ configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.port.get)
- // start the job manager web frontend
- if (configuration.getBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, false)) {
- val leaderRetrievalService = LeaderRetrievalUtils
- .createLeaderRetrievalService(configuration)
+ val webMonitor: Option[WebMonitor] =
+ if (configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
+ LOG.info("Starting JobManger web frontend")
+ val leaderRetrievalService = LeaderRetrievalUtils
+ .createLeaderRetrievalService(configuration)
- LOG.info("Starting NEW JobManger web frontend")
- // start the new web frontend. we need to load this dynamically
- // because it is not in the same project/dependencies
- Some(startWebRuntimeMonitor(configuration, leaderRetrievalService, jobManagerSystem))
- }
- else {
- LOG.info("Starting JobManger web frontend")
+ // start the web frontend. we need to load this dynamically
+ // because it is not in the same project/dependencies
+ val webServer = WebMonitorUtils.startWebRuntimeMonitor(
+ configuration,
+ leaderRetrievalService,
+ jobManagerSystem)
- // The old web frontend does not work with recovery mode
- val leaderRetrievalService = StandaloneUtils.createLeaderRetrievalService(configuration)
- Some(new WebInfoServer(configuration, leaderRetrievalService, jobManagerSystem))
- }
+ Option(webServer)
}
else {
None
@@ -1624,16 +1619,8 @@ object JobManager {
monitor =>
val jobManagerAkkaUrl = JobManager.getRemoteJobManagerAkkaURL(configuration)
monitor.start(jobManagerAkkaUrl)
- LOG.info("Starting JobManger web frontend")
- // start the web frontend. we need to load this dynamically
- // because it is not in the same project/dependencies
- val webServer = WebMonitorUtils.startWebRuntimeMonitor(
- configuration,
- leaderRetrievalService,
- jobManagerSystem)
}
-
(jobManagerSystem, jobManager, archive, webMonitor)
}
catch {
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index 4c6ddfd..dc6f550 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -19,6 +19,8 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.runtime.state.LocalStateHandle;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.zookeeper.StateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.junit.AfterClass;
import org.junit.Before;
@@ -56,8 +58,12 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
ClassLoader userLoader) throws Exception {
return new ZooKeeperCompletedCheckpointStore(maxNumberOfCheckpointsToRetain, userLoader,
- ZooKeeper.createClient(), CheckpointsPath, new LocalStateHandle
- .LocalStateHandleProvider<CompletedCheckpoint>());
+ ZooKeeper.createClient(), CheckpointsPath, new StateStorageHelper<CompletedCheckpoint>() {
+ @Override
+ public StateHandle<CompletedCheckpoint> store(CompletedCheckpoint state) throws Exception {
+ return new LocalStateHandle<>(state);
+ }
+ });
}
// ---------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index 4df8afb..ea4195c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.execution.librarycache;
-import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
@@ -28,9 +27,9 @@ import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.FileInputStream;
@@ -46,23 +45,8 @@ import static org.junit.Assert.assertEquals;
public class BlobLibraryCacheRecoveryITCase {
- private File recoveryDir;
-
- @Before
- public void setUp() throws Exception {
- recoveryDir = new File(FileUtils.getTempDirectory(), "BlobRecoveryITCaseDir");
- if (!recoveryDir.exists() && !recoveryDir.mkdirs()) {
- throw new IllegalStateException("Failed to create temp directory for test");
- }
- }
-
- @After
- public void cleanUp() throws Exception {
- if (recoveryDir != null) {
- FileUtils.deleteDirectory(recoveryDir);
- }
- }
-
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
/**
* Tests that with {@link RecoveryMode#ZOOKEEPER} distributed JARs are recoverable from any
* participating BlobLibraryCacheManager.
@@ -81,7 +65,7 @@ public class BlobLibraryCacheRecoveryITCase {
Configuration config = new Configuration();
config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
- config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, recoveryDir.getPath());
+ config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, temporaryFolder.getRoot().getAbsolutePath());
for (int i = 0; i < server.length; i++) {
server[i] = new BlobServer(config);
@@ -170,7 +154,7 @@ public class BlobLibraryCacheRecoveryITCase {
}
// Verify everything is clean
- File[] recoveryFiles = recoveryDir.listFiles();
+ File[] recoveryFiles = temporaryFolder.getRoot().listFiles();
assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, recoveryFiles.length);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
index 861a713..356ba36 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
@@ -24,7 +24,9 @@ import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener;
-import org.apache.flink.runtime.state.LocalStateHandle.LocalStateHandleProvider;
+import org.apache.flink.runtime.state.LocalStateHandle;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.zookeeper.StateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
@@ -54,8 +56,13 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
private final static ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
- private final static LocalStateHandleProvider<SubmittedJobGraph> StateHandleProvider =
- new LocalStateHandleProvider<>();
+ private final static StateStorageHelper<SubmittedJobGraph> localStateStorage = new StateStorageHelper<SubmittedJobGraph>() {
+ @Override
+ public StateHandle<SubmittedJobGraph> store(SubmittedJobGraph state) throws Exception {
+ return new LocalStateHandle<>(state);
+ }
+ };
+
@AfterClass
public static void tearDown() throws Exception {
@@ -72,8 +79,9 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
@Test
public void testPutAndRemoveJobGraph() throws Exception {
ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
- ZooKeeper.createClient(), "/testPutAndRemoveJobGraph",
- StateHandleProvider);
+ ZooKeeper.createClient(),
+ "/testPutAndRemoveJobGraph",
+ localStateStorage);
try {
SubmittedJobGraphListener listener = mock(SubmittedJobGraphListener.class);
@@ -125,7 +133,7 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
@Test
public void testRecoverJobGraphs() throws Exception {
ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
- ZooKeeper.createClient(), "/testRecoverJobGraphs", StateHandleProvider);
+ ZooKeeper.createClient(), "/testRecoverJobGraphs", localStateStorage);
try {
SubmittedJobGraphListener listener = mock(SubmittedJobGraphListener.class);
@@ -175,10 +183,10 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
try {
jobGraphs = new ZooKeeperSubmittedJobGraphStore(
- ZooKeeper.createClient(), "/testConcurrentAddJobGraph", StateHandleProvider);
+ ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage);
otherJobGraphs = new ZooKeeperSubmittedJobGraphStore(
- ZooKeeper.createClient(), "/testConcurrentAddJobGraph", StateHandleProvider);
+ ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage);
SubmittedJobGraph jobGraph = createSubmittedJobGraph(new JobID(), 0);
@@ -234,10 +242,10 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
@Test(expected = IllegalStateException.class)
public void testUpdateJobGraphYouDidNotGetOrAdd() throws Exception {
ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
- ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", StateHandleProvider);
+ ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
ZooKeeperSubmittedJobGraphStore otherJobGraphs = new ZooKeeperSubmittedJobGraphStore(
- ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", StateHandleProvider);
+ ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
jobGraphs.start(null);
otherJobGraphs.start(null);