You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/18 13:49:05 UTC

[7/7] flink git commit: [FLINK-2704] [streaming] Clean up naming of State/Checkpoint Interfaces

[FLINK-2704] [streaming] Clean up naming of State/Checkpoint Interfaces

This closes #671

The interfaces are implemented by StreamTask (for now), but their names
ended in *Operator. They are merged into a single StatefulTask interface.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/05710948
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/05710948
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/05710948

Branch: refs/heads/master
Commit: 0571094885da2766dfb52a6fb38020cab7602114
Parents: a74fa8c
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Sep 10 15:45:53 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 18 12:04:18 2015 +0200

----------------------------------------------------------------------
 .../tasks/CheckpointNotificationOperator.java   | 35 ------------
 .../jobgraph/tasks/CheckpointedOperator.java    | 38 -------------
 .../jobgraph/tasks/OperatorStateCarrier.java    | 38 -------------
 .../runtime/jobgraph/tasks/StatefulTask.java    | 57 ++++++++++++++++++++
 .../apache/flink/runtime/state/StateUtils.java  |  6 +--
 .../apache/flink/runtime/taskmanager/Task.java  | 22 ++++----
 .../runtime/taskmanager/TaskAsyncCallTest.java  | 13 +++--
 .../streaming/runtime/tasks/StreamTask.java     |  7 +--
 8 files changed, 81 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/05710948/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointNotificationOperator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointNotificationOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointNotificationOperator.java
