You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/12/20 15:09:18 UTC

[03/15] flink git commit: [FLINK-5292] Add CheckpointedRestoringOperator interface.

[FLINK-5292] Add CheckpointedRestoringOperator interface.

This breaks the StreamCheckpointedOperator interface into the
checkpointing and restoring part. The restoring part is meant for
operators that need to restore from legacy snapshots done using Flink
1.1


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/32f300fd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/32f300fd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/32f300fd

Branch: refs/heads/master
Commit: 32f300fd2c95e3cd94c7edb52665f90c0fa281ed
Parents: bfdaa38
Author: kl0u <kk...@gmail.com>
Authored: Thu Dec 15 18:39:24 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Dec 20 15:42:53 2016 +0100

----------------------------------------------------------------------
 .../api/operators/AbstractStreamOperator.java   |  5 +--
 .../CheckpointedRestoringOperator.java          | 46 ++++++++++++++++++++
 .../operators/StreamCheckpointedOperator.java   | 22 +---------
 3 files changed, 50 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/32f300fd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 1c27293..a21660c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -141,7 +141,6 @@ public abstract class AbstractStreamOperator<OUT>
 	// ---------------- timers ------------------
 
 	private transient Map<String, HeapInternalTimerService<?, ?>> timerServices;
-//	private transient Map<String, HeapInternalTimerService<?, ?>> restoredServices;
 
 
 	// ---------------- two-input operator watermarks ------------------
@@ -230,7 +229,7 @@ public abstract class AbstractStreamOperator<OUT>
 	private void restoreStreamCheckpointed(OperatorStateHandles stateHandles) throws Exception {
 		StreamStateHandle state = stateHandles.getLegacyOperatorState();
 		if (null != state) {
-			if (this instanceof StreamCheckpointedOperator) {
+			if (this instanceof CheckpointedRestoringOperator) {
 
 				LOG.debug("Restore state of task {} in chain ({}).",
 						stateHandles.getOperatorChainIndex(), getContainingTask().getName());
@@ -238,7 +237,7 @@ public abstract class AbstractStreamOperator<OUT>
 				FSDataInputStream is = state.openInputStream();
 				try {
 					getContainingTask().getCancelables().registerClosable(is);
-					((StreamCheckpointedOperator) this).restoreState(is);
+					((CheckpointedRestoringOperator) this).restoreState(is);
 				} finally {
 					getContainingTask().getCancelables().unregisterClosable(is);
 					is.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/32f300fd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/CheckpointedRestoringOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/CheckpointedRestoringOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/CheckpointedRestoringOperator.java
new file mode 100644
index 0000000..20eb1cf
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/CheckpointedRestoringOperator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+/**
+ * Interface for {@link StreamOperator StreamOperators} that can restore from a Flink 1.1
+ * legacy snapshot that was done using the {@link StreamCheckpointedOperator} interface.
+ */
+@Deprecated
+public interface CheckpointedRestoringOperator {
+
+	/**
+	 * Restores the operator state, if this operator's execution is recovering from a checkpoint.
+	 * This method restores the operator state (if the operator is stateful) and the key/value state
+	 * (if it had been used and was initialized when the snapshot occurred).
+	 *
+	 * <p>This method is called after {@link StreamOperator#setup(StreamTask, StreamConfig, Output)}
+	 * and before {@link StreamOperator#open()}.
+	 *
+	 * @param in The stream from which we have to restore our state.
+	 *
+	 * @throws Exception Exceptions during state restore should be forwarded, so that the system can
+	 *                   properly react to failed state restore and fail the execution attempt.
+	 */
+	void restoreState(FSDataInputStream in) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/32f300fd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
index d2f7e0d..f93e7ea 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
@@ -18,13 +18,10 @@
 
 package org.apache.flink.streaming.api.operators;
 
-import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
 @Deprecated
-public interface StreamCheckpointedOperator {
+public interface StreamCheckpointedOperator extends CheckpointedRestoringOperator {
 
 	/**
 	 * Called to draw a state snapshot from the operator. This method snapshots the operator state
@@ -39,19 +36,4 @@ public interface StreamCheckpointedOperator {
 	 */
 	void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception;
 
-	/**
-	 * Restores the operator state, if this operator's execution is recovering from a checkpoint.
-	 * This method restores the operator state (if the operator is stateful) and the key/value state
-	 * (if it had been used and was initialized when the snapshot occurred).
-	 *
-	 * <p>This method is called after {@link StreamOperator#setup(StreamTask, StreamConfig, Output)}
-	 * and before {@link StreamOperator#open()}.
-	 *
-	 * @param in The stream from which we have to restore our state.
-	 *
-	 * @throws Exception Exceptions during state restore should be forwarded, so that the system can
-	 *                   properly react to failed state restore and fail the execution attempt.
-	 */
-	void restoreState(FSDataInputStream in) throws Exception;
-
-}
\ No newline at end of file
+}