You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/05/19 22:23:48 UTC
[1/6] flink git commit: [streaming] ByteStream and File state handle
added
Repository: flink
Updated Branches:
refs/heads/master f59d6a118 -> 147997388
[streaming] ByteStream and File state handle added
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/48a27200
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/48a27200
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/48a27200
Branch: refs/heads/master
Commit: 48a2720055987fa67ca3cd16cdd57d34648cf59a
Parents: f59d6a1
Author: Gyula Fora <gy...@apache.org>
Authored: Thu May 7 14:29:23 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Tue May 19 18:21:20 2015 +0200
----------------------------------------------------------------------
.../runtime/state/ByteStreamStateHandle.java | 56 +++++++++++++++++++
.../flink/runtime/state/FileStateHandle.java | 58 ++++++++++++++++++++
2 files changed, 114 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/48a27200/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
new file mode 100644
index 0000000..36598d9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * Base class for state handles that store a {@link Serializable} state via
+ * byte streams supplied by subclasses, using Java object serialization.
+ *
+ * <p>The state is held in a transient field. When the handle itself is
+ * serialized, {@code writeObject} writes the state to the stream returned by
+ * {@link #getOutputStream()}; {@link #getState()} reads it back from the
+ * stream returned by {@link #getInputStream()}.</p>
+ */
+public abstract class ByteStreamStateHandle implements StateHandle<Serializable> {
+
+    private static final long serialVersionUID = -962025800339325828L;
+
+    /** The wrapped state; transient, so it is not part of the serialized handle. */
+    transient Serializable state;
+
+    /**
+     * Creates a handle for the given state. Nothing is written here; the state
+     * is only written out when the handle is serialized.
+     *
+     * @param state the state to wrap
+     * @throws IOException declared in the signature; not thrown by this constructor itself
+     */
+    public ByteStreamStateHandle(Serializable state) throws IOException {
+        this.state = state;
+    }
+
+    /**
+     * Returns the stream the state is written to when this handle is serialized.
+     */
+    protected abstract OutputStream getOutputStream() throws Exception;
+
+    /**
+     * Returns the stream the state is read from in {@link #getState()}.
+     */
+    protected abstract InputStream getInputStream() throws Exception;
+
+    /**
+     * Reads the state from the stream returned by {@link #getInputStream()},
+     * stores it in this handle and returns it.
+     *
+     * @return the deserialized state
+     * @throws Exception if the stream cannot be obtained or the state cannot be read
+     */
+    @Override
+    public Serializable getState() throws Exception {
+        InputStream in = getInputStream();
+        try {
+            ObjectInputStream stream = new ObjectInputStream(in);
+            state = (Serializable) stream.readObject();
+        } finally {
+            // Close the underlying stream even if the stream header check or
+            // readObject() fails, so the resource is never leaked.
+            in.close();
+        }
+        return state;
+    }
+
+    /**
+     * Java serialization hook: writes the state to the stream returned by
+     * {@link #getOutputStream()} and then writes the handle's non-transient
+     * fields to {@code oos}.
+     */
+    private void writeObject(ObjectOutputStream oos) throws Exception {
+        OutputStream out = getOutputStream();
+        try {
+            ObjectOutputStream stream = new ObjectOutputStream(out);
+            stream.writeObject(state);
+            // Push buffered object data to the underlying stream before it is closed.
+            stream.flush();
+        } finally {
+            // Always release the underlying stream, also when writing fails.
+            out.close();
+        }
+        oos.defaultWriteObject();
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/48a27200/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
new file mode 100644
index 0000000..9d614d8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.StringUtils;
+
+import scala.util.Random;
+
+/**
+ * State handle that stores the state in a file with a random name inside a
+ * given folder, accessed through the {@link FileSystem} abstraction.
+ */
+public class FileStateHandle extends ByteStreamStateHandle {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Location of the state file; parsed as a URI and as a {@link Path} on access. */
+    String pathString;
+
+    /**
+     * Creates a handle whose state file is located at
+     * {@code folder + "/" + <random hex name>}.
+     *
+     * @param state the state to wrap
+     * @param folder the folder in which the state file is placed
+     * @throws IOException declared by the super constructor
+     */
+    public FileStateHandle(Serializable state, String folder) throws IOException {
+        super(state);
+        this.pathString = folder + "/" + randomString();
+    }
+
+    /**
+     * Opens the state file for writing.
+     * NOTE(review): the {@code true} argument is presumably the overwrite
+     * flag of {@code FileSystem#create} - confirm against that API.
+     */
+    @Override
+    protected OutputStream getOutputStream() throws IOException, URISyntaxException {
+        final FileSystem fs = FileSystem.get(new URI(pathString));
+        return fs.create(new Path(pathString), true);
+    }
+
+    /**
+     * Opens the state file for reading.
+     */
+    @Override
+    protected InputStream getInputStream() throws IOException, URISyntaxException {
+        final FileSystem fs = FileSystem.get(new URI(pathString));
+        return fs.open(new Path(pathString));
+    }
+
+    /**
+     * Returns 20 random bytes rendered as a hex string; used as the file name.
+     */
+    private static String randomString() {
+        final byte[] bytes = new byte[20];
+        new Random().nextBytes(bytes);
+        return StringUtils.byteToHexString(bytes);
+    }
+}
[2/6] flink git commit: [streaming] Discard method added to state
handle
Posted by gy...@apache.org.
[streaming] Discard method added to state handle
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/59bee4ad
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/59bee4ad
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/59bee4ad
Branch: refs/heads/master
Commit: 59bee4ad825d7252c791e0376ea2504602134fe7
Parents: 48a2720
Author: Gyula Fora <gy...@apache.org>
Authored: Thu May 14 11:49:30 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Tue May 19 18:25:53 2015 +0200
----------------------------------------------------------------------
.../checkpoint/CheckpointCoordinator.java | 25 ++++++++++++++------
.../runtime/checkpoint/PendingCheckpoint.java | 23 +++++++++++-------
.../flink/runtime/checkpoint/StateForTask.java | 18 ++++++++++++--
.../checkpoint/SuccessfulCheckpoint.java | 9 ++++++-
.../runtime/executiongraph/ExecutionGraph.java | 2 +-
.../runtime/state/ByteStreamStateHandle.java | 12 ++++++++++
.../flink/runtime/state/FileStateHandle.java | 15 +++++++++++-
.../flink/runtime/state/LocalStateHandle.java | 6 ++++-
.../apache/flink/runtime/state/StateHandle.java | 7 ++++++
.../apache/flink/runtime/taskmanager/Task.java | 4 +---
.../checkpoint/CheckpointCoordinatorTest.java | 16 +++++++------
.../checkpoint/CheckpointStateRestoreTest.java | 10 ++++----
.../messages/CheckpointMessagesTest.java | 6 ++++-
.../streaming/runtime/tasks/StreamTask.java | 7 +++---
14 files changed, 120 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/59bee4ad/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index be0fcf2..e4abbf8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -97,6 +97,8 @@ public class CheckpointCoordinator {
private ActorRef jobStatusListener;
+ private ClassLoader userClassLoader;
+
private boolean shutdown;
// --------------------------------------------------------------------------------------------
@@ -104,7 +106,8 @@ public class CheckpointCoordinator {
public CheckpointCoordinator(JobID job, int numSuccessfulCheckpointsToRetain, long checkpointTimeout,
ExecutionVertex[] tasksToTrigger,
ExecutionVertex[] tasksToWaitFor,
- ExecutionVertex[] tasksToCommitTo) {
+ ExecutionVertex[] tasksToCommitTo,
+ ClassLoader userClassLoader) {
// some sanity checks
if (job == null || tasksToTrigger == null ||
@@ -127,6 +130,7 @@ public class CheckpointCoordinator {
this.pendingCheckpoints = new LinkedHashMap<Long, PendingCheckpoint>();
this.completedCheckpoints = new ArrayDeque<SuccessfulCheckpoint>(numSuccessfulCheckpointsToRetain + 1);
this.recentPendingCheckpoints = new ArrayDeque<Long>(NUM_GHOST_CHECKPOINT_IDS);
+ this.userClassLoader = userClassLoader;
timer = new Timer("Checkpoint Timer", true);
}
@@ -166,13 +170,13 @@ public class CheckpointCoordinator {
// clear and discard all pending checkpoints
for (PendingCheckpoint pending : pendingCheckpoints.values()) {
- pending.discard();
+ pending.discard(userClassLoader, true);
}
pendingCheckpoints.clear();
// clean and discard all successful checkpoints
for (SuccessfulCheckpoint checkpoint : completedCheckpoints) {
- checkpoint.dispose();
+ checkpoint.dispose(userClassLoader);
}
completedCheckpoints.clear();
}
@@ -253,7 +257,9 @@ public class CheckpointCoordinator {
// note that checkpoint completion discards the pending checkpoint object
if (!checkpoint.isDiscarded()) {
LOG.info("Checkpoint " + checkpointID + " expired before completing.");
- checkpoint.discard();
+
+ checkpoint.discard(userClassLoader, true);
+
pendingCheckpoints.remove(checkpointID);
rememberRecentCheckpointId(checkpointID);
}
@@ -288,7 +294,10 @@ public class CheckpointCoordinator {
LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
synchronized (lock) {
- pendingCheckpoints.remove(checkpointID);
+ PendingCheckpoint checkpoint = pendingCheckpoints.remove(checkpointID);
+ if (checkpoint != null && !checkpoint.isDiscarded()) {
+ checkpoint.discard(userClassLoader, true);
+ }
}
return false;
@@ -325,7 +334,7 @@ public class CheckpointCoordinator {
completed = checkpoint.toCompletedCheckpoint();
completedCheckpoints.addLast(completed);
if (completedCheckpoints.size() > numSuccessfulCheckpointsToRetain) {
- completedCheckpoints.removeFirst();
+ completedCheckpoints.removeFirst().dispose(userClassLoader);;
}
pendingCheckpoints.remove(checkpointId);
rememberRecentCheckpointId(checkpointId);
@@ -383,7 +392,9 @@ public class CheckpointCoordinator {
PendingCheckpoint p = entries.next().getValue();
if (p.getCheckpointTimestamp() < timestamp) {
rememberRecentCheckpointId(p.getCheckpointId());
- p.discard();
+
+ p.discard(userClassLoader, true);
+
entries.remove();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/59bee4ad/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index f25bff9..9ea3b6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -18,16 +18,16 @@
package org.apache.flink.runtime.checkpoint;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.util.SerializedValue;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
/**
* A pending checkpoint is a checkpoint that has been started, but has not been
* acknowledged by all tasks that need to acknowledge it. Once all tasks have
@@ -37,7 +37,7 @@ import java.util.Map;
* state handles always as serialized values, never as actual values.</p>
*/
public class PendingCheckpoint {
-
+
private final Object lock = new Object();
private final JobID jobId;
@@ -53,7 +53,7 @@ public class PendingCheckpoint {
private int numAcknowledgedTasks;
private boolean discarded;
-
+
// --------------------------------------------------------------------------------------------
public PendingCheckpoint(JobID jobId, long checkpointId, long checkpointTimestamp,
@@ -117,7 +117,8 @@ public class PendingCheckpoint {
if (notYetAcknowledgedTasks.isEmpty()) {
SuccessfulCheckpoint completed = new SuccessfulCheckpoint(jobId, checkpointId,
checkpointTimestamp, new ArrayList<StateForTask>(collectedStates));
- discard();
+ discard(null, false);
+
return completed;
}
else {
@@ -148,11 +149,17 @@ public class PendingCheckpoint {
/**
* Discards the pending checkpoint, releasing all held resources.
+ * @throws Exception
*/
- public void discard() {
+ public void discard(ClassLoader userClassLoader, boolean discardStateHandle) {
synchronized (lock) {
discarded = true;
numAcknowledgedTasks = -1;
+ if (discardStateHandle) {
+ for (StateForTask state : collectedStates) {
+ state.discard(userClassLoader);
+ }
+ }
collectedStates.clear();
notYetAcknowledgedTasks.clear();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/59bee4ad/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
index 26b3eb7..073b22f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
@@ -18,9 +18,13 @@
package org.apache.flink.runtime.checkpoint;
+import java.io.IOException;
+
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.util.SerializedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Simple bean to describe the state belonging to a parallel operator.
@@ -33,6 +37,8 @@ import org.apache.flink.runtime.util.SerializedValue;
* the respective classloader.
*/
public class StateForTask {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StateForTask.class);
/** The state of the parallel operator */
private final SerializedValue<StateHandle<?>> state;
@@ -42,9 +48,9 @@ public class StateForTask {
/** The index of the parallel subtask */
private final int subtask;
-
+
public StateForTask(SerializedValue<StateHandle<?>> state, JobVertexID operatorId, int subtask) {
- if (state == null || operatorId == null || subtask < 0) {
+ if (state == null || operatorId == null || subtask < 0) {
throw new IllegalArgumentException();
}
@@ -66,6 +72,14 @@ public class StateForTask {
public int getSubtask() {
return subtask;
}
+
+ /**
+  * Deserializes the state handle with the given class loader and discards
+  * the state it refers to. Any failure is logged as a warning and is not
+  * propagated to the caller.
+  *
+  * @param userClassLoader class loader used to deserialize the state handle
+  */
+ public void discard(ClassLoader userClassLoader) {
+     try {
+         final StateHandle<?> handle = state.deserializeValue(userClassLoader);
+         handle.discardState();
+     } catch (Exception e) {
+         LOG.warn("Failed to discard checkpoint state: " + this, e);
+     }
+ }
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/59bee4ad/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
index cd7efba..3f77138 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
@@ -19,6 +19,8 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.List;
@@ -28,6 +30,8 @@ import java.util.List;
*/
public class SuccessfulCheckpoint {
+ private static final Logger LOG = LoggerFactory.getLogger(SuccessfulCheckpoint.class);
+
private final JobID job;
private final long checkpointID;
@@ -62,7 +66,10 @@ public class SuccessfulCheckpoint {
// --------------------------------------------------------------------------------------------
- public void dispose() {
+ public void dispose(ClassLoader userClassLoader) {
+ for(StateForTask state: states){
+ state.discard(userClassLoader);
+ }
states.clear();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/59bee4ad/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index a041f86..73e827f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -307,7 +307,7 @@ public class ExecutionGraph implements Serializable {
// create the coordinator that triggers and commits checkpoints and holds the state
snapshotCheckpointsEnabled = true;
checkpointCoordinator = new CheckpointCoordinator(jobID, NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN,
- checkpointTimeout, tasksToTrigger, tasksToWaitFor, tasksToCommitTo);
+ checkpointTimeout, tasksToTrigger, tasksToWaitFor, tasksToCommitTo, userClassLoader);
// the periodic checkpoint scheduler is activated and deactivated as a result of
// job status changes (running -> on, all other states -> off)
http://git-wip-us.apache.org/repos/asf/flink/blob/59bee4ad/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
index 36598d9..a202a83 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
@@ -25,6 +25,12 @@ import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
+/**
+ * State handle that writes/reads the contents of the serializable checkpointed
+ * state to/from the provided output and input streams using default Java
+ * serialization.
+ *
+ */
public abstract class ByteStreamStateHandle implements StateHandle<Serializable> {
private static final long serialVersionUID = -962025800339325828L;
@@ -35,8 +41,14 @@ public abstract class ByteStreamStateHandle implements StateHandle<Serializable>
this.state = state;
}
+ /**
+ * The state will be written to the stream returned by this method.
+ */
protected abstract OutputStream getOutputStream() throws Exception;
+ /**
+ * The state will be read from the stream returned by this method.
+ */
protected abstract InputStream getInputStream() throws Exception;
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/59bee4ad/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
index 9d614d8..956bd9d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
@@ -31,11 +31,18 @@ import org.apache.flink.util.StringUtils;
import scala.util.Random;
+/**
+ * State handle that writes the checkpointed state to a randomly named file in
+ * the provided checkpoint directory. Any file system supported by Flink can be
+ * used, but it is advisable to use one that is persistent in case of node
+ * failures, such as HDFS or Tachyon.
+ *
+ */
public class FileStateHandle extends ByteStreamStateHandle {
private static final long serialVersionUID = 1L;
- String pathString;
+ private String pathString;
public FileStateHandle(Serializable state, String folder) throws IOException {
super(state);
@@ -55,4 +62,10 @@ public class FileStateHandle extends ByteStreamStateHandle {
new Random().nextBytes(bytes);
return StringUtils.byteToHexString(bytes);
}
+
+ /**
+  * Deletes the file this handle points to.
+  * NOTE(review): the {@code false} argument is presumably the "recursive"
+  * flag of {@code FileSystem#delete} - confirm against that API.
+  */
+ @Override
+ public void discardState() throws Exception {
+     final FileSystem fs = FileSystem.get(new URI(pathString));
+     fs.delete(new Path(pathString), false);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/59bee4ad/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
index fa0c515..e8fe768 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.state;
import java.io.Serializable;
/**
- * A StateHandle that includes a map of operator states directly.
+ * A StateHandle that includes the operator states directly.
*/
public class LocalStateHandle implements StateHandle<Serializable> {
@@ -37,4 +37,8 @@ public class LocalStateHandle implements StateHandle<Serializable> {
public Serializable getState() {
return state;
}
+
+ @Override
+ public void discardState() throws Exception {
+ // No-op: this handle includes the operator state directly, and this
+ // override performs no cleanup.
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/59bee4ad/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java
index 409383c..c1342b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java
@@ -34,4 +34,11 @@ public interface StateHandle<T> extends Serializable {
* @throws java.lang.Exception Thrown, if the state cannot be fetched.
*/
T getState() throws Exception;
+
+ /**
+ * Discards the state referred to by this handle, to free up resources in
+ * the persistent storage. This method is called when the handle will not be
+ * used any more.
+ *
+ * @throws java.lang.Exception Thrown, if the state cannot be discarded.
+ */
+ void discardState() throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/59bee4ad/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index f3c40ba..8f613fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -53,10 +53,8 @@ import org.apache.flink.runtime.messages.TaskMessages;
import org.apache.flink.runtime.messages.TaskMessages.TaskInFinalState;
import org.apache.flink.runtime.messages.TaskManagerMessages.FatalError;
import org.apache.flink.runtime.state.StateHandle;
-
import org.apache.flink.runtime.state.StateUtils;
import org.apache.flink.runtime.util.SerializedValue;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -522,7 +520,7 @@ public class Task implements Runnable {
StateUtils.setOperatorState(op, state);
}
catch (Exception e) {
- throw new Exception("Failed to deserialize state handle and setup initial operator state");
+ throw new RuntimeException("Failed to deserialize state handle and setup initial operator state.", e);
}
}
else {
http://git-wip-us.apache.org/repos/asf/flink/blob/59bee4ad/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 10c8074..514b7c7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -38,6 +38,8 @@ import java.util.List;
*/
public class CheckpointCoordinatorTest {
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+
@Test
public void testCheckpointAbortsIfTriggerTasksAreNotExecuted() {
try {
@@ -59,7 +61,7 @@ public class CheckpointCoordinatorTest {
jid, 1, 600000,
new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
new ExecutionVertex[] { ackVertex1, ackVertex2 },
- new ExecutionVertex[] {} );
+ new ExecutionVertex[] {}, cl );
// nothing should be happening
assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -101,7 +103,7 @@ public class CheckpointCoordinatorTest {
jid, 1, 600000,
new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
new ExecutionVertex[] { ackVertex1, ackVertex2 },
- new ExecutionVertex[] {} );
+ new ExecutionVertex[] {}, cl );
// nothing should be happening
assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -139,7 +141,7 @@ public class CheckpointCoordinatorTest {
jid, 1, 600000,
new ExecutionVertex[] { vertex1, vertex2 },
new ExecutionVertex[] { vertex1, vertex2 },
- new ExecutionVertex[] { vertex1, vertex2 });
+ new ExecutionVertex[] { vertex1, vertex2 }, cl);
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -282,7 +284,7 @@ public class CheckpointCoordinatorTest {
jid, 2, 600000,
new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
- new ExecutionVertex[] { commitVertex });
+ new ExecutionVertex[] { commitVertex }, cl);
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -409,7 +411,7 @@ public class CheckpointCoordinatorTest {
jid, 10, 600000,
new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
- new ExecutionVertex[] { commitVertex });
+ new ExecutionVertex[] { commitVertex }, cl);
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -523,7 +525,7 @@ public class CheckpointCoordinatorTest {
jid, 2, 200,
new ExecutionVertex[] { triggerVertex },
new ExecutionVertex[] { ackVertex1, ackVertex2 },
- new ExecutionVertex[] { commitVertex });
+ new ExecutionVertex[] { commitVertex }, cl);
// trigger a checkpoint, partially acknowledged
assertTrue(coord.triggerCheckpoint(timestamp));
@@ -581,7 +583,7 @@ public class CheckpointCoordinatorTest {
jid, 2, 200000,
new ExecutionVertex[] { triggerVertex },
new ExecutionVertex[] { ackVertex1, ackVertex2 },
- new ExecutionVertex[] { commitVertex });
+ new ExecutionVertex[] { commitVertex }, cl);
assertTrue(coord.triggerCheckpoint(timestamp));
http://git-wip-us.apache.org/repos/asf/flink/blob/59bee4ad/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index e1cf061..c23aaca 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -30,9 +30,7 @@ import org.apache.flink.runtime.state.LocalStateHandle;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.runtime.util.SerializedValue;
-
import org.junit.Test;
-
import org.mockito.Mockito;
import java.util.HashMap;
@@ -47,6 +45,8 @@ import static org.mockito.Mockito.*;
*/
public class CheckpointStateRestoreTest {
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+
@Test
public void testSetState() {
try {
@@ -82,7 +82,7 @@ public class CheckpointStateRestoreTest {
CheckpointCoordinator coord = new CheckpointCoordinator(jid, 1, 200000L,
new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
- new ExecutionVertex[0]);
+ new ExecutionVertex[0], cl);
// create ourselves a checkpoint with state
final long timestamp = 34623786L;
@@ -151,7 +151,7 @@ public class CheckpointStateRestoreTest {
CheckpointCoordinator coord = new CheckpointCoordinator(jid, 1, 200000L,
new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
- new ExecutionVertex[0]);
+ new ExecutionVertex[0], cl);
// create ourselves a checkpoint with state
final long timestamp = 34623786L;
@@ -191,7 +191,7 @@ public class CheckpointStateRestoreTest {
CheckpointCoordinator coord = new CheckpointCoordinator(new JobID(), 1, 200000L,
new ExecutionVertex[] { mock(ExecutionVertex.class) },
new ExecutionVertex[] { mock(ExecutionVertex.class) },
- new ExecutionVertex[0]);
+ new ExecutionVertex[0], cl);
try {
coord.restoreLatestCheckpointedState(new HashMap<JobVertexID, ExecutionJobVertex>(), true, false);
http://git-wip-us.apache.org/repos/asf/flink/blob/59bee4ad/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
index 9a9f486..a204c3d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
@@ -96,5 +96,9 @@ public class CheckpointMessagesTest {
public int hashCode() {
return getClass().hashCode();
}
- };
+
+ @Override
+ public void discardState() throws Exception {
+ // No-op: this override performs no cleanup.
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/59bee4ad/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index a412e05..4d4893b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.jobgraph.tasks.CheckpointCommittingOperator;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator;
import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
import org.apache.flink.runtime.state.LocalStateHandle;
+import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -41,7 +42,7 @@ import org.slf4j.LoggerFactory;
public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends AbstractInvokable implements
- OperatorStateCarrier<LocalStateHandle>, CheckpointedOperator, CheckpointCommittingOperator {
+ OperatorStateCarrier<StateHandle<Serializable>>, CheckpointedOperator, CheckpointCommittingOperator {
private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
@@ -136,7 +137,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
* Re-injects the user states into the map. Also set the state on the functions.
*/
@Override
- public void setInitialState(LocalStateHandle stateHandle) throws Exception {
+ public void setInitialState(StateHandle<Serializable> stateHandle) throws Exception {
// here, we later resolve the state handle into the actual state by
// loading the state described by the handle from the backup store
Serializable state = stateHandle.getState();
@@ -184,7 +185,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
LOG.info("Starting checkpoint {} on task {}", checkpointId, getName());
// first draw the state that should go into checkpoint
- LocalStateHandle state;
+ StateHandle<Serializable> state;
try {
Serializable userState = null;
[4/6] flink git commit: [streaming] StateHandleProvider added for
configurable state backend
Posted by gy...@apache.org.
[streaming] StateHandleProvider added for configurable state backend
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/197cd6cf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/197cd6cf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/197cd6cf
Branch: refs/heads/master
Commit: 197cd6cf0fb10efc6badd5fc8584f4b36b09e705
Parents: 59bee4a
Author: Gyula Fora <gy...@apache.org>
Authored: Sat May 16 22:38:16 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Tue May 19 18:32:02 2015 +0200
----------------------------------------------------------------------
.../checkpoint/CheckpointCoordinator.java | 6 +--
.../flink/runtime/checkpoint/StateForTask.java | 2 -
.../checkpoint/SuccessfulCheckpoint.java | 2 +-
.../runtime/state/ByteStreamStateHandle.java | 3 +-
.../flink/runtime/state/FileStateHandle.java | 34 +++++++++++++-
.../flink/runtime/state/LocalStateHandle.java | 17 ++++++-
.../runtime/state/StateHandleProvider.java | 39 ++++++++++++++++
.../environment/StreamExecutionEnvironment.java | 15 ++++++
.../flink/streaming/api/graph/StreamConfig.java | 21 +++++++++
.../flink/streaming/api/graph/StreamGraph.java | 11 +++++
.../api/graph/StreamingJobGraphGenerator.java | 1 +
.../streaming/runtime/tasks/StreamTask.java | 7 ++-
.../api/scala/StreamExecutionEnvironment.scala | 15 ++++--
.../ProcessFailureStreamingRecoveryITCase.java | 49 ++++++++++++++------
14 files changed, 192 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index e4abbf8..b52e732 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -176,7 +176,7 @@ public class CheckpointCoordinator {
// clean and discard all successful checkpoints
for (SuccessfulCheckpoint checkpoint : completedCheckpoints) {
- checkpoint.dispose(userClassLoader);
+ checkpoint.discard(userClassLoader);
}
completedCheckpoints.clear();
}
@@ -334,7 +334,7 @@ public class CheckpointCoordinator {
completed = checkpoint.toCompletedCheckpoint();
completedCheckpoints.addLast(completed);
if (completedCheckpoints.size() > numSuccessfulCheckpointsToRetain) {
- completedCheckpoints.removeFirst().dispose(userClassLoader);;
+ completedCheckpoints.removeFirst().discard(userClassLoader);
}
pendingCheckpoints.remove(checkpointId);
rememberRecentCheckpointId(checkpointId);
@@ -409,7 +409,7 @@ public class CheckpointCoordinator {
boolean allOrNothingState) throws Exception {
synchronized (lock) {
if (shutdown) {
- throw new IllegalStateException("CheckpointCoordinator is hut down");
+ throw new IllegalStateException("CheckpointCoordinator is shut down");
}
if (completedCheckpoints.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
index 073b22f..73deeed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
@@ -18,8 +18,6 @@
package org.apache.flink.runtime.checkpoint;
-import java.io.IOException;
-
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.util.SerializedValue;
http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
index 3f77138..be0b301 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
@@ -66,7 +66,7 @@ public class SuccessfulCheckpoint {
// --------------------------------------------------------------------------------------------
- public void dispose(ClassLoader userClassLoader) {
+ public void discard(ClassLoader userClassLoader) {
for(StateForTask state: states){
state.discard(userClassLoader);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
index a202a83..d7dbb84 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.state;
-import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
@@ -37,7 +36,7 @@ public abstract class ByteStreamStateHandle implements StateHandle<Serializable>
transient Serializable state;
- public ByteStreamStateHandle(Serializable state) throws IOException {
+ public ByteStreamStateHandle(Serializable state) {
this.state = state;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
index 956bd9d..091c739 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
@@ -29,7 +29,7 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.StringUtils;
-import scala.util.Random;
+import java.util.Random;
/**
* Statehandle that writes the checkpointed state to a random file in the
@@ -44,7 +44,7 @@ public class FileStateHandle extends ByteStreamStateHandle {
private String pathString;
- public FileStateHandle(Serializable state, String folder) throws IOException {
+ public FileStateHandle(Serializable state, String folder) {
super(state);
this.pathString = folder + "/" + randomString();
}
@@ -68,4 +68,34 @@ public class FileStateHandle extends ByteStreamStateHandle {
FileSystem.get(new URI(pathString)).delete(new Path(pathString), false);
}
+ /**
+ * Creates a {@link StateHandleProvider} for creating
+ * {@link FileStateHandle}s for a given checkpoint directory.
+ *
+ */
+ public static StateHandleProvider<Serializable> createProvider(String checkpointDir) {
+ return new FileStateHandleProvider(checkpointDir);
+ }
+
+ /**
+ * {@link StateHandleProvider} to generate {@link FileStateHandle}s for the
+ * given checkpoint directory.
+ *
+ */
+ private static class FileStateHandleProvider implements StateHandleProvider<Serializable> {
+
+ private static final long serialVersionUID = 3496670017955260518L;
+ private String path;
+
+ public FileStateHandleProvider(String path) {
+ this.path = path;
+ }
+
+ @Override
+ public FileStateHandle createStateHandle(Serializable state) {
+ return new FileStateHandle(state, path);
+ }
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
index e8fe768..a53d8da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
@@ -26,7 +26,7 @@ import java.io.Serializable;
public class LocalStateHandle implements StateHandle<Serializable> {
private static final long serialVersionUID = 2093619217898039610L;
-
+
private final Serializable state;
public LocalStateHandle(Serializable state) {
@@ -41,4 +41,19 @@ public class LocalStateHandle implements StateHandle<Serializable> {
@Override
public void discardState() throws Exception {
}
+
+ public static LocalStateHandleProvider createProvider(){
+ return new LocalStateHandleProvider();
+ }
+
+ private static class LocalStateHandleProvider implements StateHandleProvider<Serializable> {
+
+ private static final long serialVersionUID = 4665419208932921425L;
+
+ @Override
+ public LocalStateHandle createStateHandle(Serializable state) {
+ return new LocalStateHandle(state);
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProvider.java
new file mode 100644
index 0000000..bac490b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProvider.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import java.io.Serializable;
+
+/**
+ * Stateful streaming operators use a StateHandleProvider to create new
+ * {@link StateHandle}s to store each checkpoint in a persistent storage layer.
+ */
+public interface StateHandleProvider<T> extends Serializable {
+
+ /**
+ * Creates a new {@link StateHandle} instance that will be used to store the
+ * state checkpoint. This method is called for each state checkpoint saved.
+ *
+ * @param state
+ * State to be stored in the handle.
+ *
+ */
+ public StateHandle<T> createStateHandle(T state);
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 820cfed..02c8dad 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -40,6 +40,8 @@ import org.apache.flink.client.program.Client.OptimizerPlanEnvironment;
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.client.program.PackagedProgram.PreviewPlanEnvironment;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.FileStateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType;
@@ -237,6 +239,19 @@ public abstract class StreamExecutionEnvironment {
}
/**
+ * Sets the {@link StateHandleProvider} used for storing operator state
+ * checkpoints when checkpointing is enabled.
+ * <p>
+ * An example would be using {@link FileStateHandle#createProvider(String)}
+ * to use any Flink supported file system as a state backend.
+ *
+ */
+ public StreamExecutionEnvironment setStateHandleProvider(StateHandleProvider<?> provider) {
+ streamGraph.setStateHandleProvider(provider);
+ return this;
+ }
+
+ /**
* Sets the number of times that failed tasks are re-executed. A value of
* zero effectively disables fault tolerance. A value of {@code -1}
* indicates that the system default value (as defined in the configuration)
http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index a1047df..3b00000 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
@@ -57,6 +58,7 @@ public class StreamConfig implements Serializable {
private static final String EDGES_IN_ORDER = "edgesInOrder";
private static final String OUT_STREAM_EDGES = "outStreamEdges";
private static final String IN_STREAM_EDGES = "inStreamEdges";
+ private static final String STATEHANDLE_PROVIDER = "stateHandleProvider";
// DEFAULT VALUES
private static final long DEFAULT_TIMEOUT = 100;
@@ -377,6 +379,25 @@ public class StreamConfig implements Serializable {
throw new StreamTaskException("Could not instantiate configuration.", e);
}
}
+
+ public void setStateHandleProvider(StateHandleProvider<?> provider) {
+
+ try {
+ InstantiationUtil.writeObjectToConfig(provider, this.config, STATEHANDLE_PROVIDER);
+ } catch (IOException e) {
+ throw new StreamTaskException("Could not serialize stateHandle provider.", e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public <R> StateHandleProvider<R> getStateHandleProvider(ClassLoader cl) {
+ try {
+ return (StateHandleProvider<R>) InstantiationUtil
+ .readObjectFromConfig(this.config, STATEHANDLE_PROVIDER, cl);
+ } catch (Exception e) {
+ throw new StreamTaskException("Could not instantiate statehandle provider.", e);
+ }
+ }
public void setChainStart() {
config.setBoolean(IS_CHAINED_VERTEX, true);
http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 1ad1be0..aeba566 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -38,6 +38,8 @@ import org.apache.flink.api.java.typeutils.MissingTypeInfo;
import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.state.LocalStateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -76,6 +78,7 @@ public class StreamGraph extends StreamingPlan {
private Map<Integer, StreamLoop> streamLoops;
protected Map<Integer, StreamLoop> vertexIDtoLoop;
+ private StateHandleProvider<?> stateHandleProvider = LocalStateHandle.createProvider();
public StreamGraph(StreamExecutionEnvironment environment) {
@@ -116,6 +119,14 @@ public class StreamGraph extends StreamingPlan {
this.checkpointingInterval = checkpointingInterval;
}
+ public void setStateHandleProvider(StateHandleProvider<?> provider) {
+ this.stateHandleProvider = provider;
+ }
+
+ public StateHandleProvider<?> getStateHandleProvider() {
+ return this.stateHandleProvider;
+ }
+
public long getCheckpointingInterval() {
return checkpointingInterval;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 799862a..ef5ffca 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -258,6 +258,7 @@ public class StreamingJobGraphGenerator {
config.setNonChainedOutputs(nonChainableOutputs);
config.setChainedOutputs(chainableOutputs);
config.setStateMonitoring(streamGraph.isCheckpointingEnabled());
+ config.setStateHandleProvider(streamGraph.getStateHandleProvider());
Class<? extends AbstractInvokable> vertexClass = vertex.getJobVertexClass();
http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 4d4893b..828a9a7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -30,8 +30,8 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCommittingOperator;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator;
import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
-import org.apache.flink.runtime.state.LocalStateHandle;
import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -61,6 +61,8 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
protected StreamingRuntimeContext context;
protected ClassLoader userClassLoader;
+
+ private StateHandleProvider<Serializable> stateHandleProvider;
private EventListener<TaskEvent> superstepListener;
@@ -74,6 +76,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
this.userClassLoader = getUserCodeClassLoader();
this.configuration = new StreamConfig(getTaskConfiguration());
this.context = createRuntimeContext(getEnvironment().getTaskName());
+ this.stateHandleProvider = configuration.getStateHandleProvider(userClassLoader);
outputHandler = new OutputHandler<OUT>(this);
@@ -212,7 +215,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
: null;
}
- state = userState == null ? null : new LocalStateHandle(userState);
+ state = userState == null ? null : stateHandleProvider.createStateHandle(userState);
}
catch (Exception e) {
throw new Exception("Error while drawing snapshot of the user state.", e);
http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 686fc23..5999625 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -19,19 +19,17 @@
package org.apache.flink.streaming.api.scala
import scala.reflect.ClassTag
-
import com.esotericsoftware.kryo.Serializer
import org.apache.commons.lang.Validate
import org.joda.time.Instant
-
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.api.scala.ClosureCleaner
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType
import org.apache.flink.streaming.api.functions.source.{FromElementsFunction, SourceFunction}
-
import scala.reflect.ClassTag
+import org.apache.flink.runtime.state.StateHandleProvider
class StreamExecutionEnvironment(javaEnv: JavaEnv) {
@@ -125,7 +123,16 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
javaEnv.enableCheckpointing()
this
}
-
+
+ /**
+ * Sets the given StateHandleProvider to be used for storing operator state
+ * checkpoints when checkpointing is enabled.
+ */
+ def setStateHandleProvider(provider: StateHandleProvider[_]): StreamExecutionEnvironment = {
+ javaEnv.setStateHandleProvider(provider)
+ this
+ }
+
/**
* Disables operator chaining for streaming operators. Operator chaining
* allows non-shuffle operations to be co-located in the same thread fully
http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
index 627016c..fb4b2b7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
@@ -18,16 +18,11 @@
package org.apache.flink.test.recovery;
-import org.apache.commons.io.FileUtils;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.BufferedReader;
import java.io.File;
@@ -40,7 +35,17 @@ import java.nio.charset.Charset;
import java.util.HashSet;
import java.util.UUID;
-import static org.junit.Assert.*;
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FileStateHandle;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
/**
* Test for streaming program behaviour in case of TaskManager failure
@@ -62,11 +67,16 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
@Override
public void testProgram(int jobManagerPort, final File coordinateDir) throws Exception {
-
+
final File tempTestOutput = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH),
UUID.randomUUID().toString());
assertTrue("Cannot create directory for temp output", tempTestOutput.mkdirs());
+
+ final File tempCheckpointDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH),
+ UUID.randomUUID().toString());
+
+ assertTrue("Cannot create directory for checkpoints", tempCheckpointDir.mkdirs());
StreamExecutionEnvironment env = StreamExecutionEnvironment
.createRemoteEnvironment("localhost", jobManagerPort);
@@ -74,6 +84,7 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
env.getConfig().disableSysoutLogging();
env.setNumberOfExecutionRetries(1);
env.enableCheckpointing(200);
+ env.setStateHandleProvider(FileStateHandle.createProvider(tempCheckpointDir.getAbsolutePath()));
DataStream<Long> result = env.addSource(new SleepyDurableGenerateSequence(coordinateDir, DATA_COUNT))
// add a non-chained no-op map to test the chain state restore logic
@@ -125,12 +136,19 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
// validate
fileBatchHasEveryNumberLower(PARALLELISM, DATA_COUNT, tempTestOutput);
+
+ // TODO: Figure out why this fails when run with other tests
+ // Check whether checkpoints have been cleaned up properly
+ // assertDirectoryEmpty(tempCheckpointDir);
}
finally {
// clean up
if (tempTestOutput.exists()) {
FileUtils.deleteDirectory(tempTestOutput);
}
+ if (tempCheckpointDir.exists()) {
+ FileUtils.deleteDirectory(tempCheckpointDir);
+ }
}
}
@@ -155,7 +173,6 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
}
@Override
- @SuppressWarnings("unchecked")
public void open(Configuration config) {
stepSize = getRuntimeContext().getNumberOfParallelSubtasks();
congruence = getRuntimeContext().getIndexOfThisSubtask();
@@ -267,4 +284,10 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
}
}
}
+
+ private static void assertDirectoryEmpty(File path){
+ File[] files = path.listFiles();
+ assertNotNull(files);
+ assertEquals("Checkpoint dir is not empty", 0, files.length);
+ }
}
[3/6] flink git commit: [streaming] Added HDFS test for
FileStateHandle
Posted by gy...@apache.org.
[streaming] Added HDFS test for FileStateHandle
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6edf31a0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6edf31a0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6edf31a0
Branch: refs/heads/master
Commit: 6edf31a06f4e3462fa5d377d8ac844153d4171fd
Parents: 197cd6c
Author: Gyula Fora <gy...@apache.org>
Authored: Mon May 18 13:47:14 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Tue May 19 18:32:02 2015 +0200
----------------------------------------------------------------------
.../runtime/state/ByteStreamStateHandle.java | 18 ++-
.../flink/tachyon/FileStateHandleTest.java | 115 +++++++++++++++++++
.../java/org/apache/flink/tachyon/HDFSTest.java | 4 +-
3 files changed, 131 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6edf31a0/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
index d7dbb84..257784a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
@@ -34,7 +34,7 @@ public abstract class ByteStreamStateHandle implements StateHandle<Serializable>
private static final long serialVersionUID = -962025800339325828L;
- transient Serializable state;
+ private transient Serializable state;
public ByteStreamStateHandle(Serializable state) {
this.state = state;
@@ -52,9 +52,11 @@ public abstract class ByteStreamStateHandle implements StateHandle<Serializable>
@Override
public Serializable getState() throws Exception {
- ObjectInputStream stream = new ObjectInputStream(getInputStream());
- state = (Serializable) stream.readObject();
- stream.close();
+ if (!stateFetched()) {
+ ObjectInputStream stream = new ObjectInputStream(getInputStream());
+ state = (Serializable) stream.readObject();
+ stream.close();
+ }
return state;
}
@@ -64,4 +66,12 @@ public abstract class ByteStreamStateHandle implements StateHandle<Serializable>
stream.close();
oos.defaultWriteObject();
}
+
+ /**
+ * Checks whether the state object is currently held in memory by this
+ * handle, i.e. it was set at construction or already read back by getState().
+ */
+ public boolean stateFetched() {
+ return state != null;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6edf31a0/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
new file mode 100644
index 0000000..82b5d35
--- /dev/null
+++ b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tachyon;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.flink.runtime.state.FileStateHandle;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.runtime.util.SerializedValue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FileStateHandleTest {
+
+ private String hdfsURI;
+ private MiniDFSCluster hdfsCluster;
+ private org.apache.hadoop.fs.Path hdPath;
+ private org.apache.hadoop.fs.FileSystem hdfs;
+
+ @Before
+ public void createHDFS() {
+ try {
+ Configuration hdConf = new Configuration();
+
+ File baseDir = new File("./target/hdfs/filestatehandletest").getAbsoluteFile();
+ FileUtil.fullyDelete(baseDir);
+ hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+ MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+ hdfsCluster = builder.build();
+
+ hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":"
+ + hdfsCluster.getNameNodePort() + "/";
+
+ hdPath = new org.apache.hadoop.fs.Path("/StateHandleTest");
+ hdfs = hdPath.getFileSystem(hdConf);
+ hdfs.mkdirs(hdPath);
+
+ } catch (Throwable e) {
+ e.printStackTrace();
+ Assert.fail("Test failed " + e.getMessage());
+ }
+ }
+
+ @After
+ public void destroyHDFS() {
+ try {
+ hdfs.delete(hdPath, true);
+ hdfsCluster.shutdown();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ @Test
+ public void testFileStateHandle() throws Exception {
+
+ Serializable state = "state";
+
+ // Create a state handle provider for the hdfs directory
+ StateHandleProvider<Serializable> handleProvider = FileStateHandle.createProvider(hdfsURI
+ + hdPath);
+
+ FileStateHandle handle = (FileStateHandle) handleProvider.createStateHandle(state);
+
+ assertTrue(handle.stateFetched());
+
+ // Serialize the handle so it writes the value to hdfs
+ SerializedValue<StateHandle<Serializable>> serializedHandle = new SerializedValue<StateHandle<Serializable>>(
+ handle);
+
+ // Deserialize the handle and verify that the state is not fetched yet
+ FileStateHandle deserializedHandle = (FileStateHandle) serializedHandle
+ .deserializeValue(Thread.currentThread().getContextClassLoader());
+ assertFalse(deserializedHandle.stateFetched());
+
+ // Fetch the state and compare it with the original
+ assertEquals(state, deserializedHandle.getState());
+
+ // Test whether discard removes the checkpoint file properly
+ assertTrue(hdfs.listFiles(hdPath, true).hasNext());
+ handle.discardState();
+ assertFalse(hdfs.listFiles(hdPath, true).hasNext());
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6edf31a0/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
index fd328ae..a761712 100644
--- a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
+++ b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
@@ -45,10 +45,10 @@ import java.io.StringWriter;
*/
public class HDFSTest {
- private String hdfsURI;
+ protected String hdfsURI;
private MiniDFSCluster hdfsCluster;
private org.apache.hadoop.fs.Path hdPath;
- private org.apache.hadoop.fs.FileSystem hdfs;
+ protected org.apache.hadoop.fs.FileSystem hdfs;
@Before
public void createHDFS() {
[6/6] flink git commit: [streaming] State backend configurable from
flink-conf.yaml
Posted by gy...@apache.org.
[streaming] State backend configurable from flink-conf.yaml
Closes #676
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/14799738
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/14799738
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/14799738
Branch: refs/heads/master
Commit: 147997388580e25ebbeabe86af706a4d6fa1de39
Parents: 2d3e69a
Author: Gyula Fora <gy...@apache.org>
Authored: Tue May 19 12:17:28 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Tue May 19 22:21:14 2015 +0200
----------------------------------------------------------------------
.../flink/configuration/ConfigConstants.java | 15 ++++++
flink-dist/src/main/resources/flink-conf.yaml | 15 ++++++
.../flink/streaming/api/graph/StreamGraph.java | 3 +-
.../streaming/runtime/tasks/StreamTask.java | 52 +++++++++++++++++++-
4 files changed, 82 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/14799738/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 7908002..6a94c3d 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -402,6 +402,18 @@ public final class ConfigConstants {
*/
public static final String AKKA_LOOKUP_TIMEOUT = "akka.lookup.timeout";
+ // ----------------------------- Streaming --------------------------------
+
+ /**
+ * State backend used for storing checkpoints (supported: jobmanager, filesystem).
+ */
+ public static final String STATE_BACKEND = "state.backend";
+
+ /**
+ * Directory for saving streaming checkpoints
+ */
+ public static final String STATE_BACKEND_FS_DIR = "state.backend.fs.checkpointdir";
+
// ----------------------------- Miscellaneous ----------------------------
/**
@@ -624,6 +636,9 @@ public final class ConfigConstants {
public static String DEFAULT_AKKA_LOOKUP_TIMEOUT = "10 s";
+ // ----------------------------- Streaming Values --------------------------
+
+ public static String DEFAULT_STATE_BACKEND = "jobmanager";
// ----------------------------- LocalExecution ----------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/14799738/flink-dist/src/main/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml
index 2f2eb22..f0c0d56 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -48,6 +48,21 @@ jobmanager.web.port: 8081
webclient.port: 8080
#==============================================================================
+# Streaming state checkpointing
+#==============================================================================
+
+# The backend that will be used to store operator state checkpoints if
+# checkpointing is enabled.
+#
+# Supported backends: jobmanager, filesystem
+
+state.backend: jobmanager
+
+# Directory for storing checkpoints in a flink supported filesystem
+#
+# state.backend.fs.checkpointdir: hdfs://namenode-host:port/flink-checkpoints
+
+#==============================================================================
# Advanced
#==============================================================================
http://git-wip-us.apache.org/repos/asf/flink/blob/14799738/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index aeba566..593b476 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -38,7 +38,6 @@ import org.apache.flink.api.java.typeutils.MissingTypeInfo;
import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.state.LocalStateHandle;
import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -78,7 +77,7 @@ public class StreamGraph extends StreamingPlan {
private Map<Integer, StreamLoop> streamLoops;
protected Map<Integer, StreamLoop> vertexIDtoLoop;
- private StateHandleProvider<?> stateHandleProvider = LocalStateHandle.createProvider();
+ private StateHandleProvider<?> stateHandleProvider;
public StreamGraph(StreamExecutionEnvironment environment) {
http://git-wip-us.apache.org/repos/asf/flink/blob/14799738/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 828a9a7..d678922 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -24,12 +24,16 @@ import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.functors.NotNullPredicate;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.event.task.TaskEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCommittingOperator;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator;
import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
+import org.apache.flink.runtime.state.FileStateHandle;
+import org.apache.flink.runtime.state.LocalStateHandle;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.runtime.util.event.EventListener;
@@ -76,7 +80,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
this.userClassLoader = getUserCodeClassLoader();
this.configuration = new StreamConfig(getTaskConfiguration());
this.context = createRuntimeContext(getEnvironment().getTaskName());
- this.stateHandleProvider = configuration.getStateHandleProvider(userClassLoader);
+ this.stateHandleProvider = getStateHandleProvider();
outputHandler = new OutputHandler<OUT>(this);
@@ -98,6 +102,52 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
return new StreamingRuntimeContext(taskName, env, getUserCodeClassLoader(),
getExecutionConfig());
}
+
+ /**
+  * Resolves the state handle provider: a provider set by the user in the program
+  * takes precedence, otherwise the backend named by "state.backend" is used.
+  * @return the user-defined provider, or the provider of the configured backend
+  * @throws RuntimeException if the backend name is unknown, or the filesystem backend has no checkpoint directory
+  */
+ private StateHandleProvider<Serializable> getStateHandleProvider() {
+     StateHandleProvider<Serializable> provider = configuration.getStateHandleProvider(userClassLoader);
+     // If the user did not specify a provider in the program we try to get it from the config
+     if (provider == null) {
+         // Locale.ROOT keeps the enum lookup independent of the JVM default locale (e.g. Turkish dotted I)
+         String backendName = GlobalConfiguration.getString(ConfigConstants.STATE_BACKEND,
+                 ConfigConstants.DEFAULT_STATE_BACKEND).toUpperCase(java.util.Locale.ROOT);
+         StateBackend backend;
+         try {
+             backend = StateBackend.valueOf(backendName);
+         } catch (IllegalArgumentException e) {
+             throw new RuntimeException(backendName + " is not a valid state backend.\nSupported backends: jobmanager, filesystem.", e);
+         }
+
+         switch (backend) {
+         case JOBMANAGER:
+             LOG.info("State backend for state checkpoints is set to jobmanager.");
+             return LocalStateHandle.createProvider();
+         case FILESYSTEM:
+             String checkpointDir = GlobalConfiguration.getString(ConfigConstants.STATE_BACKEND_FS_DIR, null);
+             if (checkpointDir != null) {
+                 LOG.info("State backend for state checkpoints is set to filesystem with directory: " + checkpointDir);
+                 return FileStateHandle.createProvider(checkpointDir);
+             } else {
+                 throw new RuntimeException("For filesystem checkpointing, a checkpoint directory needs to be specified.\nFor example: \"" + ConfigConstants.STATE_BACKEND_FS_DIR + ": hdfs://checkpoints\"");
+             }
+         default:
+             throw new RuntimeException("Backend " + backend + " is not supported yet.");
+         }
+     } else {
+         LOG.info("Using user defined state backend for streaming checkpoints.");
+         return provider;
+     }
+ }
+
+ /** Backends selectable via "state.backend"; constant names match the upper-cased setting. */
+ private enum StateBackend {
+     JOBMANAGER, FILESYSTEM
+ }
protected void openOperator() throws Exception {
streamOperator.open(getTaskConfiguration());
[5/6] flink git commit: [FLINK-1986] [streaming] Iterative stream
creation bugfix
Posted by gy...@apache.org.
[FLINK-1986] [streaming] Iterative stream creation bugfix
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2d3e69a2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2d3e69a2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2d3e69a2
Branch: refs/heads/master
Commit: 2d3e69a25e753b2eb825b589fe2550d2b968e161
Parents: 6edf31a
Author: Gyula Fora <gy...@apache.org>
Authored: Mon May 18 16:36:14 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Tue May 19 18:33:14 2015 +0200
----------------------------------------------------------------------
.../streaming/api/datastream/DataStream.java | 16 ++++++++
.../api/datastream/IterativeDataStream.java | 43 ++------------------
2 files changed, 19 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2d3e69a2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 03cdaa5..dbb9b05 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -113,6 +113,9 @@ public class DataStream<OUT> {
@SuppressWarnings("rawtypes")
protected TypeInformation typeInfo;
protected List<DataStream<OUT>> mergedStreams;
+
+ protected Integer iterationID = null;
+ protected Long iterationWaitTime = null;
protected final StreamGraph streamGraph;
private boolean typeUsed;
@@ -160,6 +163,8 @@ public class DataStream<OUT> {
this.partitioner = dataStream.partitioner.copy();
this.streamGraph = dataStream.streamGraph;
this.typeInfo = dataStream.typeInfo;
+ this.iterationID = dataStream.iterationID;
+ this.iterationWaitTime = dataStream.iterationWaitTime;
this.mergedStreams = new ArrayList<DataStream<OUT>>();
this.mergedStreams.add(this);
if (dataStream.mergedStreams.size() > 1) {
@@ -1224,9 +1229,20 @@ public class DataStream<OUT> {
operatorName);
connectGraph(inputStream, returnStream.getId(), 0);
+
+ if (iterationID != null) {
+ //This data stream is an input to some iteration
+ addIterationSource(returnStream);
+ }
return returnStream;
}
+ // Adds the iteration head (the source receiving feedback tuples from the tail) for the given stream, with the stream's parallelism.
+ private <X> void addIterationSource(DataStream<X> dataStream) {
+ Integer id = ++counter;
+ streamGraph.addIterationHead(id, dataStream.getId(), iterationID, iterationWaitTime);
+ streamGraph.setParallelism(id, dataStream.getParallelism());
+ }
/**
* Internal function for setting the partitioner for the DataStream
http://git-wip-us.apache.org/repos/asf/flink/blob/2d3e69a2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
index b1bdb85..178f2eb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
@@ -17,9 +17,6 @@
package org.apache.flink.streaming.api.datastream;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-
/**
* The iterative data stream represents the start of an iteration in a
* {@link DataStream}.
@@ -31,21 +28,13 @@ public class IterativeDataStream<IN> extends
SingleOutputStreamOperator<IN, IterativeDataStream<IN>> {
static Integer iterationCount = 0;
- protected Integer iterationID;
- protected long waitTime;
-
+
protected IterativeDataStream(DataStream<IN> dataStream, long maxWaitTime) {
super(dataStream);
setBufferTimeout(dataStream.environment.getBufferTimeout());
iterationID = iterationCount;
iterationCount++;
- waitTime = maxWaitTime;
- }
-
- protected IterativeDataStream(DataStream<IN> dataStream, Integer iterationID, long waitTime) {
- super(dataStream);
- this.iterationID = iterationID;
- this.waitTime = waitTime;
+ iterationWaitTime = maxWaitTime;
}
/**
@@ -70,35 +59,9 @@ public class IterativeDataStream<IN> extends
// We add an iteration sink to the tail which will send tuples to the
// iteration head
streamGraph.addIterationTail(iterationSink.getId(), iterationTail.getId(), iterationID,
- waitTime);
+ iterationWaitTime);
connectGraph(iterationTail.forward(), iterationSink.getId(), 0);
return iterationTail;
}
-
- @Override
- public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName,
- TypeInformation<R> outTypeInfo, OneInputStreamOperator<IN, R> operator) {
-
- // We call the superclass tranform method
- SingleOutputStreamOperator<R, ?> returnStream = super.transform(operatorName, outTypeInfo,
- operator);
-
- // Then we add a source that will take care of receiving feedback tuples
- // from the tail
- addIterationSource(returnStream);
-
- return returnStream;
- }
-
- private <X> void addIterationSource(DataStream<X> dataStream) {
- Integer id = ++counter;
- streamGraph.addIterationHead(id, dataStream.getId(), iterationID, waitTime);
- streamGraph.setParallelism(id, dataStream.getParallelism());
- }
-
- @Override
- public IterativeDataStream<IN> copy() {
- return new IterativeDataStream<IN>(this, iterationID, waitTime);
- }
}