You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/11/24 10:45:07 UTC
[5/8] flink git commit: [FLINK-2916] [streaming] Expose operator and
task information to StateBackend
[FLINK-2916] [streaming] Expose operator and task information to StateBackend
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ad6f8265
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ad6f8265
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ad6f8265
Branch: refs/heads/master
Commit: ad6f826584be7527c58e2126e2828f82afc97875
Parents: 8cabe67
Author: Gyula Fora <gy...@apache.org>
Authored: Mon Oct 26 09:58:49 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Tue Nov 24 09:28:39 2015 +0100
----------------------------------------------------------------------
.../flink/runtime/state/StateBackend.java | 62 +++----
.../state/filesystem/FsStateBackend.java | 89 +++++-----
.../state/memory/MemoryStateBackend.java | 40 ++---
.../operators/testutils/DummyEnvironment.java | 167 +++++++++++++++++++
.../runtime/state/FileStateBackendTest.java | 109 ++++++------
.../runtime/state/MemoryStateBackendTest.java | 46 ++---
.../flink/hdfstests/FileStateBackendTest.java | 11 +-
.../api/operators/AbstractStreamOperator.java | 2 +-
.../streaming/runtime/tasks/StreamTask.java | 2 +-
.../PartitionedStateCheckpointingITCase.java | 21 +--
10 files changed, 362 insertions(+), 187 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ad6f8265/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
index f8b1cfd..6f72bce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
@@ -18,12 +18,12 @@
package org.apache.flink.runtime.state;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.execution.Environment;
import java.io.IOException;
import java.io.OutputStream;
@@ -31,32 +31,32 @@ import java.io.Serializable;
/**
* A state backend defines how state is stored and snapshotted during checkpoints.
- *
+ *
* @param <Backend> The type of backend itself. This generic parameter is used to refer to the
* type of backend when creating state backed by this backend.
*/
public abstract class StateBackend<Backend extends StateBackend<Backend>> implements java.io.Serializable {
-
+
private static final long serialVersionUID = 4620413814639220247L;
-
+
// ------------------------------------------------------------------------
// initialization and cleanup
// ------------------------------------------------------------------------
-
+
/**
* This method is called by the task upon deployment to initialize the state backend for
* data for a specific job.
- *
- * @param job The ID of the job for which the state backend instance checkpoints data.
+ *
+ * @param env The {@link Environment} of the task that instantiated the state backend
* @throws Exception Overwritten versions of this method may throw exceptions, in which
* case the job that uses the state backend is considered failed during
* deployment.
*/
- public abstract void initializeForJob(JobID job) throws Exception;
+ public abstract void initializeForJob(Environment env) throws Exception;
/**
* Disposes all state associated with the current job.
- *
+ *
* @throws Exception Exceptions may occur during disposal of the state and should be forwarded.
*/
public abstract void disposeAllStateForCurrentJob() throws Exception;
@@ -64,33 +64,35 @@ public abstract class StateBackend<Backend extends StateBackend<Backend>> implem
/**
* Closes the state backend, releasing all internal resources, but does not delete any persistent
* checkpoint data.
- *
+ *
* @throws Exception Exceptions can be forwarded and will be logged by the system
*/
public abstract void close() throws Exception;
-
+
// ------------------------------------------------------------------------
// key/value state
// ------------------------------------------------------------------------
/**
* Creates a key/value state backed by this state backend.
- *
+ *
+ * @param operatorId Unique id for the operator creating the state
+ * @param stateName Name of the created state
* @param keySerializer The serializer for the key.
* @param valueSerializer The serializer for the value.
* @param defaultValue The value that is returned when no other value has been associated with a key, yet.
* @param <K> The type of the key.
* @param <V> The type of the value.
- *
+ *
* @return A new key/value state backed by this backend.
- *
+ *
* @throws Exception Exceptions may occur during initialization of the state and should be forwarded.
*/
- public abstract <K, V> KvState<K, V, Backend> createKvState(
+ public abstract <K, V> KvState<K, V, Backend> createKvState(int operatorId, String stateName,
TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer,
V defaultValue) throws Exception;
-
-
+
+
// ------------------------------------------------------------------------
// storing state for a checkpoint
// ------------------------------------------------------------------------
@@ -98,16 +100,16 @@ public abstract class StateBackend<Backend extends StateBackend<Backend>> implem
/**
* Creates an output stream that writes into the state of the given checkpoint. When the stream
* is closed, it returns a state handle that can retrieve the state back.
- *
+ *
* @param checkpointID The ID of the checkpoint.
* @param timestamp The timestamp of the checkpoint.
* @return An output stream that writes state for the given checkpoint.
- *
+ *
* @throws Exception Exceptions may occur while creating the stream and should be forwarded.
*/
public abstract CheckpointStateOutputStream createCheckpointStateOutputStream(
long checkpointID, long timestamp) throws Exception;
-
+
/**
* Creates a {@link DataOutputView} stream that writes into the state of the given checkpoint.
* When the stream is closed, it returns a state handle that can retrieve the state back.
@@ -125,20 +127,20 @@ public abstract class StateBackend<Backend extends StateBackend<Backend>> implem
/**
* Writes the given state into the checkpoint, and returns a handle that can retrieve the state back.
- *
+ *
* @param state The state to be checkpointed.
* @param checkpointID The ID of the checkpoint.
* @param timestamp The timestamp of the checkpoint.
* @param <S> The type of the state.
- *
+ *
* @return A state handle that can retrieve the checkpointed state.
- *
+ *
* @throws Exception Exceptions may occur during serialization / storing the state and should be forwarded.
*/
public abstract <S extends Serializable> StateHandle<S> checkpointStateSerializable(
S state, long checkpointID, long timestamp) throws Exception;
-
-
+
+
// ------------------------------------------------------------------------
// Checkpoint state output stream
// ------------------------------------------------------------------------
@@ -151,7 +153,7 @@ public abstract class StateBackend<Backend extends StateBackend<Backend>> implem
/**
* Closes the stream and gets a state handle that can create an input stream
* producing the data written to this stream.
- *
+ *
* @return A state handle that can create an input stream producing the data written to this stream.
* @throws IOException Thrown, if the stream cannot be closed.
*/
@@ -162,9 +164,9 @@ public abstract class StateBackend<Backend extends StateBackend<Backend>> implem
* A dedicated DataOutputView stream that produces a {@code StateHandle<DataInputView>} when closed.
*/
public static final class CheckpointStateOutputView extends DataOutputViewStreamWrapper {
-
+
private final CheckpointStateOutputStream out;
-
+
public CheckpointStateOutputView(CheckpointStateOutputStream out) {
super(out);
this.out = out;
@@ -193,7 +195,7 @@ public abstract class StateBackend<Backend extends StateBackend<Backend>> implem
private static final class DataInputViewHandle implements StateHandle<DataInputView> {
private static final long serialVersionUID = 2891559813513532079L;
-
+
private final StreamStateHandle stream;
private DataInputViewHandle(StreamStateHandle stream) {
@@ -202,7 +204,7 @@ public abstract class StateBackend<Backend extends StateBackend<Backend>> implem
@Override
public DataInputView getState(ClassLoader userCodeClassLoader) throws Exception {
- return new DataInputViewStreamWrapper(stream.getState(userCodeClassLoader));
+ return new DataInputViewStreamWrapper(stream.getState(userCodeClassLoader));
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/ad6f8265/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index d7b392c..6a94a80 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -18,14 +18,13 @@
package org.apache.flink.runtime.state.filesystem;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.StateBackend;
-
+import org.apache.flink.runtime.state.StateHandle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,27 +37,27 @@ import java.util.UUID;
/**
* The file state backend is a state backend that stores the state of streaming jobs in a file system.
- *
+ *
* <p>The state backend has one core directory into which it puts all checkpoint data. Inside that
* directory, it creates a directory per job, inside which each checkpoint gets a directory, with
* files for each state, for example:
- *
+ *
* {@code hdfs://namenode:port/flink-checkpoints/<job-id>/chk-17/6ba7b810-9dad-11d1-80b4-00c04fd430c8 }
*/
public class FsStateBackend extends StateBackend<FsStateBackend> {
private static final long serialVersionUID = -8191916350224044011L;
-
+
private static final Logger LOG = LoggerFactory.getLogger(FsStateBackend.class);
-
-
+
+
/** The path to the directory for the checkpoint data, including the file system
* description via scheme and optional authority */
private final Path basePath;
-
+
/** The directory (job specific) into which this initialized instance of the backend stores its data */
private transient Path checkpointDirectory;
-
+
/** Cached handle to the file system for file operations */
private transient FileSystem filesystem;
@@ -104,14 +103,14 @@ public class FsStateBackend extends StateBackend<FsStateBackend> {
/**
* Creates a new state backend that stores its checkpoint data in the file system and location
* defined by the given URI.
- *
+ *
* <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
* must be accessible via {@link FileSystem#get(URI)}.
- *
+ *
* <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
* (host and port), or that the Hadoop configuration that describes that information must be in the
* classpath.
- *
+ *
* @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
* and the path to the checkpoint data directory.
* @throws IOException Thrown, if no file system can be found for the scheme in the URI.
@@ -119,7 +118,7 @@ public class FsStateBackend extends StateBackend<FsStateBackend> {
public FsStateBackend(URI checkpointDataUri) throws IOException {
final String scheme = checkpointDataUri.getScheme();
final String path = checkpointDataUri.getPath();
-
+
// some validity checks
if (scheme == null) {
throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
@@ -132,12 +131,12 @@ public class FsStateBackend extends StateBackend<FsStateBackend> {
if (path.length() == 0 || path.equals("/")) {
throw new IllegalArgumentException("Cannot use the root directory for checkpoints.");
}
-
+
// we do a bit of work to make sure that the URI for the filesystem refers to exactly the same
// (distributed) filesystem on all hosts and includes full host/port information, even if the
// original URI did not include that. We count on the filesystem loading from the configuration
// to fill in the missing data.
-
+
// try to grab the file system for this path/URI
this.filesystem = FileSystem.get(checkpointDataUri);
if (this.filesystem == null) {
@@ -151,7 +150,7 @@ public class FsStateBackend extends StateBackend<FsStateBackend> {
}
catch (URISyntaxException e) {
throw new IOException(
- String.format("Cannot create file system URI for checkpointDataUri %s and filesystem URI %s",
+ String.format("Cannot create file system URI for checkpointDataUri %s and filesystem URI %s",
checkpointDataUri, fsURI), e);
}
}
@@ -159,7 +158,7 @@ public class FsStateBackend extends StateBackend<FsStateBackend> {
/**
* Gets the base directory where all state-containing files are stored.
* The job specific directory is created inside this directory.
- *
+ *
* @return The base directory.
*/
public Path getBasePath() {
@@ -169,7 +168,7 @@ public class FsStateBackend extends StateBackend<FsStateBackend> {
/**
* Gets the directory where this state backend stores its checkpoint data. Will be null if
* the state backend has not been initialized.
- *
+ *
* @return The directory where this state backend stores its checkpoint data.
*/
public Path getCheckpointDirectory() {
@@ -179,16 +178,16 @@ public class FsStateBackend extends StateBackend<FsStateBackend> {
/**
* Checks whether this state backend is initialized. Note that initialization does not carry
* across serialization. After each serialization, the state backend needs to be initialized.
- *
+ *
* @return True, if the file state backend has been initialized, false otherwise.
*/
public boolean isInitialized() {
- return filesystem != null && checkpointDirectory != null;
+ return filesystem != null && checkpointDirectory != null;
}
/**
* Gets the file system handle for the file system that stores the state for this backend.
- *
+ *
* @return This backend's file system handle.
*/
public FileSystem getFileSystem() {
@@ -203,13 +202,13 @@ public class FsStateBackend extends StateBackend<FsStateBackend> {
// ------------------------------------------------------------------------
// initialization and cleanup
// ------------------------------------------------------------------------
-
+
@Override
- public void initializeForJob(JobID jobId) throws Exception {
- Path dir = new Path(basePath, jobId.toString());
-
+ public void initializeForJob(Environment env) throws Exception {
+ Path dir = new Path(basePath, env.getJobID().toString());
+
LOG.info("Initializing file state backend to URI " + dir);
-
+
filesystem = basePath.getFileSystem();
filesystem.mkdirs(dir);
@@ -220,7 +219,7 @@ public class FsStateBackend extends StateBackend<FsStateBackend> {
public void disposeAllStateForCurrentJob() throws Exception {
FileSystem fs = this.filesystem;
Path dir = this.checkpointDirectory;
-
+
if (fs != null && dir != null) {
this.filesystem = null;
this.checkpointDirectory = null;
@@ -237,9 +236,9 @@ public class FsStateBackend extends StateBackend<FsStateBackend> {
// ------------------------------------------------------------------------
// state backend operations
// ------------------------------------------------------------------------
-
+
@Override
- public <K, V> FsHeapKvState<K, V> createKvState(
+ public <K, V> FsHeapKvState<K, V> createKvState(int operatorId, String stateName,
TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue) throws Exception {
return new FsHeapKvState<K, V>(keySerializer, valueSerializer, defaultValue, this);
}
@@ -254,7 +253,7 @@ public class FsStateBackend extends StateBackend<FsStateBackend> {
final Path checkpointDir = createCheckpointDirPath(checkpointID);
filesystem.mkdirs(checkpointDir);
-
+
Exception latestException = null;
for (int attempt = 0; attempt < 10; attempt++) {
@@ -273,19 +272,19 @@ public class FsStateBackend extends StateBackend<FsStateBackend> {
}
return new FileSerializableStateHandle<S>(targetPath);
}
-
+
throw new Exception("Could not open output stream for state backend", latestException);
}
-
+
@Override
public FsCheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
checkFileSystemInitialized();
-
+
final Path checkpointDir = createCheckpointDirPath(checkpointID);
filesystem.mkdirs(checkpointDir);
-
+
Exception latestException = null;
-
+
for (int attempt = 0; attempt < 10; attempt++) {
Path targetPath = new Path(checkpointDir, UUID.randomUUID().toString());
try {
@@ -298,7 +297,7 @@ public class FsStateBackend extends StateBackend<FsStateBackend> {
}
throw new Exception("Could not open output stream for state backend", latestException);
}
-
+
// ------------------------------------------------------------------------
// utilities
// ------------------------------------------------------------------------
@@ -308,18 +307,18 @@ public class FsStateBackend extends StateBackend<FsStateBackend> {
throw new IllegalStateException("filesystem has not been re-initialized after deserialization");
}
}
-
+
private Path createCheckpointDirPath(long checkpointID) {
return new Path(checkpointDirectory, "chk-" + checkpointID);
}
-
+
@Override
public String toString() {
return checkpointDirectory == null ?
- "File State Backend @ " + basePath :
+ "File State Backend @ " + basePath :
"File State Backend (initialized) @ " + checkpointDirectory;
}
-
+
// ------------------------------------------------------------------------
// Output stream for state checkpointing
// ------------------------------------------------------------------------
@@ -331,11 +330,11 @@ public class FsStateBackend extends StateBackend<FsStateBackend> {
public static final class FsCheckpointStateOutputStream extends CheckpointStateOutputStream {
private final FSDataOutputStream outStream;
-
+
private final Path filePath;
-
+
private final FileSystem fs;
-
+
private boolean closed;
FsCheckpointStateOutputStream(FSDataOutputStream outStream, Path filePath, FileSystem fs) {
@@ -373,7 +372,7 @@ public class FsStateBackend extends StateBackend<FsStateBackend> {
try {
outStream.close();
fs.delete(filePath, false);
-
+
// attempt to delete the parent (will fail and be ignored if the parent has more files)
try {
fs.delete(filePath.getParent(), false);
http://git-wip-us.apache.org/repos/asf/flink/blob/ad6f8265/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index 8d297d4..f3e7552 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -18,10 +18,10 @@
package org.apache.flink.runtime.state.memory;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import java.io.ByteArrayOutputStream;
@@ -31,15 +31,15 @@ import java.io.Serializable;
/**
* A {@link StateBackend} that stores all its data and checkpoints in memory and has no
* capabilities to spill to disk. Checkpoints are serialized and the serialized data is
- * transferred
+ * transferred
*/
public class MemoryStateBackend extends StateBackend<MemoryStateBackend> {
private static final long serialVersionUID = 4109305377809414635L;
-
+
/** The default maximal size that the snapshotted memory state may have (5 MiBytes) */
private static final int DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024;
-
+
/** The maximal size that the snapshotted memory state may have */
private final int maxStateSize;
@@ -54,7 +54,7 @@ public class MemoryStateBackend extends StateBackend<MemoryStateBackend> {
/**
* Creates a new memory state backend that accepts states whose serialized forms are
* up to the given number of bytes.
- *
+ *
* @param maxStateSize The maximal size of the serialized state
*/
public MemoryStateBackend(int maxStateSize) {
@@ -66,7 +66,7 @@ public class MemoryStateBackend extends StateBackend<MemoryStateBackend> {
// ------------------------------------------------------------------------
@Override
- public void initializeForJob(JobID job) {
+ public void initializeForJob(Environment env) {
// nothing to do here
}
@@ -81,22 +81,22 @@ public class MemoryStateBackend extends StateBackend<MemoryStateBackend> {
// ------------------------------------------------------------------------
// State backend operations
// ------------------------------------------------------------------------
-
+
@Override
- public <K, V> MemHeapKvState<K, V> createKvState(
+ public <K, V> MemHeapKvState<K, V> createKvState(int operatorId, String stateName,
TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue) {
return new MemHeapKvState<K, V>(keySerializer, valueSerializer, defaultValue);
}
-
+
/**
* Serializes the given state into bytes using Java serialization and creates a state handle that
* can re-create that state.
- *
+ *
* @param state The state to checkpoint.
* @param checkpointID The ID of the checkpoint.
* @param timestamp The timestamp of the checkpoint.
* @param <S> The type of the state.
- *
+ *
* @return A state handle that contains the given state serialized as bytes.
* @throws Exception Thrown, if the serialization fails.
*/
@@ -119,7 +119,7 @@ public class MemoryStateBackend extends StateBackend<MemoryStateBackend> {
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
-
+
@Override
public String toString() {
return "MemoryStateBackend (data in heap memory / checkpoints to JobManager)";
@@ -133,18 +133,18 @@ public class MemoryStateBackend extends StateBackend<MemoryStateBackend> {
+ " . Consider using a different state backend, like the File System State backend.");
}
}
-
+
// ------------------------------------------------------------------------
/**
* A CheckpointStateOutputStream that writes into a byte array.
*/
public static final class MemoryCheckpointOutputStream extends CheckpointStateOutputStream {
-
+
private final ByteArrayOutputStream os = new ByteArrayOutputStream();
-
+
private final int maxSize;
-
+
private boolean closed;
public MemoryCheckpointOutputStream(int maxSize) {
@@ -177,7 +177,7 @@ public class MemoryStateBackend extends StateBackend<MemoryStateBackend> {
/**
* Closes the stream and returns the byte array containing the stream's data.
* @return The byte array containing the stream's data.
- * @throws IOException Thrown if the size of the data exceeds the maximal
+ * @throws IOException Thrown if the size of the data exceeds the maximal
*/
public byte[] closeAndGetBytes() throws IOException {
if (!closed) {
@@ -191,11 +191,11 @@ public class MemoryStateBackend extends StateBackend<MemoryStateBackend> {
}
}
}
-
+
// ------------------------------------------------------------------------
// Static default instance
// ------------------------------------------------------------------------
-
+
/** The default instance of this state backend, using the default maximal state size */
private static final MemoryStateBackend DEFAULT_INSTANCE = new MemoryStateBackend();
http://git-wip-us.apache.org/repos/asf/flink/blob/ad6f8265/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
new file mode 100644
index 0000000..71bec4a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.testutils;
+
+import java.util.Map;
+import java.util.concurrent.Future;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+
+public class DummyEnvironment implements Environment {
+
+ private final String taskName;
+ private final int numSubTasks;
+ private final int subTaskIndex;
+ private final JobID jobId = new JobID();
+ private final JobVertexID jobVertexId = new JobVertexID();
+
+ public DummyEnvironment(String taskName, int numSubTasks, int subTaskIndex) {
+ this.taskName = taskName;
+ this.numSubTasks = numSubTasks;
+ this.subTaskIndex = subTaskIndex;
+ }
+
+ @Override
+ public JobID getJobID() {
+ return jobId;
+ }
+
+ @Override
+ public JobVertexID getJobVertexId() {
+ return jobVertexId;
+ }
+
+ @Override
+ public ExecutionAttemptID getExecutionId() {
+ return null;
+ }
+
+ @Override
+ public Configuration getTaskConfiguration() {
+ return null;
+ }
+
+ @Override
+ public TaskManagerRuntimeInfo getTaskManagerInfo() {
+ return null;
+ }
+
+ @Override
+ public Configuration getJobConfiguration() {
+ return null;
+ }
+
+ @Override
+ public int getNumberOfSubtasks() {
+ return numSubTasks;
+ }
+
+ @Override
+ public int getIndexInSubtaskGroup() {
+ return subTaskIndex;
+ }
+
+ @Override
+ public InputSplitProvider getInputSplitProvider() {
+ return null;
+ }
+
+ @Override
+ public IOManager getIOManager() {
+ return null;
+ }
+
+ @Override
+ public MemoryManager getMemoryManager() {
+ return null;
+ }
+
+ @Override
+ public String getTaskName() {
+ return taskName;
+ }
+
+ @Override
+ public String getTaskNameWithSubtasks() {
+ return taskName;
+ }
+
+ @Override
+ public ClassLoader getUserClassLoader() {
+ return null;
+ }
+
+ @Override
+ public Map<String, Future<Path>> getDistributedCacheEntries() {
+ return null;
+ }
+
+ @Override
+ public BroadcastVariableManager getBroadcastVariableManager() {
+ return null;
+ }
+
+ @Override
+ public AccumulatorRegistry getAccumulatorRegistry() {
+ return null;
+ }
+
+ @Override
+ public void acknowledgeCheckpoint(long checkpointId) {
+ }
+
+ @Override
+ public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
+ }
+
+ @Override
+ public ResultPartitionWriter getWriter(int index) {
+ return null;
+ }
+
+ @Override
+ public ResultPartitionWriter[] getAllWriters() {
+ return null;
+ }
+
+ @Override
+ public InputGate getInputGate(int index) {
+ return null;
+ }
+
+ @Override
+ public InputGate[] getAllInputGates() {
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ad6f8265/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
index a6cfae3..fc5d8c8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
@@ -18,8 +18,22 @@
package org.apache.flink.runtime.state;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Random;
+import java.util.UUID;
+
import org.apache.commons.io.FileUtils;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.FloatSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
@@ -29,41 +43,34 @@ import org.apache.flink.api.java.typeutils.runtime.ValueSerializer;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.StringValue;
+import org.apache.flink.util.OperatingSystem;
import org.junit.Test;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.util.Random;
-import java.util.UUID;
-
-import static org.junit.Assert.*;
-
public class FileStateBackendTest {
-
+
@Test
public void testSetupAndSerialization() {
File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
try {
final String backendDir = localFileUri(tempDir);
FsStateBackend originalBackend = new FsStateBackend(backendDir);
-
+
assertFalse(originalBackend.isInitialized());
assertEquals(new URI(backendDir), originalBackend.getBasePath().toUri());
assertNull(originalBackend.getCheckpointDirectory());
-
+
// serialize / copy the backend
FsStateBackend backend = CommonTestUtils.createCopySerializable(originalBackend);
assertFalse(backend.isInitialized());
assertEquals(new URI(backendDir), backend.getBasePath().toUri());
assertNull(backend.getCheckpointDirectory());
-
+
// no file operations should be possible right now
try {
backend.checkpointStateSerializable("exception train rolling in", 2L, System.currentTimeMillis());
@@ -71,17 +78,17 @@ public class FileStateBackendTest {
} catch (IllegalStateException e) {
// supreme!
}
-
- backend.initializeForJob(new JobID());
+
+ backend.initializeForJob(new DummyEnvironment("test", 0, 0));
assertNotNull(backend.getCheckpointDirectory());
-
+
File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
assertTrue(checkpointDir.exists());
assertTrue(isDirectoryEmpty(checkpointDir));
-
+
backend.disposeAllStateForCurrentJob();
assertNull(backend.getCheckpointDirectory());
-
+
assertTrue(isDirectoryEmpty(tempDir));
}
catch (Exception e) {
@@ -92,20 +99,20 @@ public class FileStateBackendTest {
deleteDirectorySilently(tempDir);
}
}
-
+
@Test
public void testSerializableState() {
File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
try {
FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
- backend.initializeForJob(new JobID());
+ backend.initializeForJob(new DummyEnvironment("test", 0, 0));
File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
String state1 = "dummy state";
String state2 = "row row row your boat";
Integer state3 = 42;
-
+
StateHandle<String> handle1 = backend.checkpointStateSerializable(state1, 439568923746L, System.currentTimeMillis());
StateHandle<String> handle2 = backend.checkpointStateSerializable(state2, 439568923746L, System.currentTimeMillis());
StateHandle<Integer> handle3 = backend.checkpointStateSerializable(state3, 439568923746L, System.currentTimeMillis());
@@ -113,15 +120,15 @@ public class FileStateBackendTest {
assertFalse(isDirectoryEmpty(checkpointDir));
assertEquals(state1, handle1.getState(getClass().getClassLoader()));
handle1.discardState();
-
+
assertFalse(isDirectoryEmpty(checkpointDir));
assertEquals(state2, handle2.getState(getClass().getClassLoader()));
handle2.discardState();
-
+
assertFalse(isDirectoryEmpty(checkpointDir));
assertEquals(state3, handle3.getState(getClass().getClassLoader()));
handle3.discardState();
-
+
assertTrue(isDirectoryEmpty(checkpointDir));
}
catch (Exception e) {
@@ -138,7 +145,7 @@ public class FileStateBackendTest {
File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
try {
FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
- backend.initializeForJob(new JobID());
+ backend.initializeForJob(new DummyEnvironment("test", 0, 0));
File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
@@ -146,7 +153,7 @@ public class FileStateBackendTest {
byte[] state2 = new byte[1];
byte[] state3 = new byte[0];
byte[] state4 = new byte[177];
-
+
Random rnd = new Random();
rnd.nextBytes(state1);
rnd.nextBytes(state2);
@@ -155,21 +162,21 @@ public class FileStateBackendTest {
long checkpointId = 97231523452L;
- FsStateBackend.FsCheckpointStateOutputStream stream1 =
+ FsStateBackend.FsCheckpointStateOutputStream stream1 =
backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
FsStateBackend.FsCheckpointStateOutputStream stream2 =
backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
FsStateBackend.FsCheckpointStateOutputStream stream3 =
backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
-
+
stream1.write(state1);
stream2.write(state2);
stream3.write(state3);
-
+
FileStreamStateHandle handle1 = stream1.closeAndGetHandle();
FileStreamStateHandle handle2 = stream2.closeAndGetHandle();
FileStreamStateHandle handle3 = stream3.closeAndGetHandle();
-
+
// use with try-with-resources
StreamStateHandle handle4;
try (StateBackend.CheckpointStateOutputStream stream4 =
@@ -177,7 +184,7 @@ public class FileStateBackendTest {
stream4.write(state4);
handle4 = stream4.closeAndGetHandle();
}
-
+
// close before accessing handle
StateBackend.CheckpointStateOutputStream stream5 =
backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
@@ -189,22 +196,22 @@ public class FileStateBackendTest {
} catch (IOException e) {
// uh-huh
}
-
+
validateBytesInStream(handle1.getState(getClass().getClassLoader()), state1);
handle1.discardState();
assertFalse(isDirectoryEmpty(checkpointDir));
ensureLocalFileDeleted(handle1.getFilePath());
-
+
validateBytesInStream(handle2.getState(getClass().getClassLoader()), state2);
handle2.discardState();
assertFalse(isDirectoryEmpty(checkpointDir));
ensureLocalFileDeleted(handle2.getFilePath());
-
+
validateBytesInStream(handle3.getState(getClass().getClassLoader()), state3);
handle3.discardState();
assertFalse(isDirectoryEmpty(checkpointDir));
ensureLocalFileDeleted(handle3.getFilePath());
-
+
validateBytesInStream(handle4.getState(getClass().getClassLoader()), state4);
handle4.discardState();
assertTrue(isDirectoryEmpty(checkpointDir));
@@ -223,12 +230,12 @@ public class FileStateBackendTest {
File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
try {
FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
- backend.initializeForJob(new JobID());
+ backend.initializeForJob(new DummyEnvironment("test", 0, 0));
File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
KvState<Integer, String, FsStateBackend> kv =
- backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
+ backend.createKvState(0, "a", IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
assertEquals(0, kv.size());
@@ -312,12 +319,12 @@ public class FileStateBackendTest {
File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
try {
FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
- backend.initializeForJob(new JobID());
+ backend.initializeForJob(new DummyEnvironment("test", 0, 0));
File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
-
+
KvState<Integer, String, FsStateBackend> kv =
- backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
+ backend.createKvState(0, "a", IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
kv.setCurrentKey(1);
kv.update("1");
@@ -365,7 +372,7 @@ public class FileStateBackendTest {
} catch (Exception e) {
fail("wrong exception");
}
-
+
snapshot.discardState();
assertTrue(isDirectoryEmpty(checkpointDir));
@@ -384,10 +391,10 @@ public class FileStateBackendTest {
File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
try {
FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
- backend.initializeForJob(new JobID());
-
+ backend.initializeForJob(new DummyEnvironment("test", 0, 0));
+
KvState<Integer, IntValue, FsStateBackend> kv =
- backend.createKvState(IntSerializer.INSTANCE, IntValueSerializer.INSTANCE, new IntValue(-1));
+ backend.createKvState(0, "a", IntSerializer.INSTANCE, IntValueSerializer.INSTANCE, new IntValue(-1));
kv.setCurrentKey(1);
IntValue default1 = kv.value();
@@ -408,11 +415,11 @@ public class FileStateBackendTest {
deleteDirectorySilently(tempDir);
}
}
-
+
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
-
+
private static void ensureLocalFileDeleted(Path path) {
URI uri = path.toUri();
if ("file".equals(uri.getScheme())) {
@@ -423,23 +430,23 @@ public class FileStateBackendTest {
throw new IllegalArgumentException("not a local path");
}
}
-
+
private static void deleteDirectorySilently(File dir) {
try {
FileUtils.deleteDirectory(dir);
}
catch (IOException ignored) {}
}
-
+
private static boolean isDirectoryEmpty(File directory) {
String[] nested = directory.list();
return nested == null || nested.length == 0;
}
-
+
private static String localFileUri(File path) {
return path.toURI().toString();
}
-
+
private static void validateBytesInStream(InputStream is, byte[] data) throws IOException {
byte[] holder = new byte[data.length];
assertEquals("not enough data", holder.length, is.read(holder));
http://git-wip-us.apache.org/repos/asf/flink/blob/ad6f8265/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
index f6d1bb5..ae027e3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -40,7 +40,7 @@ import static org.junit.Assert.*;
* Tests for the {@link org.apache.flink.runtime.state.memory.MemoryStateBackend}.
*/
public class MemoryStateBackendTest {
-
+
@Test
public void testSerializableState() {
try {
@@ -49,10 +49,10 @@ public class MemoryStateBackendTest {
HashMap<String, Integer> state = new HashMap<>();
state.put("hey there", 2);
state.put("the crazy brown fox stumbles over a sentence that does not contain every letter", 77);
-
+
StateHandle<HashMap<String, Integer>> handle = backend.checkpointStateSerializable(state, 12, 459);
assertNotNull(handle);
-
+
HashMap<String, Integer> restored = handle.getState(getClass().getClassLoader());
assertEquals(state, restored);
}
@@ -99,7 +99,7 @@ public class MemoryStateBackendTest {
oos.writeObject(state);
oos.flush();
StreamStateHandle handle = os.closeAndGetHandle();
-
+
assertNotNull(handle);
ObjectInputStream ois = new ObjectInputStream(handle.getState(getClass().getClassLoader()));
@@ -124,7 +124,7 @@ public class MemoryStateBackendTest {
StateBackend.CheckpointStateOutputStream os = backend.createCheckpointStateOutputStream(1, 2);
ObjectOutputStream oos = new ObjectOutputStream(os);
-
+
try {
oos.writeObject(state);
oos.flush();
@@ -140,17 +140,17 @@ public class MemoryStateBackendTest {
fail(e.getMessage());
}
}
-
+
@Test
public void testKeyValueState() {
try {
MemoryStateBackend backend = new MemoryStateBackend();
-
- KvState<Integer, String, MemoryStateBackend> kv =
- backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
-
+
+ KvState<Integer, String, MemoryStateBackend> kv =
+ backend.createKvState(0, "s", IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
+
assertEquals(0, kv.size());
-
+
// some modifications to the state
kv.setCurrentKey(1);
assertNull(kv.value());
@@ -163,7 +163,7 @@ public class MemoryStateBackendTest {
kv.setCurrentKey(1);
assertEquals("1", kv.value());
assertEquals(2, kv.size());
-
+
// draw a snapshot
KvStateSnapshot<Integer, String, MemoryStateBackend> snapshot1 =
kv.snapshot(682375462378L, System.currentTimeMillis());
@@ -188,9 +188,9 @@ public class MemoryStateBackendTest {
assertEquals("u2", kv.value());
kv.setCurrentKey(3);
assertEquals("u3", kv.value());
-
+
// restore the first snapshot and validate it
- KvState<Integer, String, MemoryStateBackend> restored1 = snapshot1.restoreState(backend,
+ KvState<Integer, String, MemoryStateBackend> restored1 = snapshot1.restoreState(backend,
IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader());
assertEquals(2, restored1.size());
@@ -216,29 +216,29 @@ public class MemoryStateBackendTest {
fail(e.getMessage());
}
}
-
+
@Test
public void testRestoreWithWrongSerializers() {
try {
MemoryStateBackend backend = new MemoryStateBackend();
KvState<Integer, String, MemoryStateBackend> kv =
- backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
-
+ backend.createKvState(0, "s", IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
+
kv.setCurrentKey(1);
kv.update("1");
kv.setCurrentKey(2);
kv.update("2");
-
+
KvStateSnapshot<Integer, String, MemoryStateBackend> snapshot =
kv.snapshot(682375462378L, System.currentTimeMillis());
@SuppressWarnings("unchecked")
- TypeSerializer<Integer> fakeIntSerializer =
+ TypeSerializer<Integer> fakeIntSerializer =
(TypeSerializer<Integer>) (TypeSerializer<?>) FloatSerializer.INSTANCE;
@SuppressWarnings("unchecked")
- TypeSerializer<String> fakeStringSerializer =
+ TypeSerializer<String> fakeStringSerializer =
(TypeSerializer<String>) (TypeSerializer<?>) new ValueSerializer<StringValue>(StringValue.class);
try {
@@ -276,20 +276,20 @@ public class MemoryStateBackendTest {
fail(e.getMessage());
}
}
-
+
@Test
public void testCopyDefaultValue() {
try {
MemoryStateBackend backend = new MemoryStateBackend();
KvState<Integer, IntValue, MemoryStateBackend> kv =
- backend.createKvState(IntSerializer.INSTANCE, IntValueSerializer.INSTANCE, new IntValue(-1));
+ backend.createKvState(0, "a", IntSerializer.INSTANCE, IntValueSerializer.INSTANCE, new IntValue(-1));
kv.setCurrentKey(1);
IntValue default1 = kv.value();
kv.setCurrentKey(2);
IntValue default2 = kv.value();
-
+
assertNotNull(default1);
assertNotNull(default2);
assertEquals(default1, default2);
http://git-wip-us.apache.org/repos/asf/flink/blob/ad6f8265/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
index 4e4acd2..4fb6820 100644
--- a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
+++ b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.hdfstests;
import org.apache.commons.io.FileUtils;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
@@ -29,7 +28,7 @@ import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.hadoop.conf.Configuration;
@@ -63,7 +62,7 @@ public class FileStateBackendTest {
private static MiniDFSCluster HDFS_CLUSTER;
private static FileSystem FS;
-
+
// ------------------------------------------------------------------------
// startup / shutdown
// ------------------------------------------------------------------------
@@ -127,7 +126,7 @@ public class FileStateBackendTest {
// supreme!
}
- backend.initializeForJob(new JobID());
+ backend.initializeForJob(new DummyEnvironment("test", 0, 0));
assertNotNull(backend.getCheckpointDirectory());
Path checkpointDir = backend.getCheckpointDirectory();
@@ -150,7 +149,7 @@ public class FileStateBackendTest {
try {
FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(randomHdfsFileUri()));
- backend.initializeForJob(new JobID());
+ backend.initializeForJob(new DummyEnvironment("test", 0, 0));
Path checkpointDir = backend.getCheckpointDirectory();
@@ -186,7 +185,7 @@ public class FileStateBackendTest {
public void testStateOutputStream() {
try {
FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(randomHdfsFileUri()));
- backend.initializeForJob(new JobID());
+ backend.initializeForJob(new DummyEnvironment("test", 0, 0));
Path checkpointDir = backend.getCheckpointDirectory();
http://git-wip-us.apache.org/repos/asf/flink/blob/ad6f8265/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index ce4d174..6c1f1ba 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -322,7 +322,7 @@ public abstract class AbstractStreamOperator<OUT>
if (kvstate == null) {
// create a new blank key/value state
- kvstate = stateBackend.createKvState(keySerializer, valueSerializer, defaultValue);
+ kvstate = stateBackend.createKvState(getOperatorConfig().getVertexID() ,name , keySerializer, valueSerializer, defaultValue);
}
if (keyValueStatesByName == null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/ad6f8265/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 80c63da..ed7182d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -169,7 +169,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
accumulatorMap = accumulatorRegistry.getUserMap();
stateBackend = createStateBackend();
- stateBackend.initializeForJob(getEnvironment().getJobID());
+ stateBackend.initializeForJob(getEnvironment());
headOperator = configuration.getStreamOperator(userClassLoader);
operatorChain = new OperatorChain<>(this, headOperator, accumulatorRegistry.getReadWriteReporter());
http://git-wip-us.apache.org/repos/asf/flink/blob/ad6f8265/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
index 67c0189..42b6230 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
@@ -50,10 +50,11 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTestBase {
final long NUM_STRINGS = 10_000_000L;
+ final static int NUM_KEYS = 40;
@Override
public void testProgram(StreamExecutionEnvironment env) {
- assertTrue("Broken test setup", (NUM_STRINGS/2) % 40 == 0);
+ assertTrue("Broken test setup", (NUM_STRINGS/2) % NUM_KEYS == 0);
DataStream<Integer> stream1 = env.addSource(new IntGeneratingSourceFunction(NUM_STRINGS / 2));
DataStream<Integer> stream2 = env.addSource(new IntGeneratingSourceFunction(NUM_STRINGS / 2));
@@ -69,14 +70,14 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
public void postSubmit() {
// verify that we counted exactly right
for (Entry<Integer, Long> sum : OnceFailingPartitionedSum.allSums.entrySet()) {
- assertEquals(new Long(sum.getKey() * NUM_STRINGS / 40), sum.getValue());
+ assertEquals(new Long(sum.getKey() * NUM_STRINGS / NUM_KEYS), sum.getValue());
}
for (Long count : CounterSink.allCounts.values()) {
- assertEquals(new Long(NUM_STRINGS / 40), count);
+ assertEquals(new Long(NUM_STRINGS / NUM_KEYS), count);
}
- assertEquals(40, CounterSink.allCounts.size());
- assertEquals(40, OnceFailingPartitionedSum.allSums.size());
+ assertEquals(NUM_KEYS, CounterSink.allCounts.size());
+ assertEquals(NUM_KEYS, OnceFailingPartitionedSum.allSums.size());
}
// --------------------------------------------------------------------------------------------
@@ -120,7 +121,7 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
synchronized (lockingObject) {
index += step;
- ctx.collect(index % 40);
+ ctx.collect(index % NUM_KEYS);
}
}
}
@@ -160,9 +161,9 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
@Override
public void open(Configuration parameters) throws IOException {
- long failurePosMin = (long) (0.4 * numElements / getRuntimeContext()
+ long failurePosMin = (long) (0.6 * numElements / getRuntimeContext()
.getNumberOfParallelSubtasks());
- long failurePosMax = (long) (0.7 * numElements / getRuntimeContext()
+ long failurePosMax = (long) (0.8 * numElements / getRuntimeContext()
.getNumberOfParallelSubtasks());
failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
@@ -213,7 +214,7 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
}
}
- private static class NonSerializableLong {
+ public static class NonSerializableLong {
public Long value;
private NonSerializableLong(long value) {
@@ -225,7 +226,7 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
}
}
- private static class IdentityKeySelector<T> implements KeySelector<T, T> {
+ public static class IdentityKeySelector<T> implements KeySelector<T, T> {
@Override
public T getKey(T value) throws Exception {