You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/05/19 22:23:48 UTC

[1/6] flink git commit: [streaming] ByteStream and File state handle added

Repository: flink
Updated Branches:
  refs/heads/master f59d6a118 -> 147997388


[streaming] ByteStream and File state handle added


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/48a27200
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/48a27200
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/48a27200

Branch: refs/heads/master
Commit: 48a2720055987fa67ca3cd16cdd57d34648cf59a
Parents: f59d6a1
Author: Gyula Fora <gy...@apache.org>
Authored: Thu May 7 14:29:23 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Tue May 19 18:21:20 2015 +0200

----------------------------------------------------------------------
 .../runtime/state/ByteStreamStateHandle.java    | 56 +++++++++++++++++++
 .../flink/runtime/state/FileStateHandle.java    | 58 ++++++++++++++++++++
 2 files changed, 114 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/48a27200/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
new file mode 100644
index 0000000..36598d9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+public abstract class ByteStreamStateHandle implements StateHandle<Serializable> {
+
+	private static final long serialVersionUID = -962025800339325828L;
+	// transient: the state is persisted through the streams below, not inside this handle
+	transient Serializable state;
+
+	public ByteStreamStateHandle(Serializable state) throws IOException {
+		this.state = state;
+	}
+
+	protected abstract OutputStream getOutputStream() throws Exception;
+
+	protected abstract InputStream getInputStream() throws Exception;
+
+	@Override
+	public Serializable getState() throws Exception {
+		if (state == null) { // null once this handle was deserialized: fetch (and cache) it from the stream
+			ObjectInputStream stream = new ObjectInputStream(getInputStream());
+			try { state = (Serializable) stream.readObject(); } finally { stream.close(); }
+		}
+		return state;
+	}
+	/** Writes the state to getOutputStream() (closing it even on failure), then this handle. */
+	private void writeObject(ObjectOutputStream oos) throws Exception {
+		ObjectOutputStream stream = new ObjectOutputStream(getOutputStream());
+		try { stream.writeObject(state); } finally { stream.close(); }
+		oos.defaultWriteObject();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/48a27200/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
new file mode 100644
index 0000000..9d614d8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.StringUtils;
+
+import scala.util.Random;
+
+public class FileStateHandle extends ByteStreamStateHandle {
+
+	private static final long serialVersionUID = 1L;
+
+	String pathString;
+
+	public FileStateHandle(Serializable state, String folder) throws IOException {
+		super(state);
+		this.pathString = folder + "/" + randomString();
+	}
+
+	protected OutputStream getOutputStream() throws IOException, URISyntaxException {
+		return FileSystem.get(fileUri()).create(new Path(pathString), true);
+	}
+	protected InputStream getInputStream() throws IOException, URISyntaxException {
+		return FileSystem.get(fileUri()).open(new Path(pathString));
+	}
+	/** Resolved via Path, which escapes characters (e.g. spaces) that new URI(pathString) rejects. */
+	private URI fileUri() { return new Path(pathString).toUri(); }
+	private String randomString() {
+		final byte[] bytes = new byte[20];
+		new Random().nextBytes(bytes);
+		return StringUtils.byteToHexString(bytes);
+	}
+}


[2/6] flink git commit: [streaming] Discard method added to state handle

Posted by gy...@apache.org.
[streaming] Discard method added to state handle


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/59bee4ad
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/59bee4ad
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/59bee4ad

Branch: refs/heads/master
Commit: 59bee4ad825d7252c791e0376ea2504602134fe7
Parents: 48a2720
Author: Gyula Fora <gy...@apache.org>
Authored: Thu May 14 11:49:30 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Tue May 19 18:25:53 2015 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       | 25 ++++++++++++++------
 .../runtime/checkpoint/PendingCheckpoint.java   | 23 +++++++++++-------
 .../flink/runtime/checkpoint/StateForTask.java  | 18 ++++++++++++--
 .../checkpoint/SuccessfulCheckpoint.java        |  9 ++++++-
 .../runtime/executiongraph/ExecutionGraph.java  |  2 +-
 .../runtime/state/ByteStreamStateHandle.java    | 12 ++++++++++
 .../flink/runtime/state/FileStateHandle.java    | 15 +++++++++++-
 .../flink/runtime/state/LocalStateHandle.java   |  6 ++++-
 .../apache/flink/runtime/state/StateHandle.java |  7 ++++++
 .../apache/flink/runtime/taskmanager/Task.java  |  4 +---
 .../checkpoint/CheckpointCoordinatorTest.java   | 16 +++++++------
 .../checkpoint/CheckpointStateRestoreTest.java  | 10 ++++----
 .../messages/CheckpointMessagesTest.java        |  6 ++++-
 .../streaming/runtime/tasks/StreamTask.java     |  7 +++---
 14 files changed, 120 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/59bee4ad/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index be0fcf2..e4abbf8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -97,6 +97,8 @@ public class CheckpointCoordinator {
 	
 	private ActorRef jobStatusListener;
 	
+	private ClassLoader userClassLoader;
+	
 	private boolean shutdown;
 	
 	// --------------------------------------------------------------------------------------------
@@ -104,7 +106,8 @@ public class CheckpointCoordinator {
 	public CheckpointCoordinator(JobID job, int numSuccessfulCheckpointsToRetain, long checkpointTimeout,
 								ExecutionVertex[] tasksToTrigger,
 								ExecutionVertex[] tasksToWaitFor,
-								ExecutionVertex[] tasksToCommitTo) {
+								ExecutionVertex[] tasksToCommitTo,
+								ClassLoader userClassLoader) {
 		
 		// some sanity checks
 		if (job == null || tasksToTrigger == null ||
@@ -127,6 +130,7 @@ public class CheckpointCoordinator {
 		this.pendingCheckpoints = new LinkedHashMap<Long, PendingCheckpoint>();
 		this.completedCheckpoints = new ArrayDeque<SuccessfulCheckpoint>(numSuccessfulCheckpointsToRetain + 1);
 		this.recentPendingCheckpoints = new ArrayDeque<Long>(NUM_GHOST_CHECKPOINT_IDS);
+		this.userClassLoader = userClassLoader;
 
 		timer = new Timer("Checkpoint Timer", true);
 	}
@@ -166,13 +170,13 @@ public class CheckpointCoordinator {
 			
 			// clear and discard all pending checkpoints
 			for (PendingCheckpoint pending : pendingCheckpoints.values()) {
-				pending.discard();
+					pending.discard(userClassLoader, true);
 			}
 			pendingCheckpoints.clear();
 			
 			// clean and discard all successful checkpoints
 			for (SuccessfulCheckpoint checkpoint : completedCheckpoints) {
-				checkpoint.dispose();
+				checkpoint.dispose(userClassLoader);
 			}
 			completedCheckpoints.clear();
 		}
@@ -253,7 +257,9 @@ public class CheckpointCoordinator {
 							// note that checkpoint completion discards the pending checkpoint object
 							if (!checkpoint.isDiscarded()) {
 								LOG.info("Checkpoint " + checkpointID + " expired before completing.");
-								checkpoint.discard();
+								
+								checkpoint.discard(userClassLoader, true);
+								
 								pendingCheckpoints.remove(checkpointID);
 								rememberRecentCheckpointId(checkpointID);
 							}
@@ -288,7 +294,10 @@ public class CheckpointCoordinator {
 			LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
 			
 			synchronized (lock) {
-				pendingCheckpoints.remove(checkpointID);
+				PendingCheckpoint checkpoint = pendingCheckpoints.remove(checkpointID);
+				if (checkpoint != null && !checkpoint.isDiscarded()) {
+					checkpoint.discard(userClassLoader, true);
+				}
 			}
 			
 			return false;
@@ -325,7 +334,7 @@ public class CheckpointCoordinator {
 						completed = checkpoint.toCompletedCheckpoint();
 						completedCheckpoints.addLast(completed);
 						if (completedCheckpoints.size() > numSuccessfulCheckpointsToRetain) {
-							completedCheckpoints.removeFirst();
+							completedCheckpoints.removeFirst().dispose(userClassLoader);;
 						}
 						pendingCheckpoints.remove(checkpointId);
 						rememberRecentCheckpointId(checkpointId);
@@ -383,7 +392,9 @@ public class CheckpointCoordinator {
 			PendingCheckpoint p = entries.next().getValue();
 			if (p.getCheckpointTimestamp() < timestamp) {
 				rememberRecentCheckpointId(p.getCheckpointId());
-				p.discard();
+
+				p.discard(userClassLoader, true);
+
 				entries.remove();
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/59bee4ad/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index f25bff9..9ea3b6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -18,16 +18,16 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.util.SerializedValue;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
 /**
  * A pending checkpoint is a checkpoint that has been started, but has not been
  * acknowledged by all tasks that need to acknowledge it. Once all tasks have
@@ -37,7 +37,7 @@ import java.util.Map;
  * state handles always as serialized values, never as actual values.</p>
  */
 public class PendingCheckpoint {
-	
+		
 	private final Object lock = new Object();
 	
 	private final JobID jobId;
@@ -53,7 +53,7 @@ public class PendingCheckpoint {
 	private int numAcknowledgedTasks;
 	
 	private boolean discarded;
-
+	
 	// --------------------------------------------------------------------------------------------
 	
 	public PendingCheckpoint(JobID jobId, long checkpointId, long checkpointTimestamp,
@@ -117,7 +117,8 @@ public class PendingCheckpoint {
 			if (notYetAcknowledgedTasks.isEmpty()) {
 				SuccessfulCheckpoint completed =  new SuccessfulCheckpoint(jobId, checkpointId,
 						checkpointTimestamp, new ArrayList<StateForTask>(collectedStates));
-				discard();
+				discard(null, false);
+				
 				return completed;
 			}
 			else {
@@ -148,11 +149,17 @@ public class PendingCheckpoint {
 	
 	/**
 	 * Discards the pending checkpoint, releasing all held resources.
+	 * If {@code discardStateHandle} is set, the collected task states are discarded as well, using the given class loader.
 	 */
-	public void discard() {
+	public void discard(ClassLoader userClassLoader, boolean discardStateHandle) {
 		synchronized (lock) {
 			discarded = true;
 			numAcknowledgedTasks = -1;
+			if (discardStateHandle) {
+				for (StateForTask state : collectedStates) {
+					state.discard(userClassLoader);
+				}
+			}
 			collectedStates.clear();
 			notYetAcknowledgedTasks.clear();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/59bee4ad/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
index 26b3eb7..073b22f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
@@ -18,9 +18,13 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import java.io.IOException;
+
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.util.SerializedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Simple bean to describe the state belonging to a parallel operator.
@@ -33,6 +37,8 @@ import org.apache.flink.runtime.util.SerializedValue;
  * the respective classloader.
  */
 public class StateForTask {
+	
+	private static final Logger LOG = LoggerFactory.getLogger(StateForTask.class);
 
 	/** The state of the parallel operator */
 	private final SerializedValue<StateHandle<?>> state;
@@ -42,9 +48,9 @@ public class StateForTask {
 	
 	/** The index of the parallel subtask */
 	private final int subtask;
-
+	
 	public StateForTask(SerializedValue<StateHandle<?>> state, JobVertexID operatorId, int subtask) {
-		if (state == null || operatorId == null || subtask < 0) {
+	if (state == null || operatorId == null || subtask < 0) {
 			throw new IllegalArgumentException();
 		}
 		
@@ -66,6 +72,14 @@ public class StateForTask {
 	public int getSubtask() {
 		return subtask;
 	}
+	
+	public void discard(ClassLoader userClassLoader) { // best effort: failures are logged, never rethrown
+		try {
+			StateHandle<?> handle = state.deserializeValue(userClassLoader);
+			handle.discardState();
+		}
+		catch (Exception failure) { LOG.warn("Failed to discard checkpoint state: " + this, failure); }
+	}
 
 	// --------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/59bee4ad/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
index cd7efba..3f77138 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
 
@@ -28,6 +30,8 @@ import java.util.List;
  */
 public class SuccessfulCheckpoint {
 	
+	private static final Logger LOG = LoggerFactory.getLogger(SuccessfulCheckpoint.class);
+	
 	private final JobID job;
 	
 	private final long checkpointID;
@@ -62,7 +66,10 @@ public class SuccessfulCheckpoint {
 
 	// --------------------------------------------------------------------------------------------
 	
-	public void dispose() {
+	public void dispose(ClassLoader userClassLoader) { // discard every task's state, then drop the references
+		for (StateForTask taskState : states) {
+			taskState.discard(userClassLoader);
+		}
 		states.clear();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/59bee4ad/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index a041f86..73e827f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -307,7 +307,7 @@ public class ExecutionGraph implements Serializable {
 		// create the coordinator that triggers and commits checkpoints and holds the state 
 		snapshotCheckpointsEnabled = true;
 		checkpointCoordinator = new CheckpointCoordinator(jobID, NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN,
-				checkpointTimeout, tasksToTrigger, tasksToWaitFor, tasksToCommitTo);
+				checkpointTimeout, tasksToTrigger, tasksToWaitFor, tasksToCommitTo, userClassLoader);
 		
 		// the periodic checkpoint scheduler is activated and deactivated as a result of
 		// job status changes (running -> on, all other states -> off)

http://git-wip-us.apache.org/repos/asf/flink/blob/59bee4ad/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
index 36598d9..a202a83 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
@@ -25,6 +25,12 @@ import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
 
+/**
+ * State handle that writes the serializable checkpointed state to, and reads it
+ * back from, the output/input streams provided by subclasses, using default
+ * Java serialization.
+ * 
+ */
 public abstract class ByteStreamStateHandle implements StateHandle<Serializable> {
 
 	private static final long serialVersionUID = -962025800339325828L;
@@ -35,8 +41,14 @@ public abstract class ByteStreamStateHandle implements StateHandle<Serializable>
 		this.state = state;
 	}
 
+	/**
+	 * Returns the stream that the state is written to when this handle itself is serialized.
+	 */
 	protected abstract OutputStream getOutputStream() throws Exception;
 
+	/**
+	 * Returns the stream that {@link #getState()} reads the serialized state from.
+	 */
 	protected abstract InputStream getInputStream() throws Exception;
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/59bee4ad/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
index 9d614d8..956bd9d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
@@ -31,11 +31,18 @@ import org.apache.flink.util.StringUtils;
 
 import scala.util.Random;
 
+/**
+ * State handle that writes the checkpointed state to a randomly named file in
+ * the provided checkpoint directory. Any file system supported by Flink can be
+ * used, but a file system that persists data across node failures, such as
+ * HDFS or Tachyon, is recommended.
+ * 
+ */
 public class FileStateHandle extends ByteStreamStateHandle {
 
 	private static final long serialVersionUID = 1L;
 
-	String pathString;
+	private String pathString;
 
 	public FileStateHandle(Serializable state, String folder) throws IOException {
 		super(state);
@@ -55,4 +62,10 @@ public class FileStateHandle extends ByteStreamStateHandle {
 		new Random().nextBytes(bytes);
 		return StringUtils.byteToHexString(bytes);
 	}
+
+	@Override
+	public void discardState() throws Exception { // non-recursive delete of the state file; URI derived via Path (escapes e.g. spaces)
+		FileSystem.get(new Path(pathString).toUri()).delete(new Path(pathString), false);
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/59bee4ad/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
index fa0c515..e8fe768 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.state;
 import java.io.Serializable;
 
 /**
- * A StateHandle that includes a map of operator states directly.
+ * A StateHandle that includes the operator states directly.
  */
 public class LocalStateHandle implements StateHandle<Serializable> {
 
@@ -37,4 +37,8 @@ public class LocalStateHandle implements StateHandle<Serializable> {
 	public Serializable getState() {
 		return state;
 	}
+
+	@Override
+	public void discardState() throws Exception { // no-op: the state is held directly by this handle
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/59bee4ad/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java
index 409383c..c1342b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java
@@ -34,4 +34,11 @@ public interface StateHandle<T> extends Serializable {
 	 * @throws java.lang.Exception Thrown, if the state cannot be fetched.
 	 */
 	T getState() throws Exception;
+	
+	/**
+	 * Discards the state referred to by this handle, freeing resources in the
+	 * persistent storage; called when the handle will not be used any more.
+	 * @throws java.lang.Exception Thrown, if the state cannot be discarded.
+	 */
+	void discardState() throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/59bee4ad/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index f3c40ba..8f613fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -53,10 +53,8 @@ import org.apache.flink.runtime.messages.TaskMessages;
 import org.apache.flink.runtime.messages.TaskMessages.TaskInFinalState;
 import org.apache.flink.runtime.messages.TaskManagerMessages.FatalError;
 import org.apache.flink.runtime.state.StateHandle;
-
 import org.apache.flink.runtime.state.StateUtils;
 import org.apache.flink.runtime.util.SerializedValue;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -522,7 +520,7 @@ public class Task implements Runnable {
 						StateUtils.setOperatorState(op, state);
 					}
 					catch (Exception e) {
-						throw new Exception("Failed to deserialize state handle and setup initial operator state");
+						throw new RuntimeException("Failed to deserialize state handle and setup initial operator state.", e);
 					}
 				}
 				else {

http://git-wip-us.apache.org/repos/asf/flink/blob/59bee4ad/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 10c8074..514b7c7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -38,6 +38,8 @@ import java.util.List;
  */
 public class CheckpointCoordinatorTest {
 	
+	ClassLoader cl = Thread.currentThread().getContextClassLoader();
+	
 	@Test
 	public void testCheckpointAbortsIfTriggerTasksAreNotExecuted() {
 		try {
@@ -59,7 +61,7 @@ public class CheckpointCoordinatorTest {
 					jid, 1, 600000,
 					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
 					new ExecutionVertex[] { ackVertex1, ackVertex2 },
-					new ExecutionVertex[] {} );
+					new ExecutionVertex[] {}, cl );
 
 			// nothing should be happening
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -101,7 +103,7 @@ public class CheckpointCoordinatorTest {
 					jid, 1, 600000,
 					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
 					new ExecutionVertex[] { ackVertex1, ackVertex2 },
-					new ExecutionVertex[] {} );
+					new ExecutionVertex[] {}, cl );
 
 			// nothing should be happening
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -139,7 +141,7 @@ public class CheckpointCoordinatorTest {
 					jid, 1, 600000,
 					new ExecutionVertex[] { vertex1, vertex2 },
 					new ExecutionVertex[] { vertex1, vertex2 },
-					new ExecutionVertex[] { vertex1, vertex2 });
+					new ExecutionVertex[] { vertex1, vertex2 }, cl);
 			
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -282,7 +284,7 @@ public class CheckpointCoordinatorTest {
 					jid, 2, 600000,
 					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
 					new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
-					new ExecutionVertex[] { commitVertex });
+					new ExecutionVertex[] { commitVertex }, cl);
 			
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -409,7 +411,7 @@ public class CheckpointCoordinatorTest {
 					jid, 10, 600000,
 					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
 					new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
-					new ExecutionVertex[] { commitVertex });
+					new ExecutionVertex[] { commitVertex }, cl);
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -523,7 +525,7 @@ public class CheckpointCoordinatorTest {
 					jid, 2, 200,
 					new ExecutionVertex[] { triggerVertex },
 					new ExecutionVertex[] { ackVertex1, ackVertex2 },
-					new ExecutionVertex[] { commitVertex });
+					new ExecutionVertex[] { commitVertex }, cl);
 			
 			// trigger a checkpoint, partially acknowledged
 			assertTrue(coord.triggerCheckpoint(timestamp));
@@ -581,7 +583,7 @@ public class CheckpointCoordinatorTest {
 					jid, 2, 200000,
 					new ExecutionVertex[] { triggerVertex },
 					new ExecutionVertex[] { ackVertex1, ackVertex2 },
-					new ExecutionVertex[] { commitVertex });
+					new ExecutionVertex[] { commitVertex }, cl);
 
 			assertTrue(coord.triggerCheckpoint(timestamp));
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/59bee4ad/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index e1cf061..c23aaca 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -30,9 +30,7 @@ import org.apache.flink.runtime.state.LocalStateHandle;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.runtime.util.SerializedValue;
-
 import org.junit.Test;
-
 import org.mockito.Mockito;
 
 import java.util.HashMap;
@@ -47,6 +45,8 @@ import static org.mockito.Mockito.*;
  */
 public class CheckpointStateRestoreTest {
 	
+	ClassLoader cl = Thread.currentThread().getContextClassLoader();
+	
 	@Test
 	public void testSetState() {
 		try {
@@ -82,7 +82,7 @@ public class CheckpointStateRestoreTest {
 			CheckpointCoordinator coord = new CheckpointCoordinator(jid, 1, 200000L, 
 					new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
 					new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
-					new ExecutionVertex[0]);
+					new ExecutionVertex[0], cl);
 			
 			// create ourselves a checkpoint with state
 			final long timestamp = 34623786L;
@@ -151,7 +151,7 @@ public class CheckpointStateRestoreTest {
 			CheckpointCoordinator coord = new CheckpointCoordinator(jid, 1, 200000L,
 					new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
 					new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
-					new ExecutionVertex[0]);
+					new ExecutionVertex[0], cl);
 
 			// create ourselves a checkpoint with state
 			final long timestamp = 34623786L;
@@ -191,7 +191,7 @@ public class CheckpointStateRestoreTest {
 			CheckpointCoordinator coord = new CheckpointCoordinator(new JobID(), 1, 200000L,
 					new ExecutionVertex[] { mock(ExecutionVertex.class) },
 					new ExecutionVertex[] { mock(ExecutionVertex.class) },
-					new ExecutionVertex[0]);
+					new ExecutionVertex[0], cl);
 
 			try {
 				coord.restoreLatestCheckpointedState(new HashMap<JobVertexID, ExecutionJobVertex>(), true, false);

http://git-wip-us.apache.org/repos/asf/flink/blob/59bee4ad/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
index 9a9f486..a204c3d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
@@ -96,5 +96,9 @@ public class CheckpointMessagesTest {
 		public int hashCode() {
 			return getClass().hashCode();
 		}
-	};
+
+		@Override
+		public void discardState() throws Exception {
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/59bee4ad/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index a412e05..4d4893b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.jobgraph.tasks.CheckpointCommittingOperator;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator;
 import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
 import org.apache.flink.runtime.state.LocalStateHandle;
+import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -41,7 +42,7 @@ import org.slf4j.LoggerFactory;
 
 
 public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends AbstractInvokable implements
-		OperatorStateCarrier<LocalStateHandle>, CheckpointedOperator, CheckpointCommittingOperator {
+		OperatorStateCarrier<StateHandle<Serializable>>, CheckpointedOperator, CheckpointCommittingOperator {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
 
@@ -136,7 +137,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 	 * Re-injects the user states into the map. Also set the state on the functions.
 	 */
 	@Override
-	public void setInitialState(LocalStateHandle stateHandle) throws Exception {
+	public void setInitialState(StateHandle<Serializable> stateHandle) throws Exception {
 		// here, we later resolve the state handle into the actual state by
 		// loading the state described by the handle from the backup store
 		Serializable state = stateHandle.getState();
@@ -184,7 +185,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 					LOG.info("Starting checkpoint {} on task {}", checkpointId, getName());
 					
 					// first draw the state that should go into checkpoint
-					LocalStateHandle state;
+					StateHandle<Serializable> state;
 					try {
 
 						Serializable userState = null;


[4/6] flink git commit: [streaming] StateHandleProvider added for configurable state backend

Posted by gy...@apache.org.
[streaming] StateHandleProvider added for configurable state backend


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/197cd6cf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/197cd6cf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/197cd6cf

Branch: refs/heads/master
Commit: 197cd6cf0fb10efc6badd5fc8584f4b36b09e705
Parents: 59bee4a
Author: Gyula Fora <gy...@apache.org>
Authored: Sat May 16 22:38:16 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Tue May 19 18:32:02 2015 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |  6 +--
 .../flink/runtime/checkpoint/StateForTask.java  |  2 -
 .../checkpoint/SuccessfulCheckpoint.java        |  2 +-
 .../runtime/state/ByteStreamStateHandle.java    |  3 +-
 .../flink/runtime/state/FileStateHandle.java    | 34 +++++++++++++-
 .../flink/runtime/state/LocalStateHandle.java   | 17 ++++++-
 .../runtime/state/StateHandleProvider.java      | 39 ++++++++++++++++
 .../environment/StreamExecutionEnvironment.java | 15 ++++++
 .../flink/streaming/api/graph/StreamConfig.java | 21 +++++++++
 .../flink/streaming/api/graph/StreamGraph.java  | 11 +++++
 .../api/graph/StreamingJobGraphGenerator.java   |  1 +
 .../streaming/runtime/tasks/StreamTask.java     |  7 ++-
 .../api/scala/StreamExecutionEnvironment.scala  | 15 ++++--
 .../ProcessFailureStreamingRecoveryITCase.java  | 49 ++++++++++++++------
 14 files changed, 192 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index e4abbf8..b52e732 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -176,7 +176,7 @@ public class CheckpointCoordinator {
 			
 			// clean and discard all successful checkpoints
 			for (SuccessfulCheckpoint checkpoint : completedCheckpoints) {
-				checkpoint.dispose(userClassLoader);
+				checkpoint.discard(userClassLoader);
 			}
 			completedCheckpoints.clear();
 		}
@@ -334,7 +334,7 @@ public class CheckpointCoordinator {
 						completed = checkpoint.toCompletedCheckpoint();
 						completedCheckpoints.addLast(completed);
 						if (completedCheckpoints.size() > numSuccessfulCheckpointsToRetain) {
-							completedCheckpoints.removeFirst().dispose(userClassLoader);;
+							completedCheckpoints.removeFirst().discard(userClassLoader);
 						}
 						pendingCheckpoints.remove(checkpointId);
 						rememberRecentCheckpointId(checkpointId);
@@ -409,7 +409,7 @@ public class CheckpointCoordinator {
 												boolean allOrNothingState) throws Exception {
 		synchronized (lock) {
 			if (shutdown) {
-				throw new IllegalStateException("CheckpointCoordinator is hut down");
+				throw new IllegalStateException("CheckpointCoordinator is shut down");
 			}
 			
 			if (completedCheckpoints.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
index 073b22f..73deeed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import java.io.IOException;
-
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.util.SerializedValue;

http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
index 3f77138..be0b301 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
@@ -66,7 +66,7 @@ public class SuccessfulCheckpoint {
 
 	// --------------------------------------------------------------------------------------------
 	
-	public void dispose(ClassLoader userClassLoader) {
+	public void discard(ClassLoader userClassLoader) {
 		for(StateForTask state: states){
 			state.discard(userClassLoader);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
index a202a83..d7dbb84 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.state;
 
-import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
@@ -37,7 +36,7 @@ public abstract class ByteStreamStateHandle implements StateHandle<Serializable>
 
 	transient Serializable state;
 
-	public ByteStreamStateHandle(Serializable state) throws IOException {
+	public ByteStreamStateHandle(Serializable state) {
 		this.state = state;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
index 956bd9d..091c739 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
@@ -29,7 +29,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.StringUtils;
 
-import scala.util.Random;
+import java.util.Random;
 
 /**
  * Statehandle that writes the checkpointed state to a random file in the
@@ -44,7 +44,7 @@ public class FileStateHandle extends ByteStreamStateHandle {
 
 	private String pathString;
 
-	public FileStateHandle(Serializable state, String folder) throws IOException {
+	public FileStateHandle(Serializable state, String folder) {
 		super(state);
 		this.pathString = folder + "/" + randomString();
 	}
@@ -68,4 +68,34 @@ public class FileStateHandle extends ByteStreamStateHandle {
 		FileSystem.get(new URI(pathString)).delete(new Path(pathString), false);
 	}
 
+	/**
+	 * Creates a {@link StateHandleProvider} for creating
+	 * {@link FileStateHandle}s for a given checkpoint directory.
+	 * 
+	 */
+	public static StateHandleProvider<Serializable> createProvider(String checkpointDir) {
+		return new FileStateHandleProvider(checkpointDir);
+	}
+
+	/**
+	 * {@link StateHandleProvider} to generate {@link FileStateHandle}s for the
+	 * given checkpoint directory.
+	 * The directory path is fixed when the provider is constructed.
+	 */
+	private static class FileStateHandleProvider implements StateHandleProvider<Serializable> {
+
+		private static final long serialVersionUID = 3496670017955260518L;
+		private final String path;
+
+		public FileStateHandleProvider(String path) {
+			this.path = path;
+		}
+
+		@Override
+		public FileStateHandle createStateHandle(Serializable state) {
+			return new FileStateHandle(state, path);
+		}
+
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
index e8fe768..a53d8da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
@@ -26,7 +26,7 @@ import java.io.Serializable;
 public class LocalStateHandle implements StateHandle<Serializable> {
 
 	private static final long serialVersionUID = 2093619217898039610L;
-	
+
 	private final Serializable state;
 
 	public LocalStateHandle(Serializable state) {
@@ -41,4 +41,19 @@ public class LocalStateHandle implements StateHandle<Serializable> {
 	@Override
 	public void discardState() throws Exception {
 	}
+	/** Returns a {@link StateHandleProvider} that wraps each state in a {@link LocalStateHandle}. */
+	public static StateHandleProvider<Serializable> createProvider() {
+		return new LocalStateHandleProvider();
+	}
+
+	private static class LocalStateHandleProvider implements StateHandleProvider<Serializable> {
+
+		private static final long serialVersionUID = 4665419208932921425L;
+
+		@Override
+		public LocalStateHandle createStateHandle(Serializable state) {
+			return new LocalStateHandle(state);
+		}
+
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProvider.java
new file mode 100644
index 0000000..bac490b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProvider.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import java.io.Serializable;
+
+/**
+ * Stateful streaming operators use a StateHandleProvider to create a new
+ * {@link StateHandle} for each checkpoint; the handle type decides where the state is kept.
+ */
+public interface StateHandleProvider<T> extends Serializable {
+
+	/**
+	 * Creates a new {@link StateHandle} instance that will be used to store the
+	 * state checkpoint. This method is called for each state checkpoint saved.
+	 * 
+	 * @param state
+	 *            State to be stored in the handle.
+	 * @return a new state handle holding {@code state}
+	 */
+	public StateHandle<T> createStateHandle(T state);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 820cfed..02c8dad 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -40,6 +40,8 @@ import org.apache.flink.client.program.Client.OptimizerPlanEnvironment;
 import org.apache.flink.client.program.ContextEnvironment;
 import org.apache.flink.client.program.PackagedProgram.PreviewPlanEnvironment;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.FileStateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType;
@@ -237,6 +239,19 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
+	 * Sets the {@link StateHandleProvider} used for storing operator state
+	 * checkpoints when checkpointing is enabled. For example,
+	 * {@link FileStateHandle#createProvider(String)} can be used to store
+	 * checkpoints on any Flink supported file system.
+	 * @param provider the provider that creates a state handle for each checkpoint
+	 * @return this environment, to allow call chaining
+	 */
+	public StreamExecutionEnvironment setStateHandleProvider(StateHandleProvider<?> provider) {
+		streamGraph.setStateHandleProvider(provider);
+		return this;
+	}
+
+	/**
 	 * Sets the number of times that failed tasks are re-executed. A value of
 	 * zero effectively disables fault tolerance. A value of {@code -1}
 	 * indicates that the system default value (as defined in the configuration)

http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index a1047df..3b00000 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.StateHandleProvider;
 import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
@@ -57,6 +58,7 @@ public class StreamConfig implements Serializable {
 	private static final String EDGES_IN_ORDER = "edgesInOrder";
 	private static final String OUT_STREAM_EDGES = "outStreamEdges";
 	private static final String IN_STREAM_EDGES = "inStreamEdges";
+	private static final String STATEHANDLE_PROVIDER = "stateHandleProvider";
 
 	// DEFAULT VALUES
 	private static final long DEFAULT_TIMEOUT = 100;
@@ -377,6 +379,25 @@ public class StreamConfig implements Serializable {
 			throw new StreamTaskException("Could not instantiate configuration.", e);
 		}
 	}
+	
+	public void setStateHandleProvider(StateHandleProvider<?> provider) {
+
+		try {
+			InstantiationUtil.writeObjectToConfig(provider, this.config, STATEHANDLE_PROVIDER);
+		} catch (IOException e) {
+			throw new StreamTaskException("Could not serialize stateHandle provider.", e);
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	public <R> StateHandleProvider<R> getStateHandleProvider(ClassLoader cl) {
+		try {
+			return (StateHandleProvider<R>) InstantiationUtil
+					.readObjectFromConfig(this.config, STATEHANDLE_PROVIDER, cl);
+		} catch (Exception e) {
+			throw new StreamTaskException("Could not instantiate statehandle provider.", e);
+		}
+	}
 
 	public void setChainStart() {
 		config.setBoolean(IS_CHAINED_VERTEX, true);

http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 1ad1be0..aeba566 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -38,6 +38,8 @@ import org.apache.flink.api.java.typeutils.MissingTypeInfo;
 import org.apache.flink.optimizer.plan.StreamingPlan;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.state.LocalStateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -76,6 +78,7 @@ public class StreamGraph extends StreamingPlan {
 
 	private Map<Integer, StreamLoop> streamLoops;
 	protected Map<Integer, StreamLoop> vertexIDtoLoop;
+	private StateHandleProvider<?> stateHandleProvider = LocalStateHandle.createProvider();
 
 	public StreamGraph(StreamExecutionEnvironment environment) {
 
@@ -116,6 +119,14 @@ public class StreamGraph extends StreamingPlan {
 		this.checkpointingInterval = checkpointingInterval;
 	}
 
+	public void setStateHandleProvider(StateHandleProvider<?> provider) {
+		this.stateHandleProvider = provider;
+	}
+
+	public StateHandleProvider<?> getStateHandleProvider() {
+		return this.stateHandleProvider;
+	}
+
 	public long getCheckpointingInterval() {
 		return checkpointingInterval;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 799862a..ef5ffca 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -258,6 +258,7 @@ public class StreamingJobGraphGenerator {
 		config.setNonChainedOutputs(nonChainableOutputs);
 		config.setChainedOutputs(chainableOutputs);
 		config.setStateMonitoring(streamGraph.isCheckpointingEnabled());
+		config.setStateHandleProvider(streamGraph.getStateHandleProvider());
 
 		Class<? extends AbstractInvokable> vertexClass = vertex.getJobVertexClass();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 4d4893b..828a9a7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -30,8 +30,8 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCommittingOperator;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator;
 import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
-import org.apache.flink.runtime.state.LocalStateHandle;
 import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
 import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -61,6 +61,8 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 	protected StreamingRuntimeContext context;
 
 	protected ClassLoader userClassLoader;
+	
+	private StateHandleProvider<Serializable> stateHandleProvider;
 
 	private EventListener<TaskEvent> superstepListener;
 
@@ -74,6 +76,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 		this.userClassLoader = getUserCodeClassLoader();
 		this.configuration = new StreamConfig(getTaskConfiguration());
 		this.context = createRuntimeContext(getEnvironment().getTaskName());
+		this.stateHandleProvider = configuration.getStateHandleProvider(userClassLoader);
 
 		outputHandler = new OutputHandler<OUT>(this);
 
@@ -212,7 +215,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 									: null;
 						}
 						
-						state = userState == null ? null : new LocalStateHandle(userState);
+						state = userState == null ? null : stateHandleProvider.createStateHandle(userState);
 					}
 					catch (Exception e) {
 						throw new Exception("Error while drawing snapshot of the user state.", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 686fc23..5999625 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -19,19 +19,17 @@
 package org.apache.flink.streaming.api.scala
 
 import scala.reflect.ClassTag
-
 import com.esotericsoftware.kryo.Serializer
 import org.apache.commons.lang.Validate
 import org.joda.time.Instant
-
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 import org.apache.flink.api.scala.ClosureCleaner
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
 import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType
 import org.apache.flink.streaming.api.functions.source.{FromElementsFunction, SourceFunction}
-
 import scala.reflect.ClassTag
+import org.apache.flink.runtime.state.StateHandleProvider
 
 class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 
@@ -125,7 +123,16 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
     javaEnv.enableCheckpointing()
     this
   }
-  
+
+  /**
+   * Sets the given StateHandleProvider to be used for storing operator state
+   * checkpoints when checkpointing is enabled.
+   */
+  def setStateHandleProvider(provider: StateHandleProvider[_]): StreamExecutionEnvironment = {
+    javaEnv.setStateHandleProvider(provider)
+    this
+  }
+ 
   /**
    * Disables operator chaining for streaming operators. Operator chaining
    * allows non-shuffle operations to be co-located in the same thread fully

http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
index 627016c..fb4b2b7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
@@ -18,16 +18,11 @@
 
 package org.apache.flink.test.recovery;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.BufferedReader;
 import java.io.File;
@@ -40,7 +35,17 @@ import java.nio.charset.Charset;
 import java.util.HashSet;
 import java.util.UUID;
 
-import static org.junit.Assert.*;
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FileStateHandle;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 
 /**
  * Test for streaming program behaviour in case of TaskManager failure
@@ -62,11 +67,16 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
 
 	@Override
 	public void testProgram(int jobManagerPort, final File coordinateDir) throws Exception {
-
+		
 		final File tempTestOutput = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH),
 												UUID.randomUUID().toString());
 
 		assertTrue("Cannot create directory for temp output", tempTestOutput.mkdirs());
+		
+		final File tempCheckpointDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH),
+				UUID.randomUUID().toString());
+
+		assertTrue("Cannot create directory for checkpoints", tempCheckpointDir.mkdirs());
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment
 									.createRemoteEnvironment("localhost", jobManagerPort);
@@ -74,6 +84,7 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
 		env.getConfig().disableSysoutLogging();
 		env.setNumberOfExecutionRetries(1);
 		env.enableCheckpointing(200);
+		env.setStateHandleProvider(FileStateHandle.createProvider(tempCheckpointDir.getAbsolutePath()));
 
 		DataStream<Long> result = env.addSource(new SleepyDurableGenerateSequence(coordinateDir, DATA_COUNT))
 				// add a non-chained no-op map to test the chain state restore logic
@@ -125,12 +136,19 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
 
 			// validate
 			fileBatchHasEveryNumberLower(PARALLELISM, DATA_COUNT, tempTestOutput);
+			
+			// TODO: Figure out why this fails when ran with other tests
+			// Check whether checkpoints have been cleaned up properly
+			// assertDirectoryEmpty(tempCheckpointDir);
 		}
 		finally {
 			// clean up
 			if (tempTestOutput.exists()) {
 				FileUtils.deleteDirectory(tempTestOutput);
 			}
+			if (tempCheckpointDir.exists()) {
+				FileUtils.deleteDirectory(tempCheckpointDir);
+			}
 		}
 	}
 
@@ -155,7 +173,6 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
 		}
 
 		@Override
-		@SuppressWarnings("unchecked")
 		public void open(Configuration config) {
 			stepSize = getRuntimeContext().getNumberOfParallelSubtasks();
 			congruence = getRuntimeContext().getIndexOfThisSubtask();
@@ -267,4 +284,10 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
 			}
 		}
 	}
+	
+	private static void assertDirectoryEmpty(File path){
+		File[] files = path.listFiles();
+		assertNotNull(files);
+		assertEquals("Checkpoint dir is not empty", 0, files.length);
+	}
 }


[3/6] flink git commit: [streaming] Added HDFS test for FileStateHandle

Posted by gy...@apache.org.
[streaming] Added HDFS test for FileStateHandle


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6edf31a0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6edf31a0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6edf31a0

Branch: refs/heads/master
Commit: 6edf31a06f4e3462fa5d377d8ac844153d4171fd
Parents: 197cd6c
Author: Gyula Fora <gy...@apache.org>
Authored: Mon May 18 13:47:14 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Tue May 19 18:32:02 2015 +0200

----------------------------------------------------------------------
 .../runtime/state/ByteStreamStateHandle.java    |  18 ++-
 .../flink/tachyon/FileStateHandleTest.java      | 115 +++++++++++++++++++
 .../java/org/apache/flink/tachyon/HDFSTest.java |   4 +-
 3 files changed, 131 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6edf31a0/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
index d7dbb84..257784a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
@@ -34,7 +34,7 @@ public abstract class ByteStreamStateHandle implements StateHandle<Serializable>
 
 	private static final long serialVersionUID = -962025800339325828L;
 
-	transient Serializable state;
+	private transient Serializable state;
 
 	public ByteStreamStateHandle(Serializable state) {
 		this.state = state;
@@ -52,9 +52,11 @@ public abstract class ByteStreamStateHandle implements StateHandle<Serializable>
 
 	@Override
 	public Serializable getState() throws Exception {
-		ObjectInputStream stream = new ObjectInputStream(getInputStream());
-		state = (Serializable) stream.readObject();
-		stream.close();
+		// Deserialize only on first access; afterwards the cached object is returned
+		if (!stateFetched()) {
+			ObjectInputStream stream = new ObjectInputStream(getInputStream());
+			try {
+				state = (Serializable) stream.readObject();
+			} finally {
+				// close even if deserialization fails so the backing stream is not leaked
+				stream.close();
+			}
+		}
 		return state;
 	}
 
@@ -64,4 +66,12 @@ public abstract class ByteStreamStateHandle implements StateHandle<Serializable>
 		stream.close();
 		oos.defaultWriteObject();
 	}
+
+	/**
+	 * Tells whether the state object is currently held in memory, either because it
+	 * was handed to the constructor or because {@link #getState()} already read it back.
+	 *
+	 * @return {@code true} if the in-memory state reference is non-null
+	 */
+	public boolean stateFetched() {
+		return this.state != null;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6edf31a0/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
new file mode 100644
index 0000000..82b5d35
--- /dev/null
+++ b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tachyon;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.flink.runtime.state.FileStateHandle;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.runtime.util.SerializedValue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests that a {@link FileStateHandle} writes its state to HDFS when the handle is
+ * serialized, reads it back lazily after deserialization, and removes the checkpoint
+ * file when the state is discarded.
+ */
+public class FileStateHandleTest {
+
+	private String hdfsURI;
+	private MiniDFSCluster hdfsCluster;
+	private org.apache.hadoop.fs.Path hdPath;
+	private org.apache.hadoop.fs.FileSystem hdfs;
+
+	@Before
+	public void createHDFS() {
+		try {
+			Configuration hdConf = new Configuration();
+
+			// Start the mini cluster in a freshly cleaned directory under target/
+			File baseDir = new File("./target/hdfs/filestatehandletest").getAbsoluteFile();
+			FileUtil.fullyDelete(baseDir);
+			hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+			MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+			hdfsCluster = builder.build();
+
+			hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":"
+					+ hdfsCluster.getNameNodePort() + "/";
+
+			hdPath = new org.apache.hadoop.fs.Path("/StateHandleTest");
+			hdfs = hdPath.getFileSystem(hdConf);
+			hdfs.mkdirs(hdPath);
+
+		} catch (Throwable e) {
+			e.printStackTrace();
+			Assert.fail("Test failed " + e.getMessage());
+		}
+	}
+
+	@After
+	public void destroyHDFS() {
+		// Setup may have failed part way, so guard against NPEs hiding the original failure,
+		// and always shut the cluster down even if deleting the test directory fails.
+		try {
+			if (hdfs != null && hdPath != null) {
+				hdfs.delete(hdPath, true);
+			}
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		} finally {
+			if (hdfsCluster != null) {
+				hdfsCluster.shutdown();
+			}
+		}
+	}
+
+	@Test
+	public void testFileStateHandle() throws Exception {
+
+		Serializable state = "state";
+
+		// Create a state handle provider for the hdfs directory
+		StateHandleProvider<Serializable> handleProvider = FileStateHandle.createProvider(hdfsURI
+				+ hdPath);
+
+		FileStateHandle handle = (FileStateHandle) handleProvider.createStateHandle(state);
+
+		// A freshly created handle still holds the state in memory
+		assertTrue(handle.stateFetched());
+
+		// Serialize the handle so it writes the value to hdfs
+		SerializedValue<StateHandle<Serializable>> serializedHandle = new SerializedValue<StateHandle<Serializable>>(
+				handle);
+
+		// Deserialize the handle and verify that the state is not fetched yet
+		FileStateHandle deserializedHandle = (FileStateHandle) serializedHandle
+				.deserializeValue(Thread.currentThread().getContextClassLoader());
+		assertFalse(deserializedHandle.stateFetched());
+
+		// Fetch the state and compare it with the original
+		assertEquals(state, deserializedHandle.getState());
+
+		// Test whether discard removes the checkpoint file properly
+		assertTrue(hdfs.listFiles(hdPath, true).hasNext());
+		handle.discardState();
+		assertFalse(hdfs.listFiles(hdPath, true).hasNext());
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6edf31a0/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
index fd328ae..a761712 100644
--- a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
+++ b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
@@ -45,10 +45,10 @@ import java.io.StringWriter;
  */
 public class HDFSTest {
 
-	private String hdfsURI;
+	protected String hdfsURI;
 	private MiniDFSCluster hdfsCluster;
 	private org.apache.hadoop.fs.Path hdPath;
-	private org.apache.hadoop.fs.FileSystem hdfs;
+	protected org.apache.hadoop.fs.FileSystem hdfs;
 
 	@Before
 	public void createHDFS() {


[6/6] flink git commit: [streaming] State backend configurable from flink-conf.yaml

Posted by gy...@apache.org.
[streaming] State backend configurable from flink-conf.yaml

Closes #676


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/14799738
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/14799738
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/14799738

Branch: refs/heads/master
Commit: 147997388580e25ebbeabe86af706a4d6fa1de39
Parents: 2d3e69a
Author: Gyula Fora <gy...@apache.org>
Authored: Tue May 19 12:17:28 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Tue May 19 22:21:14 2015 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    | 15 ++++++
 flink-dist/src/main/resources/flink-conf.yaml   | 15 ++++++
 .../flink/streaming/api/graph/StreamGraph.java  |  3 +-
 .../streaming/runtime/tasks/StreamTask.java     | 52 +++++++++++++++++++-
 4 files changed, 82 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/14799738/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 7908002..6a94c3d 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -402,6 +402,18 @@ public final class ConfigConstants {
 	 */
 	public static final String AKKA_LOOKUP_TIMEOUT = "akka.lookup.timeout";
 	
+	// ----------------------------- Streaming --------------------------------
+	
+	/**
+	 * Configuration key for the state backend that stores streaming checkpoints
+	 * (supported values: "jobmanager", "filesystem").
+	 */
+	public static final String STATE_BACKEND = "state.backend";
+	
+	/**
+	 * Configuration key for the directory in which streaming checkpoints are saved;
+	 * required when the "filesystem" state backend is selected.
+	 */
+	public static final String STATE_BACKEND_FS_DIR = "state.backend.fs.checkpointdir";
+	
 	// ----------------------------- Miscellaneous ----------------------------
 	
 	/**
@@ -624,6 +636,9 @@ public final class ConfigConstants {
 
 	public static String DEFAULT_AKKA_LOOKUP_TIMEOUT = "10 s";
 	
+	// ----------------------------- Streaming Values --------------------------
+	
+	/** Default value for {@link #STATE_BACKEND}: the "jobmanager" backend. */
+	public static String DEFAULT_STATE_BACKEND = "jobmanager";
 
 	// ----------------------------- LocalExecution ----------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/14799738/flink-dist/src/main/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml
index 2f2eb22..f0c0d56 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -48,6 +48,21 @@ jobmanager.web.port: 8081
 webclient.port: 8080
 
 #==============================================================================
+# Streaming state checkpointing
+#==============================================================================
+
+# The backend used to store operator state checkpoints when
+# checkpointing is enabled.
+#
+# Supported backends: jobmanager, filesystem
+
+state.backend: jobmanager
+
+# Directory for storing checkpoints in a Flink-supported file system.
+# Required when state.backend is set to filesystem.
+#
+# state.backend.fs.checkpointdir: hdfs://namenode:port/flink/checkpoints
+
+#==============================================================================
 # Advanced
 #==============================================================================
 

http://git-wip-us.apache.org/repos/asf/flink/blob/14799738/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index aeba566..593b476 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -38,7 +38,6 @@ import org.apache.flink.api.java.typeutils.MissingTypeInfo;
 import org.apache.flink.optimizer.plan.StreamingPlan;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.state.LocalStateHandle;
 import org.apache.flink.runtime.state.StateHandleProvider;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -78,7 +77,7 @@ public class StreamGraph extends StreamingPlan {
 
 	private Map<Integer, StreamLoop> streamLoops;
 	protected Map<Integer, StreamLoop> vertexIDtoLoop;
-	private StateHandleProvider<?> stateHandleProvider = LocalStateHandle.createProvider();
+	private StateHandleProvider<?> stateHandleProvider;
 
 	public StreamGraph(StreamExecutionEnvironment environment) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/14799738/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 828a9a7..d678922 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -24,12 +24,16 @@ import java.util.List;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.functors.NotNullPredicate;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.event.task.TaskEvent;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCommittingOperator;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator;
 import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
+import org.apache.flink.runtime.state.FileStateHandle;
+import org.apache.flink.runtime.state.LocalStateHandle;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.state.StateHandleProvider;
 import org.apache.flink.runtime.util.event.EventListener;
@@ -76,7 +80,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 		this.userClassLoader = getUserCodeClassLoader();
 		this.configuration = new StreamConfig(getTaskConfiguration());
 		this.context = createRuntimeContext(getEnvironment().getTaskName());
-		this.stateHandleProvider = configuration.getStateHandleProvider(userClassLoader);
+		this.stateHandleProvider = getStateHandleProvider();
 
 		outputHandler = new OutputHandler<OUT>(this);
 
@@ -98,6 +102,52 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 		return new StreamingRuntimeContext(taskName, env, getUserCodeClassLoader(),
 				getExecutionConfig());
 	}
+	
+	/**
+	 * Determines the state handle provider used to checkpoint this task's state.
+	 *
+	 * <p>A provider set in the program (read from the stream config) takes precedence.
+	 * Otherwise the backend is taken from {@link ConfigConstants#STATE_BACKEND} in the
+	 * global configuration ("jobmanager" or "filesystem"); the "filesystem" backend also
+	 * requires {@link ConfigConstants#STATE_BACKEND_FS_DIR} to be set.
+	 *
+	 * @return the provider to use for creating state handles
+	 * @throws RuntimeException if the configured backend name is unknown, or the
+	 *         filesystem backend is chosen without a checkpoint directory
+	 */
+	private StateHandleProvider<Serializable> getStateHandleProvider() {
+
+		StateHandleProvider<Serializable> provider = configuration
+				.getStateHandleProvider(userClassLoader);
+
+		// A provider specified in the program overrides the cluster configuration
+		if (provider != null) {
+			LOG.info("Using user defined state backend for streaming checkpoints.");
+			return provider;
+		}
+
+		// Locale-independent upper-casing: under e.g. a Turkish default locale,
+		// "filesystem".toUpperCase() would not match the FILESYSTEM constant.
+		String backendName = GlobalConfiguration.getString(ConfigConstants.STATE_BACKEND,
+				ConfigConstants.DEFAULT_STATE_BACKEND).toUpperCase(java.util.Locale.ENGLISH);
+
+		StateBackend backend;
+		try {
+			backend = StateBackend.valueOf(backendName);
+		} catch (IllegalArgumentException e) {
+			throw new RuntimeException(backendName + " is not a valid state backend.\nSupported backends: jobmanager, filesystem.", e);
+		}
+
+		switch (backend) {
+			case JOBMANAGER:
+				LOG.info("State backend for state checkpoints is set to jobmanager.");
+				return LocalStateHandle.createProvider();
+			case FILESYSTEM:
+				String checkpointDir = GlobalConfiguration.getString(ConfigConstants.STATE_BACKEND_FS_DIR, null);
+				if (checkpointDir == null) {
+					// Point the user at the real configuration key
+					throw new RuntimeException(
+							"For filesystem checkpointing, a checkpoint directory needs to be specified.\nFor example: \""
+									+ ConfigConstants.STATE_BACKEND_FS_DIR + ": hdfs://namenode:port/flink/checkpoints\"");
+				}
+				LOG.info("State backend for state checkpoints is set to filesystem with directory: "
+						+ checkpointDir);
+				return FileStateHandle.createProvider(checkpointDir);
+			default:
+				throw new RuntimeException("Backend " + backend + " is not supported yet.");
+		}
+	}
+
+	/**
+	 * State backends selectable through the "state.backend" configuration key; each
+	 * constant name is the upper-cased configuration value.
+	 */
+	private enum StateBackend {
+		/** Checkpoints are handled through LocalStateHandle ("jobmanager"). */
+		JOBMANAGER,
+		/** Checkpoints are written through FileStateHandle ("filesystem"). */
+		FILESYSTEM
+	}
 
 	protected void openOperator() throws Exception {
 		streamOperator.open(getTaskConfiguration());


[5/6] flink git commit: [FLINK-1986] [streaming] Iterative stream creation bugfix

Posted by gy...@apache.org.
[FLINK-1986] [streaming] Iterative stream creation bugfix


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2d3e69a2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2d3e69a2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2d3e69a2

Branch: refs/heads/master
Commit: 2d3e69a25e753b2eb825b589fe2550d2b968e161
Parents: 6edf31a
Author: Gyula Fora <gy...@apache.org>
Authored: Mon May 18 16:36:14 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Tue May 19 18:33:14 2015 +0200

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    | 16 ++++++++
 .../api/datastream/IterativeDataStream.java     | 43 ++------------------
 2 files changed, 19 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2d3e69a2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 03cdaa5..dbb9b05 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -113,6 +113,9 @@ public class DataStream<OUT> {
 	@SuppressWarnings("rawtypes")
 	protected TypeInformation typeInfo;
 	protected List<DataStream<OUT>> mergedStreams;
+	
+	protected Integer iterationID = null;
+	protected Long iterationWaitTime = null;
 
 	protected final StreamGraph streamGraph;
 	private boolean typeUsed;
@@ -160,6 +163,8 @@ public class DataStream<OUT> {
 		this.partitioner = dataStream.partitioner.copy();
 		this.streamGraph = dataStream.streamGraph;
 		this.typeInfo = dataStream.typeInfo;
+		this.iterationID = dataStream.iterationID;
+		this.iterationWaitTime = dataStream.iterationWaitTime;
 		this.mergedStreams = new ArrayList<DataStream<OUT>>();
 		this.mergedStreams.add(this);
 		if (dataStream.mergedStreams.size() > 1) {
@@ -1224,9 +1229,20 @@ public class DataStream<OUT> {
 				operatorName);
 
 		connectGraph(inputStream, returnStream.getId(), 0);
+		
+		if (iterationID != null) {
+			//This data stream is an input to some iteration
+			addIterationSource(returnStream);
+		}
 
 		return returnStream;
 	}
+	
+	/**
+	 * Adds the iteration head (the source that receives feedback records from the
+	 * iteration tail) for the given stream, tagged with this stream's iteration ID and
+	 * wait time, and gives it the same parallelism as that stream.
+	 *
+	 * @param dataStream the stream produced from this iteration input
+	 */
+	private <X> void addIterationSource(DataStream<X> dataStream) {
+		Integer headVertexId = ++counter;
+		streamGraph.addIterationHead(headVertexId, dataStream.getId(), iterationID,
+				iterationWaitTime);
+		streamGraph.setParallelism(headVertexId, dataStream.getParallelism());
+	}
 
 	/**
 	 * Internal function for setting the partitioner for the DataStream

http://git-wip-us.apache.org/repos/asf/flink/blob/2d3e69a2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
index b1bdb85..178f2eb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
@@ -17,9 +17,6 @@
 
 package org.apache.flink.streaming.api.datastream;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-
 /**
  * The iterative data stream represents the start of an iteration in a
  * {@link DataStream}.
@@ -31,21 +28,13 @@ public class IterativeDataStream<IN> extends
 		SingleOutputStreamOperator<IN, IterativeDataStream<IN>> {
 
 	static Integer iterationCount = 0;
-	protected Integer iterationID;
-	protected long waitTime;
-
+	
 	protected IterativeDataStream(DataStream<IN> dataStream, long maxWaitTime) {
 		super(dataStream);
 		setBufferTimeout(dataStream.environment.getBufferTimeout());
+		// Take the next ID from the static counter shared by all IterativeDataStreams
 		iterationID = iterationCount;
 		iterationCount++;
-		waitTime = maxWaitTime;
-	}
-
-	protected IterativeDataStream(DataStream<IN> dataStream, Integer iterationID, long waitTime) {
-		super(dataStream);
-		this.iterationID = iterationID;
-		this.waitTime = waitTime;
+		// Kept in the DataStream-level field so that copies of this stream (DataStream's
+		// copy constructor copies iterationID and iterationWaitTime) carry the iteration info
+		iterationWaitTime = maxWaitTime;
 	}
 
 	/**
@@ -70,35 +59,9 @@ public class IterativeDataStream<IN> extends
 		// We add an iteration sink to the tail which will send tuples to the
 		// iteration head
 		streamGraph.addIterationTail(iterationSink.getId(), iterationTail.getId(), iterationID,
-				waitTime);
+				iterationWaitTime);
 
 		connectGraph(iterationTail.forward(), iterationSink.getId(), 0);
 		return iterationTail;
 	}
-
-	@Override
-	public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName,
-			TypeInformation<R> outTypeInfo, OneInputStreamOperator<IN, R> operator) {
-
-		// We call the superclass tranform method
-		SingleOutputStreamOperator<R, ?> returnStream = super.transform(operatorName, outTypeInfo,
-				operator);
-
-		// Then we add a source that will take care of receiving feedback tuples
-		// from the tail
-		addIterationSource(returnStream);
-
-		return returnStream;
-	}
-
-	private <X> void addIterationSource(DataStream<X> dataStream) {
-		Integer id = ++counter;
-		streamGraph.addIterationHead(id, dataStream.getId(), iterationID, waitTime);
-		streamGraph.setParallelism(id, dataStream.getParallelism());
-	}
-
-	@Override
-	public IterativeDataStream<IN> copy() {
-		return new IterativeDataStream<IN>(this, iterationID, waitTime);
-	}
 }