You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/05/11 22:19:36 UTC

[2/8] flink git commit: [streaming] New Source and state checkpointing interfaces that allow operations to interact with the state checkpointing in a more precise manner.

[streaming] New Source and state checkpointing interfaces that allow operations to interact with the state checkpointing in a more precise manner.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/acca10ea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/acca10ea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/acca10ea

Branch: refs/heads/master
Commit: acca10ea0a94cac56fe53258fa145c777ad4ea67
Parents: d259e69
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Apr 30 22:05:27 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon May 11 21:07:57 2015 +0200

----------------------------------------------------------------------
 .../flink/api/common/functions/Function.java    |  7 +-
 .../api/checkpoint/CheckpointCommitter.java     | 37 +++++++++
 .../streaming/api/checkpoint/Checkpointed.java  | 53 +++++++++++++
 .../checkpoint/CheckpointedAsynchronously.java  | 35 +++++++++
 .../api/functions/source/StreamSource.java      | 80 ++++++++++++++++++++
 5 files changed, 209 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/acca10ea/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java
index 64ebac1..632a0d2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java
@@ -19,9 +19,10 @@
 package org.apache.flink.api.common.functions;
 
 /**
- * An base interface for all user-defined functions. This interface is empty in order
- * to enable functions that are SAM (single abstract method) interfaces, so that they
- * can be called as Java 8 lambdas
+ * The base interface for all user-defined functions.
+ * 
+ * <p>This interface is intentionally empty so that extending interfaces can
+ * be SAM (single abstract method) interfaces, which can be implemented as Java 8 lambdas.</p>
  */
 public interface Function {
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/acca10ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointCommitter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointCommitter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointCommitter.java
new file mode 100644
index 0000000..a95b540
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointCommitter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.checkpoint;
+
+/**
+ * This interface must be implemented by functions/operations that want to receive
+ * a commit notification once a checkpoint has been completely acknowledged by all
+ * participants.
+ */
+public interface CheckpointCommitter {
+
+	/**
+	 * Called as a notification once a distributed checkpoint has been completed.
+	 * 
+	 * <p>Note that an exception thrown from this method no longer causes the checkpoint
+	 * to fail, because the checkpoint has already been completed at this point.</p>
+	 * 
+	 * @param checkpointId The ID of the checkpoint that has been completed.
+	 */
+	void commitCheckpoint(long checkpointId);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/acca10ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
new file mode 100644
index 0000000..f491dd3
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.checkpoint;
+
+import org.apache.flink.runtime.state.OperatorState;
+
+/**
+ * This interface must be implemented by functions that have state that needs to be
+ * checkpointed. The functions get a call whenever a checkpoint should take place
+ * and return a snapshot of their state, which will be checkpointed.
+ * 
+ * <p>This interface marks a function as <i>synchronously</i> checkpointed. While the
+ * state is written, the function is not called, so the function need not return a
+ * copy of its state, but may return a reference to its state. Functions that can
+ * continue to work and mutate the state, even while the state snapshot is being accessed,
+ * can implement the {@link org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously}
+ * interface.</p>
+ */
+public interface Checkpointed {
+
+	/**
+	 * Gets the current operator state as a checkpoint. The state must reflect the effects
+	 * of all prior operations of this function.
+	 * 
+	 * @param checkpointId The ID of the checkpoint.
+	 * @param checkpointTimestamp The timestamp of the checkpoint, as derived by
+	 *                            {@code System.currentTimeMillis()} on the JobManager.
+	 *                            
+	 * @return A snapshot of the operator state.
+	 * 
+	 * @throws Exception Thrown if the creation of the state object failed. This causes the
+	 *                   checkpoint to fail. The system may decide to fail the operation (and trigger
+	 *                   recovery), or to discard this checkpoint attempt and to continue running
+	 *                   and to try again with the next checkpoint attempt.
+	 */
+	OperatorState<?> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/acca10ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
new file mode 100644
index 0000000..196f7ec
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.checkpoint;
+
+/**
+ * This interface marks a function/operator as <i>asynchronously checkpointed</i>.
+ * Similar to the {@link Checkpointed} interface, the function must produce a
+ * snapshot of its state. However, the function must be able to continue working
+ * and mutating its state without mutating the already returned state snapshot.
+ * 
+ * <p>Asynchronous checkpoints are desirable because they allow the data streams at the
+ * point of the checkpointed function/operator to continue running while the checkpoint
+ * is in progress.</p>
+ * 
+ * <p>To be able to support asynchronous snapshots, the state returned by the
+ * {@link #snapshotState(long, long)} method is typically a copy or shadow copy
+ * of the actual state.</p>
+ */
+public interface CheckpointedAsynchronously extends Checkpointed {}

http://git-wip-us.apache.org/repos/asf/flink/blob/acca10ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StreamSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StreamSource.java
new file mode 100644
index 0000000..9452930
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StreamSource.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+/**
+ * Base interface for all stream data sources in Flink. The contract of a stream source
+ * is similar to an iterator - it is consumed as in the following pseudo code:
+ * 
+ * <pre>{@code
+ * StreamSource<T> source = ...;
+ * Collector<T> out = ...;
+ * while (!source.reachedEnd()) {
+ *   out.collect(source.next());
+ * }
+ * }
+ * </pre>
+ * 
+ * <p><b>Note about blocking behavior</b></p>
+ * <p>The implementations of the methods in a stream source must give certain guarantees about
+ * blocking behavior. At least one of the following two characteristics must be fulfilled:</p>
+ * <ul>
+ *     <li>The methods must react to thread interrupt calls and break out of blocking calls with
+ *         an {@link InterruptedException}.</li>
+ *     <li>The methods may ignore interrupt calls and/or swallow InterruptedExceptions, if it is guaranteed
+ *         that they return almost immediately, irrespective of the input. This is true, for example,
+ *         for file streams, where the call is guaranteed to return after a very short I/O delay in
+ *         the order of milliseconds.</li>
+ * </ul>
+ * 
+ * @param <T> The type of the records produced by this source.
+ */
+public interface StreamSource<T> {
+	
+	/**
+	 * Checks whether the stream has reached its end.
+	 *
+	 * <p>This method must obey the contract about blocking behavior declared in the
+	 * description of this interface.</p>
+	 * 
+	 * @return True, if the end of the stream has been reached, false if more data is available.
+	 * 
+	 * @throws InterruptedException The calling thread may be interrupted to pull the function out of this
+	 *                              method during checkpoints.
+	 * @throws Exception Any other exception that is thrown causes the source to fail and results in failure of
+	 *                   the streaming program, or triggers recovery, depending on the program setup.
+	 */
+	boolean reachedEnd() throws Exception;
+
+
+	/**
+	 * Produces the next record.
+	 * 
+	 * <p>This method must obey the contract about blocking behavior declared in the
+	 * description of this interface.</p>
+	 * 
+	 * @return The next record produced by this stream source.
+	 * 
+	 * @throws InterruptedException The calling thread may be interrupted to pull the function out of this
+	 *                              method during checkpoints.
+	 * @throws Exception Any other exception that is thrown causes the source to fail and results in failure of
+	 *                   the streaming program, or triggers recovery, depending on the program setup.
+	 */
+	T next() throws Exception;
+}