You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/02/23 15:28:51 UTC
flink git commit: [hotfix] [kafka] Indent Kafka010FetcherTest with tabs instead of spaces
Repository: flink
Updated Branches:
refs/heads/master 8dac43613 -> de2605ea7
[hotfix] [kafka] Indent Kafka010FetcherTest with tabs instead of spaces
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/de2605ea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/de2605ea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/de2605ea
Branch: refs/heads/master
Commit: de2605ea7b17fc569890a53743783b7d26c8e56b
Parents: 8dac436
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Thu Feb 23 23:13:03 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Feb 23 23:13:03 2017 +0800
----------------------------------------------------------------------
.../connectors/kafka/Kafka010FetcherTest.java | 688 +++++++++----------
1 file changed, 343 insertions(+), 345 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/de2605ea/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
index 5718986..98aa28a 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
@@ -78,51 +78,51 @@ import static org.powermock.api.mockito.PowerMockito.whenNew;
@PrepareForTest(KafkaConsumerThread.class)
public class Kafka010FetcherTest {
- @Test
- public void testCommitDoesNotBlock() throws Exception {
-
- // test data
- final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
- final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
- testCommitData.put(testPartition, 11L);
-
- // to synchronize when the consumer is in its blocking method
- final OneShotLatch sync = new OneShotLatch();
-
- // ----- the mock consumer with blocking poll calls ----
- final MultiShotLatch blockerLatch = new MultiShotLatch();
-
- KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
- when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
-
- @Override
- public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
- sync.trigger();
- blockerLatch.await();
- return ConsumerRecords.empty();
- }
- });
-
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) {
- blockerLatch.trigger();
- return null;
- }
- }).when(mockConsumer).wakeup();
-
- // make sure the fetcher creates the mock consumer
- whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
-
- // ----- create the test fetcher -----
-
- @SuppressWarnings("unchecked")
- SourceContext<String> sourceContext = mock(SourceContext.class);
- List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
- KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
-
- final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
- sourceContext,
+ @Test
+ public void testCommitDoesNotBlock() throws Exception {
+
+ // test data
+ final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
+ final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
+ testCommitData.put(testPartition, 11L);
+
+ // to synchronize when the consumer is in its blocking method
+ final OneShotLatch sync = new OneShotLatch();
+
+ // ----- the mock consumer with blocking poll calls ----
+ final MultiShotLatch blockerLatch = new MultiShotLatch();
+
+ KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+ when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+
+ @Override
+ public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
+ sync.trigger();
+ blockerLatch.await();
+ return ConsumerRecords.empty();
+ }
+ });
+
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) {
+ blockerLatch.trigger();
+ return null;
+ }
+ }).when(mockConsumer).wakeup();
+
+ // make sure the fetcher creates the mock consumer
+ whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+ // ----- create the test fetcher -----
+
+ @SuppressWarnings("unchecked")
+ SourceContext<String> sourceContext = mock(SourceContext.class);
+ List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
+ KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+
+ final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
+ sourceContext,
topics,
null, /* no restored state */
null, /* periodic assigner */
@@ -139,128 +139,128 @@ public class Kafka010FetcherTest {
StartupMode.GROUP_OFFSETS,
false);
- // ----- run the fetcher -----
-
- final AtomicReference<Throwable> error = new AtomicReference<>();
- final Thread fetcherRunner = new Thread("fetcher runner") {
-
- @Override
- public void run() {
- try {
- fetcher.runFetchLoop();
- } catch (Throwable t) {
- error.set(t);
- }
- }
- };
- fetcherRunner.start();
-
- // wait until the fetcher has reached the method of interest
- sync.await();
-
- // ----- trigger the offset commit -----
-
- final AtomicReference<Throwable> commitError = new AtomicReference<>();
- final Thread committer = new Thread("committer runner") {
- @Override
- public void run() {
- try {
- fetcher.commitInternalOffsetsToKafka(testCommitData);
- } catch (Throwable t) {
- commitError.set(t);
- }
- }
- };
- committer.start();
-
- // ----- ensure that the committer finishes in time -----
- committer.join(30000);
- assertFalse("The committer did not finish in time", committer.isAlive());
-
- // ----- test done, wait till the fetcher is done for a clean shutdown -----
- fetcher.cancel();
- fetcherRunner.join();
-
- // check that there were no errors in the fetcher
- final Throwable fetcherError = error.get();
- if (fetcherError != null && !(fetcherError instanceof Handover.ClosedException)) {
- throw new Exception("Exception in the fetcher", fetcherError);
- }
- final Throwable committerError = commitError.get();
- if (committerError != null) {
- throw new Exception("Exception in the committer", committerError);
- }
- }
-
- @Test
- public void ensureOffsetsGetCommitted() throws Exception {
-
- // test data
- final KafkaTopicPartition testPartition1 = new KafkaTopicPartition("test", 42);
- final KafkaTopicPartition testPartition2 = new KafkaTopicPartition("another", 99);
-
- final Map<KafkaTopicPartition, Long> testCommitData1 = new HashMap<>();
- testCommitData1.put(testPartition1, 11L);
- testCommitData1.put(testPartition2, 18L);
-
- final Map<KafkaTopicPartition, Long> testCommitData2 = new HashMap<>();
- testCommitData2.put(testPartition1, 19L);
- testCommitData2.put(testPartition2, 28L);
-
- final BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> commitStore = new LinkedBlockingQueue<>();
-
-
- // ----- the mock consumer with poll(), wakeup(), and commit(A)sync calls ----
-
- final MultiShotLatch blockerLatch = new MultiShotLatch();
-
- KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
-
- when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
- @Override
- public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
- blockerLatch.await();
- return ConsumerRecords.empty();
- }
- });
-
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) {
- blockerLatch.trigger();
- return null;
- }
- }).when(mockConsumer).wakeup();
-
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) {
- @SuppressWarnings("unchecked")
- Map<TopicPartition, OffsetAndMetadata> offsets =
- (Map<TopicPartition, OffsetAndMetadata>) invocation.getArguments()[0];
-
- OffsetCommitCallback callback = (OffsetCommitCallback) invocation.getArguments()[1];
-
- commitStore.add(offsets);
- callback.onComplete(offsets, null);
-
- return null;
- }
- }).when(mockConsumer).commitAsync(
- Mockito.<Map<TopicPartition, OffsetAndMetadata>>any(), any(OffsetCommitCallback.class));
-
- // make sure the fetcher creates the mock consumer
- whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
-
- // ----- create the test fetcher -----
-
- @SuppressWarnings("unchecked")
- SourceContext<String> sourceContext = mock(SourceContext.class);
- List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
- KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
-
- final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
- sourceContext,
+ // ----- run the fetcher -----
+
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+ final Thread fetcherRunner = new Thread("fetcher runner") {
+
+ @Override
+ public void run() {
+ try {
+ fetcher.runFetchLoop();
+ } catch (Throwable t) {
+ error.set(t);
+ }
+ }
+ };
+ fetcherRunner.start();
+
+ // wait until the fetcher has reached the method of interest
+ sync.await();
+
+ // ----- trigger the offset commit -----
+
+ final AtomicReference<Throwable> commitError = new AtomicReference<>();
+ final Thread committer = new Thread("committer runner") {
+ @Override
+ public void run() {
+ try {
+ fetcher.commitInternalOffsetsToKafka(testCommitData);
+ } catch (Throwable t) {
+ commitError.set(t);
+ }
+ }
+ };
+ committer.start();
+
+ // ----- ensure that the committer finishes in time -----
+ committer.join(30000);
+ assertFalse("The committer did not finish in time", committer.isAlive());
+
+ // ----- test done, wait till the fetcher is done for a clean shutdown -----
+ fetcher.cancel();
+ fetcherRunner.join();
+
+ // check that there were no errors in the fetcher
+ final Throwable fetcherError = error.get();
+ if (fetcherError != null && !(fetcherError instanceof Handover.ClosedException)) {
+ throw new Exception("Exception in the fetcher", fetcherError);
+ }
+
+ final Throwable committerError = commitError.get();
+ if (committerError != null) {
+ throw new Exception("Exception in the committer", committerError);
+ }
+ }
+
+ @Test
+ public void ensureOffsetsGetCommitted() throws Exception {
+
+ // test data
+ final KafkaTopicPartition testPartition1 = new KafkaTopicPartition("test", 42);
+ final KafkaTopicPartition testPartition2 = new KafkaTopicPartition("another", 99);
+
+ final Map<KafkaTopicPartition, Long> testCommitData1 = new HashMap<>();
+ testCommitData1.put(testPartition1, 11L);
+ testCommitData1.put(testPartition2, 18L);
+
+ final Map<KafkaTopicPartition, Long> testCommitData2 = new HashMap<>();
+ testCommitData2.put(testPartition1, 19L);
+ testCommitData2.put(testPartition2, 28L);
+
+ final BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> commitStore = new LinkedBlockingQueue<>();
+
+ // ----- the mock consumer with poll(), wakeup(), and commit(A)sync calls ----
+
+ final MultiShotLatch blockerLatch = new MultiShotLatch();
+
+ KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+
+ when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+ @Override
+ public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
+ blockerLatch.await();
+ return ConsumerRecords.empty();
+ }
+ });
+
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) {
+ blockerLatch.trigger();
+ return null;
+ }
+ }).when(mockConsumer).wakeup();
+
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) {
+ @SuppressWarnings("unchecked")
+ Map<TopicPartition, OffsetAndMetadata> offsets =
+ (Map<TopicPartition, OffsetAndMetadata>) invocation.getArguments()[0];
+
+ OffsetCommitCallback callback = (OffsetCommitCallback) invocation.getArguments()[1];
+
+ commitStore.add(offsets);
+ callback.onComplete(offsets, null);
+
+ return null;
+ }
+ }).when(mockConsumer).commitAsync(
+ Mockito.<Map<TopicPartition, OffsetAndMetadata>>any(), any(OffsetCommitCallback.class));
+
+ // make sure the fetcher creates the mock consumer
+ whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+ // ----- create the test fetcher -----
+
+ @SuppressWarnings("unchecked")
+ SourceContext<String> sourceContext = mock(SourceContext.class);
+ List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
+ KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+
+ final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
+ sourceContext,
topics,
null, /* no restored state */
null, /* periodic assigner */
@@ -277,106 +277,105 @@ public class Kafka010FetcherTest {
StartupMode.GROUP_OFFSETS,
false);
+ // ----- run the fetcher -----
+
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+ final Thread fetcherRunner = new Thread("fetcher runner") {
+
+ @Override
+ public void run() {
+ try {
+ fetcher.runFetchLoop();
+ } catch (Throwable t) {
+ error.set(t);
+ }
+ }
+ };
+ fetcherRunner.start();
+
+ // ----- trigger the first offset commit -----
+
+ fetcher.commitInternalOffsetsToKafka(testCommitData1);
+ Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
+
+ for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
+ TopicPartition partition = entry.getKey();
+ if (partition.topic().equals("test")) {
+ assertEquals(42, partition.partition());
+ assertEquals(12L, entry.getValue().offset());
+ }
+ else if (partition.topic().equals("another")) {
+ assertEquals(99, partition.partition());
+ assertEquals(18L, entry.getValue().offset());
+ }
+ }
+
+ // ----- trigger the second offset commit -----
+
+ fetcher.commitInternalOffsetsToKafka(testCommitData2);
+ Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
+
+ for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {
+ TopicPartition partition = entry.getKey();
+ if (partition.topic().equals("test")) {
+ assertEquals(42, partition.partition());
+ assertEquals(20L, entry.getValue().offset());
+ }
+ else if (partition.topic().equals("another")) {
+ assertEquals(99, partition.partition());
+ assertEquals(28L, entry.getValue().offset());
+ }
+ }
+
+ // ----- test done, wait till the fetcher is done for a clean shutdown -----
+ fetcher.cancel();
+ fetcherRunner.join();
+
+ // check that there were no errors in the fetcher
+ final Throwable caughtError = error.get();
+ if (caughtError != null && !(caughtError instanceof Handover.ClosedException)) {
+ throw new Exception("Exception in the fetcher", caughtError);
+ }
+ }
+
+ @Test
+ public void testCancellationWhenEmitBlocks() throws Exception {
+
+ // ----- some test data -----
+
+ final String topic = "test-topic";
+ final int partition = 3;
+ final byte[] payload = new byte[] {1, 2, 3, 4};
+
+ final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
+ new ConsumerRecord<byte[], byte[]>(topic, partition, 15, payload, payload),
+ new ConsumerRecord<byte[], byte[]>(topic, partition, 16, payload, payload),
+ new ConsumerRecord<byte[], byte[]>(topic, partition, 17, payload, payload));
+
+ final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
+ data.put(new TopicPartition(topic, partition), records);
+
+ final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data);
- // ----- run the fetcher -----
-
- final AtomicReference<Throwable> error = new AtomicReference<>();
- final Thread fetcherRunner = new Thread("fetcher runner") {
-
- @Override
- public void run() {
- try {
- fetcher.runFetchLoop();
- } catch (Throwable t) {
- error.set(t);
- }
- }
- };
- fetcherRunner.start();
-
- // ----- trigger the first offset commit -----
-
- fetcher.commitInternalOffsetsToKafka(testCommitData1);
- Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
-
- for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
- TopicPartition partition = entry.getKey();
- if (partition.topic().equals("test")) {
- assertEquals(42, partition.partition());
- assertEquals(12L, entry.getValue().offset());
- }
- else if (partition.topic().equals("another")) {
- assertEquals(99, partition.partition());
- assertEquals(18L, entry.getValue().offset());
- }
- }
-
- // ----- trigger the second offset commit -----
-
- fetcher.commitInternalOffsetsToKafka(testCommitData2);
- Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
-
- for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {
- TopicPartition partition = entry.getKey();
- if (partition.topic().equals("test")) {
- assertEquals(42, partition.partition());
- assertEquals(20L, entry.getValue().offset());
- }
- else if (partition.topic().equals("another")) {
- assertEquals(99, partition.partition());
- assertEquals(28L, entry.getValue().offset());
- }
- }
-
- // ----- test done, wait till the fetcher is done for a clean shutdown -----
- fetcher.cancel();
- fetcherRunner.join();
-
- // check that there were no errors in the fetcher
- final Throwable caughtError = error.get();
- if (caughtError != null && !(caughtError instanceof Handover.ClosedException)) {
- throw new Exception("Exception in the fetcher", caughtError);
- }
- }
-
- @Test
- public void testCancellationWhenEmitBlocks() throws Exception {
-
- // ----- some test data -----
-
- final String topic = "test-topic";
- final int partition = 3;
- final byte[] payload = new byte[] {1, 2, 3, 4};
-
- final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
- new ConsumerRecord<byte[], byte[]>(topic, partition, 15, payload, payload),
- new ConsumerRecord<byte[], byte[]>(topic, partition, 16, payload, payload),
- new ConsumerRecord<byte[], byte[]>(topic, partition, 17, payload, payload));
-
- final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
- data.put(new TopicPartition(topic, partition), records);
-
- final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data);
-
- // ----- the test consumer -----
-
- final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
- when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
- @Override
- public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) {
- return consumerRecords;
- }
- });
-
- whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
-
- // ----- build a fetcher -----
-
- BlockingSourceContext<String> sourceContext = new BlockingSourceContext<>();
- List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition(topic, partition));
- KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
-
- final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
+ // ----- the test consumer -----
+
+ final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+ when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+ @Override
+ public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) {
+ return consumerRecords;
+ }
+ });
+
+ whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+ // ----- build a fetcher -----
+
+ BlockingSourceContext<String> sourceContext = new BlockingSourceContext<>();
+ List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition(topic, partition));
+ KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+
+ final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
sourceContext,
topics,
null, /* no restored state */
@@ -394,58 +393,57 @@ public class Kafka010FetcherTest {
StartupMode.GROUP_OFFSETS,
false);
+ // ----- run the fetcher -----
- // ----- run the fetcher -----
-
- final AtomicReference<Throwable> error = new AtomicReference<>();
- final Thread fetcherRunner = new Thread("fetcher runner") {
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+ final Thread fetcherRunner = new Thread("fetcher runner") {
- @Override
- public void run() {
- try {
- fetcher.runFetchLoop();
- } catch (Throwable t) {
- error.set(t);
- }
- }
- };
- fetcherRunner.start();
+ @Override
+ public void run() {
+ try {
+ fetcher.runFetchLoop();
+ } catch (Throwable t) {
+ error.set(t);
+ }
+ }
+ };
+ fetcherRunner.start();
- // wait until the thread started to emit records to the source context
- sourceContext.waitTillHasBlocker();
+ // wait until the thread started to emit records to the source context
+ sourceContext.waitTillHasBlocker();
- // now we try to cancel the fetcher, including the interruption usually done on the task thread
- // once it has finished, there must be no more thread blocked on the source context
- fetcher.cancel();
- fetcherRunner.interrupt();
- fetcherRunner.join();
+ // now we try to cancel the fetcher, including the interruption usually done on the task thread
+ // once it has finished, there must be no more thread blocked on the source context
+ fetcher.cancel();
+ fetcherRunner.interrupt();
+ fetcherRunner.join();
- assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking());
- }
+ assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking());
+ }
- // ------------------------------------------------------------------------
- // test utilities
- // ------------------------------------------------------------------------
+ // ------------------------------------------------------------------------
+ // test utilities
+ // ------------------------------------------------------------------------
- private static final class BlockingSourceContext<T> implements SourceContext<T> {
+ private static final class BlockingSourceContext<T> implements SourceContext<T> {
- private final ReentrantLock lock = new ReentrantLock();
- private final OneShotLatch inBlocking = new OneShotLatch();
+ private final ReentrantLock lock = new ReentrantLock();
+ private final OneShotLatch inBlocking = new OneShotLatch();
- @Override
- public void collect(T element) {
- block();
- }
+ @Override
+ public void collect(T element) {
+ block();
+ }
- @Override
- public void collectWithTimestamp(T element, long timestamp) {
- block();
- }
+ @Override
+ public void collectWithTimestamp(T element, long timestamp) {
+ block();
+ }
- @Override
- public void emitWatermark(Watermark mark) {
- block();
- }
+ @Override
+ public void emitWatermark(Watermark mark) {
+ block();
+ }
@Override
public void markAsTemporarilyIdle() {
@@ -453,42 +451,42 @@ public class Kafka010FetcherTest {
}
@Override
- public Object getCheckpointLock() {
- return new Object();
- }
-
- @Override
- public void close() {}
-
- public void waitTillHasBlocker() throws InterruptedException {
- inBlocking.await();
- }
-
- public boolean isStillBlocking() {
- return lock.isLocked();
- }
-
- @SuppressWarnings({"InfiniteLoopStatement", "SynchronizationOnLocalVariableOrMethodParameter"})
- private void block() {
- lock.lock();
- try {
- inBlocking.trigger();
-
- // put this thread to sleep indefinitely
- final Object o = new Object();
- while (true) {
- synchronized (o) {
- o.wait();
- }
- }
- }
- catch (InterruptedException e) {
- // exit cleanly, simply reset the interruption flag
- Thread.currentThread().interrupt();
- }
- finally {
- lock.unlock();
- }
- }
- }
+ public Object getCheckpointLock() {
+ return new Object();
+ }
+
+ @Override
+ public void close() {}
+
+ public void waitTillHasBlocker() throws InterruptedException {
+ inBlocking.await();
+ }
+
+ public boolean isStillBlocking() {
+ return lock.isLocked();
+ }
+
+ @SuppressWarnings({"InfiniteLoopStatement", "SynchronizationOnLocalVariableOrMethodParameter"})
+ private void block() {
+ lock.lock();
+ try {
+ inBlocking.trigger();
+
+ // put this thread to sleep indefinitely
+ final Object o = new Object();
+ while (true) {
+ synchronized (o) {
+ o.wait();
+ }
+ }
+ }
+ catch (InterruptedException e) {
+ // exit cleanly, simply reset the interruption flag
+ Thread.currentThread().interrupt();
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+ }
}