You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/12/13 12:44:20 UTC
[3/4] flink git commit: [FLINK-5163] Port the MessageAcknowledgingSourceBase to the new state abstractions.
[FLINK-5163] Port the MessageAcknowledgingSourceBase to the new state abstractions.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/956ffa69
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/956ffa69
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/956ffa69
Branch: refs/heads/master
Commit: 956ffa69a96ee74990c032c647a887feef4b2508
Parents: d24833d
Author: kl0u <kk...@gmail.com>
Authored: Fri Nov 18 16:07:45 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Dec 13 13:38:18 2016 +0100
----------------------------------------------------------------------
.../flink-connector-rabbitmq/pom.xml | 16 ++++
.../connectors/rabbitmq/RMQSourceTest.java | 54 +++++++++++--
.../source/MessageAcknowledgingSourceBase.java | 82 +++++++++++++-------
...ltipleIdsMessageAcknowledgingSourceBase.java | 8 +-
4 files changed, 120 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/956ffa69/flink-connectors/flink-connector-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/pom.xml b/flink-connectors/flink-connector-rabbitmq/pom.xml
index 0b69d66..2710e53 100644
--- a/flink-connectors/flink-connector-rabbitmq/pom.xml
+++ b/flink-connectors/flink-connector-rabbitmq/pom.xml
@@ -55,6 +55,22 @@ under the License.
<version>${rabbitmq.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.10</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime_2.10</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/956ffa69/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
index b63c835..8474f8a 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -23,16 +23,19 @@ import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.QueueingConsumer;
import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.SerializedCheckpointData;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.junit.After;
import org.junit.Before;
@@ -53,6 +56,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
/**
@@ -83,7 +87,13 @@ public class RMQSourceTest {
@Before
public void beforeTest() throws Exception {
+ OperatorStateStore mockStore = Mockito.mock(OperatorStateStore.class);
+ FunctionInitializationContext mockContext = Mockito.mock(FunctionInitializationContext.class);
+ Mockito.when(mockContext.getOperatorStateStore()).thenReturn(mockStore);
+ Mockito.when(mockStore.getSerializableListState(any(String.class))).thenReturn(null);
+
source = new RMQTestSource();
+ source.initializeState(mockContext);
source.open(config);
messageId = 0;
@@ -128,6 +138,12 @@ public class RMQSourceTest {
@Test
public void testCheckpointing() throws Exception {
source.autoAck = false;
+
+ StreamSource<String, RMQSource<String>> src = new StreamSource<>(source);
+ AbstractStreamOperatorTestHarness<String> testHarness =
+ new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);
+ testHarness.open();
+
sourceThread.start();
Thread.sleep(5);
@@ -141,10 +157,10 @@ public class RMQSourceTest {
for (int i=0; i < numSnapshots; i++) {
long snapshotId = random.nextLong();
- SerializedCheckpointData[] data;
+ OperatorStateHandles data;
synchronized (DummySourceContext.lock) {
- data = source.snapshotState(snapshotId, System.currentTimeMillis());
+ data = testHarness.snapshot(snapshotId, System.currentTimeMillis());
previousSnapshotId = lastSnapshotId;
lastSnapshotId = messageId;
}
@@ -153,15 +169,25 @@ public class RMQSourceTest {
// check if the correct number of messages have been snapshotted
final long numIds = lastSnapshotId - previousSnapshotId;
- assertEquals(numIds, data[0].getNumIds());
- // deserialize and check if the last id equals the last snapshotted id
- ArrayDeque<Tuple2<Long, List<String>>> deque = SerializedCheckpointData.toDeque(data, new StringSerializer());
+
+ RMQTestSource sourceCopy = new RMQTestSource();
+ StreamSource<String, RMQTestSource> srcCopy = new StreamSource<>(sourceCopy);
+ AbstractStreamOperatorTestHarness<String> testHarnessCopy =
+ new AbstractStreamOperatorTestHarness<>(srcCopy, 1, 1, 0);
+
+ testHarnessCopy.setup();
+ testHarnessCopy.initializeState(data);
+ testHarnessCopy.open();
+
+ ArrayDeque<Tuple2<Long, List<String>>> deque = sourceCopy.getRestoredState();
List<String> messageIds = deque.getLast().f1;
+
+ assertEquals(numIds, messageIds.size());
if (messageIds.size() > 0) {
assertEquals(lastSnapshotId, (long) Long.valueOf(messageIds.get(messageIds.size() - 1)));
}
- // check if the messages are being acknowledged and the transaction comitted
+ // check if the messages are being acknowledged and the transaction committed
synchronized (DummySourceContext.lock) {
source.notifyCheckpointComplete(snapshotId);
}
@@ -313,6 +339,8 @@ public class RMQSourceTest {
private class RMQTestSource extends RMQSource<String> {
+ private ArrayDeque<Tuple2<Long, List<String>>> restoredState;
+
public RMQTestSource() {
super(new RMQConnectionConfig.Builder().setHost("hostTest")
.setPort(999).setUserName("userTest").setPassword("passTest").setVirtualHost("/").build()
@@ -320,6 +348,16 @@ public class RMQSourceTest {
}
@Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ super.initializeState(context);
+ this.restoredState = this.pendingCheckpoints;
+ }
+
+ public ArrayDeque<Tuple2<Long, List<String>>> getRestoredState() {
+ return this.restoredState;
+ }
+
+ @Override
public void open(Configuration config) throws Exception {
super.open(config);
http://git-wip-us.apache.org/repos/asf/flink/blob/956ffa69/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
index 5c1b94e..035b7dd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
@@ -27,14 +27,17 @@ import java.util.Set;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.SerializedCheckpointData;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,14 +76,16 @@ import org.slf4j.LoggerFactory;
* }
* }
* }</pre>
- *
+ *
+ * <b>NOTE:</b> This source has a parallelism of {@code 1}.
+ *
* @param <Type> The type of the messages created by the source.
* @param <UId> The type of unique IDs which may be used to acknowledge elements.
*/
@PublicEvolving
public abstract class MessageAcknowledgingSourceBase<Type, UId>
extends RichSourceFunction<Type>
- implements Checkpointed<SerializedCheckpointData[]>, CheckpointListener {
+ implements CheckpointedFunction, CheckpointListener {
private static final long serialVersionUID = -8689291992192955579L;
@@ -93,7 +98,7 @@ public abstract class MessageAcknowledgingSourceBase<Type, UId>
private transient List<UId> idsForCurrentCheckpoint;
/** The list with IDs from checkpoints that were triggered, but not yet completed or notified of completion */
- private transient ArrayDeque<Tuple2<Long, List<UId>>> pendingCheckpoints;
+ protected transient ArrayDeque<Tuple2<Long, List<UId>>> pendingCheckpoints;
/**
* Set which contain all processed ids. Ids are acknowledged after checkpoints. When restoring
@@ -102,6 +107,8 @@ public abstract class MessageAcknowledgingSourceBase<Type, UId>
*/
private transient Set<UId> idsProcessedButNotAcknowledged;
+ private transient ListState<SerializedCheckpointData[]> checkpointedState;
+
// ------------------------------------------------------------------------
/**
@@ -123,13 +130,38 @@ public abstract class MessageAcknowledgingSourceBase<Type, UId>
}
@Override
- public void open(Configuration parameters) throws Exception {
- idsForCurrentCheckpoint = new ArrayList<>(64);
- if (pendingCheckpoints == null) {
- pendingCheckpoints = new ArrayDeque<>();
- }
- if (idsProcessedButNotAcknowledged == null) {
- idsProcessedButNotAcknowledged = new HashSet<>();
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ Preconditions.checkState(this.checkpointedState == null,
+ "The " + getClass().getSimpleName() + " has already been initialized.");
+
+ this.checkpointedState = context
+ .getOperatorStateStore()
+ .getSerializableListState("message-acknowledging-source-state");
+
+ this.idsForCurrentCheckpoint = new ArrayList<>(64);
+ this.pendingCheckpoints = new ArrayDeque<>();
+ this.idsProcessedButNotAcknowledged = new HashSet<>();
+
+ if (context.isRestored()) {
+ LOG.info("Restoring state for the {}.", getClass().getSimpleName());
+
+ List<SerializedCheckpointData[]> retrievedStates = new ArrayList<>();
+ for (SerializedCheckpointData[] entry : this.checkpointedState.get()) {
+ retrievedStates.add(entry);
+ }
+
+ // given that the parallelism of the function is 1, we can only have at most 1 state
+ Preconditions.checkArgument(retrievedStates.size() == 1,
+ getClass().getSimpleName() + " retrieved invalid state.");
+
+ pendingCheckpoints = SerializedCheckpointData.toDeque(retrievedStates.get(0), idSerializer);
+ // build a set which contains all processed ids. It may be used to check if we have
+ // already processed an incoming message.
+ for (Tuple2<Long, List<UId>> checkpoint : pendingCheckpoints) {
+ idsProcessedButNotAcknowledged.addAll(checkpoint.f1);
+ }
+ } else {
+ LOG.info("No state to restore for the {}.", getClass().getSimpleName());
}
}
@@ -166,26 +198,20 @@ public abstract class MessageAcknowledgingSourceBase<Type, UId>
// ------------------------------------------------------------------------
@Override
- public SerializedCheckpointData[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
- LOG.debug("Snapshotting state. Messages: {}, checkpoint id: {}, timestamp: {}",
- idsForCurrentCheckpoint, checkpointId, checkpointTimestamp);
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ Preconditions.checkState(this.checkpointedState != null,
+ "The " + getClass().getSimpleName() + " has not been properly initialized.");
- pendingCheckpoints.addLast(new Tuple2<>(checkpointId, idsForCurrentCheckpoint));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} checkpointing: Messages: {}, checkpoint id: {}, timestamp: {}",
+ getClass().getSimpleName(), idsForCurrentCheckpoint, context.getCheckpointId(), context.getCheckpointTimestamp());
+ }
+ pendingCheckpoints.addLast(new Tuple2<>(context.getCheckpointId(), idsForCurrentCheckpoint));
idsForCurrentCheckpoint = new ArrayList<>(64);
- return SerializedCheckpointData.fromDeque(pendingCheckpoints, idSerializer);
- }
-
- @Override
- public void restoreState(SerializedCheckpointData[] state) throws Exception {
- idsProcessedButNotAcknowledged = new HashSet<>();
- pendingCheckpoints = SerializedCheckpointData.toDeque(state, idSerializer);
- // build a set which contains all processed ids. It may be used to check if we have
- // already processed an incoming message.
- for (Tuple2<Long, List<UId>> checkpoint : pendingCheckpoints) {
- idsProcessedButNotAcknowledged.addAll(checkpoint.f1);
- }
+ this.checkpointedState.clear();
+ this.checkpointedState.add(SerializedCheckpointData.fromDeque(pendingCheckpoints, idSerializer));
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/956ffa69/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java
index 2237c1c..188fdce 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.SerializedCheckpointData;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -133,9 +133,9 @@ public abstract class MultipleIdsMessageAcknowledgingSourceBase<Type, UId, Sessi
@Override
- public SerializedCheckpointData[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
- sessionIdsPerSnapshot.add(new Tuple2<>(checkpointId, sessionIds));
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ sessionIdsPerSnapshot.add(new Tuple2<>(context.getCheckpointId(), sessionIds));
sessionIds = new ArrayList<>(64);
- return super.snapshotState(checkpointId, checkpointTimestamp);
+ super.snapshotState(context);
}
}