You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/12/13 12:44:20 UTC

[3/4] flink git commit: [FLINK-5163] Port the MessageAcknowledgingSourceBase to the new state abstractions.

[FLINK-5163] Port the MessageAcknowledgingSourceBase to the new state abstractions.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/956ffa69
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/956ffa69
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/956ffa69

Branch: refs/heads/master
Commit: 956ffa69a96ee74990c032c647a887feef4b2508
Parents: d24833d
Author: kl0u <kk...@gmail.com>
Authored: Fri Nov 18 16:07:45 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Dec 13 13:38:18 2016 +0100

----------------------------------------------------------------------
 .../flink-connector-rabbitmq/pom.xml            | 16 ++++
 .../connectors/rabbitmq/RMQSourceTest.java      | 54 +++++++++++--
 .../source/MessageAcknowledgingSourceBase.java  | 82 +++++++++++++-------
 ...ltipleIdsMessageAcknowledgingSourceBase.java |  8 +-
 4 files changed, 120 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/956ffa69/flink-connectors/flink-connector-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/pom.xml b/flink-connectors/flink-connector-rabbitmq/pom.xml
index 0b69d66..2710e53 100644
--- a/flink-connectors/flink-connector-rabbitmq/pom.xml
+++ b/flink-connectors/flink-connector-rabbitmq/pom.xml
@@ -55,6 +55,22 @@ under the License.
 			<version>${rabbitmq.version}</version>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_2.10</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
 	</dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/956ffa69/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
index b63c835..8474f8a 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -23,16 +23,19 @@ import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.Envelope;
 import com.rabbitmq.client.QueueingConsumer;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.SerializedCheckpointData;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.junit.After;
 import org.junit.Before;
@@ -53,6 +56,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
 
 
 /**
@@ -83,7 +87,13 @@ public class RMQSourceTest {
 	@Before
 	public void beforeTest() throws Exception {
 
+		OperatorStateStore mockStore = Mockito.mock(OperatorStateStore.class);
+		FunctionInitializationContext mockContext = Mockito.mock(FunctionInitializationContext.class);
+		Mockito.when(mockContext.getOperatorStateStore()).thenReturn(mockStore);
+		Mockito.when(mockStore.getSerializableListState(any(String.class))).thenReturn(null);
+
 		source = new RMQTestSource();
+		source.initializeState(mockContext);
 		source.open(config);
 
 		messageId = 0;
@@ -128,6 +138,12 @@ public class RMQSourceTest {
 	@Test
 	public void testCheckpointing() throws Exception {
 		source.autoAck = false;
+
+		StreamSource<String, RMQSource<String>> src = new StreamSource<>(source);
+		AbstractStreamOperatorTestHarness<String> testHarness =
+			new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);
+		testHarness.open();
+
 		sourceThread.start();
 
 		Thread.sleep(5);
@@ -141,10 +157,10 @@ public class RMQSourceTest {
 
 		for (int i=0; i < numSnapshots; i++) {
 			long snapshotId = random.nextLong();
-			SerializedCheckpointData[] data;
+			OperatorStateHandles data;
 
 			synchronized (DummySourceContext.lock) {
-				data = source.snapshotState(snapshotId, System.currentTimeMillis());
+				data = testHarness.snapshot(snapshotId, System.currentTimeMillis());
 				previousSnapshotId = lastSnapshotId;
 				lastSnapshotId = messageId;
 			}
@@ -153,15 +169,25 @@ public class RMQSourceTest {
 
 			// check if the correct number of messages have been snapshotted
 			final long numIds = lastSnapshotId - previousSnapshotId;
-			assertEquals(numIds, data[0].getNumIds());
-			// deserialize and check if the last id equals the last snapshotted id
-			ArrayDeque<Tuple2<Long, List<String>>> deque = SerializedCheckpointData.toDeque(data, new StringSerializer());
+
+			RMQTestSource sourceCopy = new RMQTestSource();
+			StreamSource<String, RMQTestSource> srcCopy = new StreamSource<>(sourceCopy);
+			AbstractStreamOperatorTestHarness<String> testHarnessCopy =
+				new AbstractStreamOperatorTestHarness<>(srcCopy, 1, 1, 0);
+
+			testHarnessCopy.setup();
+			testHarnessCopy.initializeState(data);
+			testHarnessCopy.open();
+
+			ArrayDeque<Tuple2<Long, List<String>>> deque = sourceCopy.getRestoredState();
 			List<String> messageIds = deque.getLast().f1;
+
+			assertEquals(numIds, messageIds.size());
 			if (messageIds.size() > 0) {
 				assertEquals(lastSnapshotId, (long) Long.valueOf(messageIds.get(messageIds.size() - 1)));
 			}
 
-			// check if the messages are being acknowledged and the transaction comitted
+			// check if the messages are being acknowledged and the transaction committed
 			synchronized (DummySourceContext.lock) {
 				source.notifyCheckpointComplete(snapshotId);
 			}
@@ -313,6 +339,8 @@ public class RMQSourceTest {
 
 	private class RMQTestSource extends RMQSource<String> {
 
+		private ArrayDeque<Tuple2<Long, List<String>>> restoredState;
+
 		public RMQTestSource() {
 			super(new RMQConnectionConfig.Builder().setHost("hostTest")
 					.setPort(999).setUserName("userTest").setPassword("passTest").setVirtualHost("/").build()
@@ -320,6 +348,16 @@ public class RMQSourceTest {
 		}
 
 		@Override
+		public void initializeState(FunctionInitializationContext context) throws Exception {
+			super.initializeState(context);
+			this.restoredState = this.pendingCheckpoints;
+		}
+
+		public ArrayDeque<Tuple2<Long, List<String>>> getRestoredState() {
+			return this.restoredState;
+		}
+
+		@Override
 		public void open(Configuration config) throws Exception {
 			super.open(config);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/956ffa69/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
index 5c1b94e..035b7dd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
@@ -27,14 +27,17 @@ import java.util.Set;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.state.SerializedCheckpointData;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,14 +76,16 @@ import org.slf4j.LoggerFactory;
  *     }
  * }
  * }</pre>
- * 
+ *
+ * <b>NOTE:</b> This source has a parallelism of {@code 1}.
+ *
  * @param <Type> The type of the messages created by the source.
  * @param <UId> The type of unique IDs which may be used to acknowledge elements.
  */
 @PublicEvolving
 public abstract class MessageAcknowledgingSourceBase<Type, UId>
 	extends RichSourceFunction<Type>
