Posted to commits@flink.apache.org by tz...@apache.org on 2018/01/13 07:53:29 UTC
[5/6] flink git commit: [FLINK-8306] [kafka, tests] Fix mock verifications on final method
[FLINK-8306] [kafka, tests] Fix mock verifications on final method
Previously, the offset commit behavioural tests relied on verifying calls to
AbstractFetcher::commitInternalOffsetsToKafka(). That method is final,
however, and therefore cannot be mocked.
This commit fixes that by implementing a proper mock AbstractFetcher, which
keeps track of the offset commits that go through it.
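In short, the change swaps a Mockito verification on the final method for
assertions against a hand-written test double. A minimal before/after sketch,
condensed from the diff below (the full test double is MockFetcher):

    // Before: verifying on the final method -- Mockito cannot intercept
    // final methods, so this check was silently unreliable:
    final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class);
    verify(fetcher, never())
        .commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class));

    // After: MockFetcher overrides the non-final hook
    // doCommitInternalOffsetsToKafka() and records each commit, so the
    // tests assert on the recorded state instead:
    final MockFetcher<String> mockFetcher = new MockFetcher<>();
    // ... run the consumer, acknowledge a checkpoint ...
    assertEquals(0, mockFetcher.getCommitCount());
    assertNull(mockFetcher.getAndClearLastCommittedOffsets());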
This closes #5284.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c4bfc7de
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c4bfc7de
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c4bfc7de
Branch: refs/heads/release-1.4
Commit: c4bfc7de36201d7a144ae995931ffd3a079ed649
Parents: 968683f
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri Jan 12 08:45:32 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Sat Jan 13 11:34:20 2018 +0800
----------------------------------------------------------------------
.../kafka/FlinkKafkaConsumerBaseTest.java | 203 ++++++++++---------
.../kafka/internals/AbstractFetcherTest.java | 63 +-----
.../kafka/testutils/TestSourceContext.java | 87 ++++++++
3 files changed, 199 insertions(+), 154 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c4bfc7de/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 4361c67..f091c08 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -48,7 +47,9 @@ import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
import org.apache.flink.streaming.connectors.kafka.testutils.TestPartitionDiscoverer;
+import org.apache.flink.streaming.connectors.kafka.testutils.TestSourceContext;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.Preconditions;
@@ -56,11 +57,13 @@ import org.apache.flink.util.SerializedValue;
import org.junit.Assert;
import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.Mockito;
+
+import javax.annotation.Nonnull;
import java.io.Serializable;
+import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -68,6 +71,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import static org.apache.flink.util.Preconditions.checkState;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.collection.IsIn.isIn;
@@ -75,14 +79,11 @@ import static org.hamcrest.collection.IsMapContaining.hasKey;
import static org.hamcrest.core.IsNot.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyMap;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
/**
* Tests for the {@link FlinkKafkaConsumerBase}.
@@ -129,7 +130,11 @@ public class FlinkKafkaConsumerBaseTest {
@Test
public void ignoreCheckpointWhenNotRunning() throws Exception {
@SuppressWarnings("unchecked")
- final FlinkKafkaConsumerBase<String> consumer = new DummyFlinkKafkaConsumer<>();
+ final MockFetcher<String> fetcher = new MockFetcher<>();
+ final FlinkKafkaConsumerBase<String> consumer = new DummyFlinkKafkaConsumer<>(
+ fetcher,
+ mock(AbstractPartitionDiscoverer.class),
+ false);
final TestingListState<Tuple2<KafkaTopicPartition, Long>> listState = new TestingListState<>();
setupConsumer(consumer, false, listState, true, 0, 1);
@@ -139,6 +144,11 @@ public class FlinkKafkaConsumerBaseTest {
// no state should have been checkpointed
assertFalse(listState.get().iterator().hasNext());
+
+ // acknowledgement of the checkpoint should also not result in any offset commits
+ consumer.notifyCheckpointComplete(1L);
+ assertNull(fetcher.getAndClearLastCommittedOffsets());
+ assertEquals(0, fetcher.getCommitCount());
}
/**
@@ -265,10 +275,7 @@ public class FlinkKafkaConsumerBaseTest {
// --------------------------------------------------------------------
- final OneShotLatch runLatch = new OneShotLatch();
- final OneShotLatch stopLatch = new OneShotLatch();
- final AbstractFetcher<String, ?> fetcher = getRunnableMockFetcher(runLatch, stopLatch);
- when(fetcher.snapshotCurrentState()).thenReturn(state1, state2, state3);
+ final MockFetcher<String> fetcher = new MockFetcher<>(state1, state2, state3);
final FlinkKafkaConsumerBase<String> consumer = new DummyFlinkKafkaConsumer<>(
fetcher,
@@ -283,17 +290,11 @@ public class FlinkKafkaConsumerBaseTest {
final CheckedThread runThread = new CheckedThread() {
@Override
public void go() throws Exception {
- consumer.run(mock(SourceFunction.SourceContext.class));
- }
-
- @Override
- public void sync() throws Exception {
- stopLatch.trigger();
- super.sync();
+ consumer.run(new TestSourceContext<>());
}
};
runThread.start();
- runLatch.await();
+ fetcher.waitUntilRun();
assertEquals(0, consumer.getPendingOffsetsToCommit().size());
@@ -329,6 +330,8 @@ public class FlinkKafkaConsumerBaseTest {
consumer.notifyCheckpointComplete(138L);
assertEquals(1, consumer.getPendingOffsetsToCommit().size());
assertTrue(consumer.getPendingOffsetsToCommit().containsKey(140L));
+ assertEquals(state1, fetcher.getAndClearLastCommittedOffsets());
+ assertEquals(1, fetcher.getCommitCount());
// checkpoint 3
consumer.snapshotState(new StateSnapshotContextSynchronousImpl(141, 141));
@@ -347,28 +350,15 @@ public class FlinkKafkaConsumerBaseTest {
// ack checkpoint 3, subsumes number 2
consumer.notifyCheckpointComplete(141L);
assertEquals(0, consumer.getPendingOffsetsToCommit().size());
+ assertEquals(state3, fetcher.getAndClearLastCommittedOffsets());
+ assertEquals(2, fetcher.getCommitCount());
consumer.notifyCheckpointComplete(666); // invalid checkpoint
assertEquals(0, consumer.getPendingOffsetsToCommit().size());
+ assertNull(fetcher.getAndClearLastCommittedOffsets());
+ assertEquals(2, fetcher.getCommitCount());
- // create 500 snapshots
- for (int i = 100; i < 600; i++) {
- consumer.snapshotState(new StateSnapshotContextSynchronousImpl(i, i));
- listState.clear();
- }
- assertEquals(FlinkKafkaConsumerBase.MAX_NUM_PENDING_CHECKPOINTS, consumer.getPendingOffsetsToCommit().size());
-
- // commit only the second last
- consumer.notifyCheckpointComplete(598);
- assertEquals(1, consumer.getPendingOffsetsToCommit().size());
-
- // access invalid checkpoint
- consumer.notifyCheckpointComplete(590);
-
- // and the last
- consumer.notifyCheckpointComplete(599);
- assertEquals(0, consumer.getPendingOffsetsToCommit().size());
-
+ consumer.cancel();
runThread.sync();
}
@@ -393,10 +383,7 @@ public class FlinkKafkaConsumerBaseTest {
// --------------------------------------------------------------------
- final OneShotLatch runLatch = new OneShotLatch();
- final OneShotLatch stopLatch = new OneShotLatch();
- final AbstractFetcher<String, ?> fetcher = getRunnableMockFetcher(runLatch, stopLatch);
- when(fetcher.snapshotCurrentState()).thenReturn(state1, state2, state3);
+ final MockFetcher<String> fetcher = new MockFetcher<>(state1, state2, state3);
final FlinkKafkaConsumerBase<String> consumer = new DummyFlinkKafkaConsumer<>(
fetcher,
@@ -412,17 +399,11 @@ public class FlinkKafkaConsumerBaseTest {
final CheckedThread runThread = new CheckedThread() {
@Override
public void go() throws Exception {
- consumer.run(mock(SourceFunction.SourceContext.class));
- }
-
- @Override
- public void sync() throws Exception {
- stopLatch.trigger();
- super.sync();
+ consumer.run(new TestSourceContext<>());
}
};
runThread.start();
- runLatch.await();
+ fetcher.waitUntilRun();
assertEquals(0, consumer.getPendingOffsetsToCommit().size());
@@ -454,7 +435,8 @@ public class FlinkKafkaConsumerBaseTest {
// ack checkpoint 1
consumer.notifyCheckpointComplete(138L);
- verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
+ assertEquals(0, fetcher.getCommitCount());
+ assertNull(fetcher.getAndClearLastCommittedOffsets()); // no offsets should be committed
// checkpoint 3
consumer.snapshotState(new StateSnapshotContextSynchronousImpl(141, 141));
@@ -471,29 +453,15 @@ public class FlinkKafkaConsumerBaseTest {
// ack checkpoint 3, subsumes number 2
consumer.notifyCheckpointComplete(141L);
- verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
+ assertEquals(0, fetcher.getCommitCount());
+ assertNull(fetcher.getAndClearLastCommittedOffsets()); // no offsets should be committed
consumer.notifyCheckpointComplete(666); // invalid checkpoint
- verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
-
- // create 500 snapshots
- for (int i = 100; i < 600; i++) {
- consumer.snapshotState(new StateSnapshotContextSynchronousImpl(i, i));
- listState.clear();
- }
- assertEquals(0, consumer.getPendingOffsetsToCommit().size()); // pending offsets to commit should not be updated
+ assertEquals(0, fetcher.getCommitCount());
+ assertNull(fetcher.getAndClearLastCommittedOffsets()); // no offsets should be committed
- // commit only the second last
- consumer.notifyCheckpointComplete(598);
- verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
-
- // access invalid checkpoint
- consumer.notifyCheckpointComplete(590);
- verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
-
- // and the last
- consumer.notifyCheckpointComplete(599);
- verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
+ consumer.cancel();
+ runThread.sync();
}
@Test
@@ -734,26 +702,6 @@ public class FlinkKafkaConsumerBaseTest {
}
}
- /**
- * Returns a mock {@link AbstractFetcher}, with run / stop latches injected in
- * the {@link AbstractFetcher#runFetchLoop()} method.
- */
- private static <T> AbstractFetcher<T, ?> getRunnableMockFetcher(
- OneShotLatch runLatch,
- OneShotLatch stopLatch) throws Exception {
-
- @SuppressWarnings("unchecked")
- final AbstractFetcher<T, ?> fetcher = mock(AbstractFetcher.class);
-
- Mockito.doAnswer(invocationOnMock -> {
- runLatch.trigger();
- stopLatch.await();
- return null;
- }).when(fetcher).runFetchLoop();
-
- return fetcher;
- }
-
@SuppressWarnings("unchecked")
private static <T, S> void setupConsumer(
FlinkKafkaConsumerBase<T> consumer,
@@ -769,6 +717,77 @@ public class FlinkKafkaConsumerBaseTest {
consumer.open(new Configuration());
}
+ private static class MockFetcher<T> extends AbstractFetcher<T, Object> {
+
+ private final OneShotLatch runLatch = new OneShotLatch();
+ private final OneShotLatch stopLatch = new OneShotLatch();
+
+ private final ArrayDeque<HashMap<KafkaTopicPartition, Long>> stateSnapshotsToReturn = new ArrayDeque<>();
+
+ private Map<KafkaTopicPartition, Long> lastCommittedOffsets;
+ private int commitCount = 0;
+
+ @SafeVarargs
+ private MockFetcher(HashMap<KafkaTopicPartition, Long>... stateSnapshotsToReturn) throws Exception {
+ super(
+ new TestSourceContext<>(),
+ new HashMap<>(),
+ null,
+ null,
+ new TestProcessingTimeService(),
+ 0,
+ MockFetcher.class.getClassLoader(),
+ false);
+
+ this.stateSnapshotsToReturn.addAll(Arrays.asList(stateSnapshotsToReturn));
+ }
+
+ @Override
+ protected void doCommitInternalOffsetsToKafka(
+ Map<KafkaTopicPartition, Long> offsets,
+ @Nonnull KafkaCommitCallback commitCallback) throws Exception {
+ this.lastCommittedOffsets = offsets;
+ this.commitCount++;
+ commitCallback.onSuccess();
+ }
+
+ @Override
+ public void runFetchLoop() throws Exception {
+ runLatch.trigger();
+ stopLatch.await();
+ }
+
+ @Override
+ public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
+ checkState(!stateSnapshotsToReturn.isEmpty());
+ return stateSnapshotsToReturn.poll();
+ }
+
+ @Override
+ protected Object createKafkaPartitionHandle(KafkaTopicPartition partition) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void cancel() {
+ stopLatch.trigger();
+ }
+
+ private void waitUntilRun() throws InterruptedException {
+ runLatch.await();
+ }
+
+ private Map<KafkaTopicPartition, Long> getAndClearLastCommittedOffsets() {
+ Map<KafkaTopicPartition, Long> offsets = this.lastCommittedOffsets;
+ this.lastCommittedOffsets = null;
+ return offsets;
+ }
+
+ private int getCommitCount() {
+ return commitCount;
+ }
+ }
+
private static class MockRuntimeContext extends StreamingRuntimeContext {
private final boolean isCheckpointingEnabled;
@@ -784,7 +803,7 @@ public class FlinkKafkaConsumerBaseTest {
super(
new MockStreamOperator(),
new MockEnvironment("mockTask", 4 * MemoryManager.DEFAULT_PAGE_SIZE, null, 16),
- Collections.<String, Accumulator<?, ?>>emptyMap());
+ Collections.emptyMap());
this.isCheckpointingEnabled = isCheckpointingEnabled;
this.numParallelSubtasks = numParallelSubtasks;
http://git-wip-us.apache.org/repos/asf/flink/blob/c4bfc7de/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
index e4a58dd..6fe1d6f 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.connectors.kafka.testutils.TestSourceContext;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.util.SerializedValue;
@@ -444,67 +444,6 @@ public class AbstractFetcherTest {
// ------------------------------------------------------------------------
- private static final class TestSourceContext<T> implements SourceContext<T> {
-
- private final Object checkpointLock = new Object();
- private final Object watermarkLock = new Object();
-
- private volatile StreamRecord<T> latestElement;
- private volatile Watermark currentWatermark;
-
- @Override
- public void collect(T element) {
- this.latestElement = new StreamRecord<>(element);
- }
-
- @Override
- public void collectWithTimestamp(T element, long timestamp) {
- this.latestElement = new StreamRecord<>(element, timestamp);
- }
-
- @Override
- public void emitWatermark(Watermark mark) {
- synchronized (watermarkLock) {
- currentWatermark = mark;
- watermarkLock.notifyAll();
- }
- }
-
- @Override
- public void markAsTemporarilyIdle() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Object getCheckpointLock() {
- return checkpointLock;
- }
-
- @Override
- public void close() {}
-
- public StreamRecord<T> getLatestElement() {
- return latestElement;
- }
-
- public boolean hasWatermark() {
- return currentWatermark != null;
- }
-
- public Watermark getLatestWatermark() throws InterruptedException {
- synchronized (watermarkLock) {
- while (currentWatermark == null) {
- watermarkLock.wait();
- }
- Watermark wm = currentWatermark;
- currentWatermark = null;
- return wm;
- }
- }
- }
-
- // ------------------------------------------------------------------------
-
private static class PeriodicTestExtractor implements AssignerWithPeriodicWatermarks<Long> {
private volatile long maxTimestamp = Long.MIN_VALUE;
http://git-wip-us.apache.org/repos/asf/flink/blob/c4bfc7de/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/TestSourceContext.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/TestSourceContext.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/TestSourceContext.java
new file mode 100644
index 0000000..2a96a68
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/TestSourceContext.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * Test {@link org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext}.
+ */
+public final class TestSourceContext<T> implements SourceFunction.SourceContext<T> {
+
+ private final Object checkpointLock = new Object();
+ private final Object watermarkLock = new Object();
+
+ private volatile StreamRecord<T> latestElement;
+ private volatile Watermark currentWatermark;
+
+ @Override
+ public void collect(T element) {
+ this.latestElement = new StreamRecord<>(element);
+ }
+
+ @Override
+ public void collectWithTimestamp(T element, long timestamp) {
+ this.latestElement = new StreamRecord<>(element, timestamp);
+ }
+
+ @Override
+ public void emitWatermark(Watermark mark) {
+ synchronized (watermarkLock) {
+ currentWatermark = mark;
+ watermarkLock.notifyAll();
+ }
+ }
+
+ @Override
+ public void markAsTemporarilyIdle() {
+ // do nothing
+ }
+
+ @Override
+ public Object getCheckpointLock() {
+ return checkpointLock;
+ }
+
+ @Override
+ public void close() {
+ // do nothing
+ }
+
+ public StreamRecord<T> getLatestElement() {
+ return latestElement;
+ }
+
+ public boolean hasWatermark() {
+ return currentWatermark != null;
+ }
+
+ public Watermark getLatestWatermark() throws InterruptedException {
+ synchronized (watermarkLock) {
+ while (currentWatermark == null) {
+ watermarkLock.wait();
+ }
+ Watermark wm = currentWatermark;
+ currentWatermark = null;
+ return wm;
+ }
+ }
+}