deleted file mode 100644
index 0eb9e07..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointNotificationOperator.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobgraph.tasks;
-
-/**
- * This interface needs to be implemented by runtime tasks that want to be able to receive
- * notifications about completed checkpoints.
- */
-public interface CheckpointNotificationOperator {
-
-	/**
-	 * Invoked when a checkpoint has been completed, i.e., when the checkpoint coordinator has received
-	 * the notification from all participating tasks.
-	 * 
-	 * @param checkpointId The ID of the checkpoint that is complete..
-	 * @throws Exception The notification method may forward its exceptions.
-	 */
-	void notifyCheckpointComplete(long checkpointId) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/05710948/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java
deleted file mode 100644
index 60f70dc..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobgraph.tasks;
-
-/**
- * This interface must be implemented by invokable operators (subclasses
- * of {@link org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable} that
- * participate in state checkpoints.
- */
-public interface CheckpointedOperator {
-
-	/**
-	 * This method is either called directly and asynchronously by the checkpoint
-	 * coordinator (in the case of functions that are directly notified - usually
-	 * the data sources), or called synchronously when all incoming channels have
-	 * reported a checkpoint barrier.
-	 * 
-	 * @param checkpointId The ID of the checkpoint, incrementing.
-	 * @param timestamp The timestamp when the checkpoint was triggered at the JobManager.
-	 */
-	void triggerCheckpoint(long checkpointId, long timestamp) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/05710948/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
deleted file mode 100644
index 5045ca4..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobgraph.tasks;
-
-import org.apache.flink.runtime.state.StateHandle;
-
-/**
- * This interface must be implemented by any invokable that has recoverable state.
- * The method {@link #setInitialState(org.apache.flink.runtime.state.StateHandle)} is used
- * to set the initial state of the operator, upon recovery.
- */
-public interface OperatorStateCarrier<T extends StateHandle<?>> {
-
-	/**
-	 * Sets the initial state of the operator, upon recovery. The initial state is typically
-	 * a snapshot of the state from a previous execution.
-	 * 
-	 * @param stateHandle The handle to the state.
-	 */
-	void setInitialState(T stateHandle) throws Exception;
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/05710948/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
new file mode 100644
index 0000000..894e6d9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph.tasks;
+
+import org.apache.flink.runtime.state.StateHandle;
+
+/**
+ * This interface must be implemented by any invokable that has recoverable state and participates
+ * in checkpointing.
+ */
+public interface StatefulTask<T extends StateHandle<?>> {
+
+	/**
+	 * Sets the initial state of the operator, upon recovery. The initial state is typically
+	 * a snapshot of the state from a previous execution.
+	 * 
+	 * @param stateHandle The handle to the state.
+	 */
+	void setInitialState(T stateHandle) throws Exception;
+
+	/**
+	 * This method is either called directly and asynchronously by the checkpoint
+	 * coordinator (in the case of functions that are directly notified - usually
+	 * the data sources), or called synchronously when all incoming channels have
+	 * reported a checkpoint barrier.
+	 *
+	 * @param checkpointId The ID of the checkpoint, incrementing.
+	 * @param timestamp The timestamp when the checkpoint was triggered at the JobManager.
+	 */
+	void triggerCheckpoint(long checkpointId, long timestamp) throws Exception;
+
+
+	/**
+	 * Invoked when a checkpoint has been completed, i.e., when the checkpoint coordinator has received
+	 * the notification from all participating tasks.
+	 *
+	 * @param checkpointId The ID of the checkpoint that is complete.
+	 * @throws Exception The notification method may forward its exceptions.
+	 */
+	void notifyCheckpointComplete(long checkpointId) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/05710948/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
index 30daec1..88b0d18 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 
 /**
  * A collection of utility methods for dealing with operator state.
@@ -41,10 +41,10 @@ public class StateUtils {
 	 * @param <T>
 	 *            Type bound for the
 	 */
-	public static <T extends StateHandle<?>> void setOperatorState(OperatorStateCarrier<?> op,
+	public static <T extends StateHandle<?>> void setOperatorState(StatefulTask<?> op,
 			StateHandle<?> state) throws Exception {
 		@SuppressWarnings("unchecked")
-		OperatorStateCarrier<T> typedOp = (OperatorStateCarrier<T>) op;
+		StatefulTask<T> typedOp = (StatefulTask<T>) op;
 		@SuppressWarnings("unchecked")
 		T typedHandle = (T) state;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/05710948/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 0d1dd13..7eff45a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -46,9 +46,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.CheckpointNotificationOperator;
-import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator;
-import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.TaskManagerMessages.FatalError;
 import org.apache.flink.runtime.messages.TaskMessages.TaskInFinalState;
@@ -534,10 +532,10 @@ public class Task implements Runnable {
 			SerializedValue<StateHandle<?>> operatorState = this.operatorState;
 
 			if (operatorState != null) {
-				if (invokable instanceof OperatorStateCarrier) {
+				if (invokable instanceof StatefulTask) {
 					try {
 						StateHandle<?> state = operatorState.deserializeValue(userCodeClassLoader);
-						OperatorStateCarrier<?> op = (OperatorStateCarrier<?>) invokable;
+						StatefulTask<?> op = (StatefulTask<?>) invokable;
 						StateUtils.setOperatorState(op, state);
 					}
 					catch (Exception e) {
@@ -869,7 +867,7 @@ public class Task implements Runnable {
 
 	/**
 	 * Calls the invokable to trigger a checkpoint, if the invokable implements the interface
-	 * {@link org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator}.
+	 * {@link org.apache.flink.runtime.jobgraph.tasks.StatefulTask}.
 	 * 
 	 * @param checkpointID The ID identifying the checkpoint.
 	 * @param checkpointTimestamp The timestamp associated with the checkpoint.   
@@ -878,10 +876,10 @@ public class Task implements Runnable {
 		AbstractInvokable invokable = this.invokable;
 
 		if (executionState == ExecutionState.RUNNING && invokable != null) {
-			if (invokable instanceof CheckpointedOperator) {
+			if (invokable instanceof StatefulTask) {
 
 				// build a local closure 
-				final CheckpointedOperator checkpointer = (CheckpointedOperator) invokable;
+				final StatefulTask<?> statefulTask = (StatefulTask<?>) invokable;
 				final Logger logger = LOG;
 				final String taskName = taskNameWithSubtask;
 
@@ -889,7 +887,7 @@ public class Task implements Runnable {
 					@Override
 					public void run() {
 						try {
-							checkpointer.triggerCheckpoint(checkpointID, checkpointTimestamp);
+							statefulTask.triggerCheckpoint(checkpointID, checkpointTimestamp);
 						}
 						catch (Throwable t) {
 							failExternally(new RuntimeException("Error while triggering checkpoint for " + taskName, t));
@@ -912,10 +910,10 @@ public class Task implements Runnable {
 		AbstractInvokable invokable = this.invokable;
 
 		if (executionState == ExecutionState.RUNNING && invokable != null) {
-			if (invokable instanceof CheckpointNotificationOperator) {
+			if (invokable instanceof StatefulTask) {
 
 				// build a local closure 
-				final CheckpointNotificationOperator checkpointer = (CheckpointNotificationOperator) invokable;
+				final StatefulTask<?> statefulTask = (StatefulTask<?>) invokable;
 				final Logger logger = LOG;
 				final String taskName = taskNameWithSubtask;
 
@@ -923,7 +921,7 @@ public class Task implements Runnable {
 					@Override
 					public void run() {
 						try {
-							checkpointer.notifyCheckpointComplete(checkpointID);
+							statefulTask.notifyCheckpointComplete(checkpointID);
 						}
 						catch (Throwable t) {
 							// fail task if checkpoint confirmation failed.

http://git-wip-us.apache.org/repos/asf/flink/blob/05710948/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 7857123..c0fe750 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -37,15 +37,16 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNo
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.CheckpointNotificationOperator;
-import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.memory.MemoryManager;
 
+import org.apache.flink.runtime.state.StateHandle;
 import org.junit.Before;
 import org.junit.Test;
 
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.Serializable;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 
@@ -168,8 +169,7 @@ public class TaskAsyncCallTest {
 				new TaskManagerRuntimeInfo("localhost", new Configuration()));
 	}
 	
-	public static class CheckpointsInOrderInvokable extends AbstractInvokable
-			implements CheckpointedOperator, CheckpointNotificationOperator {
+	public static class CheckpointsInOrderInvokable extends AbstractInvokable implements StatefulTask<StateHandle<Serializable>> {
 
 		private volatile long lastCheckpointId = 0;
 		
@@ -196,6 +196,11 @@ public class TaskAsyncCallTest {
 		}
 
 		@Override
+		public void setInitialState(StateHandle<Serializable> stateHandle) throws Exception {
+
+		}
+
+		@Override
 		public void triggerCheckpoint(long checkpointId, long timestamp) {
 			lastCheckpointId++;
 			if (checkpointId == lastCheckpointId) {

http://git-wip-us.apache.org/repos/asf/flink/blob/05710948/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index d5bdce2..d357a4d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -33,9 +33,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.CheckpointNotificationOperator;
-import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator;
-import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.state.FileStateHandle;
 import org.apache.flink.runtime.state.LocalStateHandle;
 import org.apache.flink.runtime.state.StateHandle;
@@ -74,8 +72,7 @@ import org.slf4j.LoggerFactory;
  * @param <OUT>
  * @param <O>
  */
-public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends AbstractInvokable implements
-		OperatorStateCarrier<StateHandle<Serializable>>, CheckpointedOperator, CheckpointNotificationOperator {
+public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends AbstractInvokable implements StatefulTask<StateHandle<Serializable>> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);