-	implements Checkpointed<SerializedCheckpointData[]>, CheckpointListener {
+	implements CheckpointedFunction, CheckpointListener {
 
 	private static final long serialVersionUID = -8689291992192955579L;
 
@@ -93,7 +98,7 @@ public abstract class MessageAcknowledgingSourceBase<Type, UId>
 	private transient List<UId> idsForCurrentCheckpoint;
 
 	/** The list with IDs from checkpoints that were triggered, but not yet completed or notified of completion */
-	private transient ArrayDeque<Tuple2<Long, List<UId>>> pendingCheckpoints;
+	protected transient ArrayDeque<Tuple2<Long, List<UId>>> pendingCheckpoints;
 
 	/**
 	 * Set which contain all processed ids. Ids are acknowledged after checkpoints. When restoring
@@ -102,6 +107,8 @@ public abstract class MessageAcknowledgingSourceBase<Type, UId>
 	 */
 	private transient Set<UId> idsProcessedButNotAcknowledged;
 
+	private transient ListState<SerializedCheckpointData[]> checkpointedState;
+
 	// ------------------------------------------------------------------------
 
 	/**
@@ -123,13 +130,38 @@ public abstract class MessageAcknowledgingSourceBase<Type, UId>
 	}
 
 	@Override
-	public void open(Configuration parameters) throws Exception {
-		idsForCurrentCheckpoint = new ArrayList<>(64);
-		if (pendingCheckpoints == null) {
-			pendingCheckpoints = new ArrayDeque<>();
-		}
-		if (idsProcessedButNotAcknowledged == null) {
-			idsProcessedButNotAcknowledged = new HashSet<>();
+	public void initializeState(FunctionInitializationContext context) throws Exception {
+		Preconditions.checkState(this.checkpointedState == null,
+			"The " + getClass().getSimpleName() + " has already been initialized.");
+
+		this.checkpointedState = context
+			.getOperatorStateStore()
+			.getSerializableListState("message-acknowledging-source-state");
+
+		this.idsForCurrentCheckpoint = new ArrayList<>(64);
+		this.pendingCheckpoints = new ArrayDeque<>();
+		this.idsProcessedButNotAcknowledged = new HashSet<>();
+
+		if (context.isRestored()) {
+			LOG.info("Restoring state for the {}.", getClass().getSimpleName());
+
+			List<SerializedCheckpointData[]> retrievedStates = new ArrayList<>();
+			for (SerializedCheckpointData[] entry : this.checkpointedState.get()) {
+				retrievedStates.add(entry);
+			}
+
+			// given that the parallelism of the function is 1, we expect exactly 1 state
+			Preconditions.checkArgument(retrievedStates.size() == 1,
+				getClass().getSimpleName() + " retrieved invalid state.");
+
+			pendingCheckpoints = SerializedCheckpointData.toDeque(retrievedStates.get(0), idSerializer);
+			// build a set which contains all processed ids. It may be used to check if we have
+			// already processed an incoming message.
+			for (Tuple2<Long, List<UId>> checkpoint : pendingCheckpoints) {
+				idsProcessedButNotAcknowledged.addAll(checkpoint.f1);
+			}
+		} else {
+			LOG.info("No state to restore for the {}.", getClass().getSimpleName());
 		}
 	}
 
@@ -166,26 +198,20 @@ public abstract class MessageAcknowledgingSourceBase<Type, UId>
 	// ------------------------------------------------------------------------
 
 	@Override
-	public SerializedCheckpointData[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-		LOG.debug("Snapshotting state. Messages: {}, checkpoint id: {}, timestamp: {}",
-					idsForCurrentCheckpoint, checkpointId, checkpointTimestamp);
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+		Preconditions.checkState(this.checkpointedState != null,
+			"The " + getClass().getSimpleName() + " has not been properly initialized.");
 
-		pendingCheckpoints.addLast(new Tuple2<>(checkpointId, idsForCurrentCheckpoint));
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("{} checkpointing: Messages: {}, checkpoint id: {}, timestamp: {}",
+				getClass().getSimpleName(), idsForCurrentCheckpoint, context.getCheckpointId(), context.getCheckpointTimestamp());
+		}
 
+		pendingCheckpoints.addLast(new Tuple2<>(context.getCheckpointId(), idsForCurrentCheckpoint));
 		idsForCurrentCheckpoint = new ArrayList<>(64);
 
-		return SerializedCheckpointData.fromDeque(pendingCheckpoints, idSerializer);
-	}
-
-	@Override
-	public void restoreState(SerializedCheckpointData[] state) throws Exception {
-		idsProcessedButNotAcknowledged = new HashSet<>();
-		pendingCheckpoints = SerializedCheckpointData.toDeque(state, idSerializer);
-		// build a set which contains all processed ids. It may be used to check if we have
-		// already processed an incoming message.
-		for (Tuple2<Long, List<UId>> checkpoint : pendingCheckpoints) {
-			idsProcessedButNotAcknowledged.addAll(checkpoint.f1);
-		}
+		this.checkpointedState.clear();
+		this.checkpointedState.add(SerializedCheckpointData.fromDeque(pendingCheckpoints, idSerializer));
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/956ffa69/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java
index 2237c1c..188fdce 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.SerializedCheckpointData;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -133,9 +133,9 @@ public abstract class MultipleIdsMessageAcknowledgingSourceBase<Type, UId, Sessi
 
 
 	@Override
-	public SerializedCheckpointData[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-		sessionIdsPerSnapshot.add(new Tuple2<>(checkpointId, sessionIds));
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+		sessionIdsPerSnapshot.add(new Tuple2<>(context.getCheckpointId(), sessionIds));
 		sessionIds = new ArrayList<>(64);
-		return super.snapshotState(checkpointId, checkpointTimestamp);
+		super.snapshotState(context);
 	}
 }