Posted to commits@flink.apache.org by tz...@apache.org on 2017/02/23 15:28:51 UTC

flink git commit: [hotfix] [kafka] Indent Kafka010FetcherTest with tabs instead of spaces

Repository: flink
Updated Branches:
  refs/heads/master 8dac43613 -> de2605ea7


[hotfix] [kafka] Indent Kafka010FetcherTest with tabs instead of spaces


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/de2605ea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/de2605ea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/de2605ea

Branch: refs/heads/master
Commit: de2605ea7b17fc569890a53743783b7d26c8e56b
Parents: 8dac436
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Thu Feb 23 23:13:03 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Feb 23 23:13:03 2017 +0800

----------------------------------------------------------------------
 .../connectors/kafka/Kafka010FetcherTest.java   | 688 +++++++++----------
 1 file changed, 343 insertions(+), 345 deletions(-)
----------------------------------------------------------------------
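
The near one-for-one diffstat (343 insertions against 345 deletions) reflects a pure re-indent from four-space indentation to tabs; the two-line difference comes from a couple of blank lines being consolidated along the way. As a sanity check, a whitespace-insensitive comparison such as "git diff -w --ignore-blank-lines 8dac43613 de2605ea7" should produce no output, confirming that no code changed.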


http://git-wip-us.apache.org/repos/asf/flink/blob/de2605ea/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
index 5718986..98aa28a 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
@@ -78,51 +78,51 @@ import static org.powermock.api.mockito.PowerMockito.whenNew;
 @PrepareForTest(KafkaConsumerThread.class)
 public class Kafka010FetcherTest {
 
-    @Test
-    public void testCommitDoesNotBlock() throws Exception {
-
-        // test data
-        final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
-        final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
-        testCommitData.put(testPartition, 11L);
-
-        // to synchronize when the consumer is in its blocking method
-        final OneShotLatch sync = new OneShotLatch();
-
-        // ----- the mock consumer with blocking poll calls ----
-        final MultiShotLatch blockerLatch = new MultiShotLatch();
-
-        KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
-        when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
-
-            @Override
-            public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
-                sync.trigger();
-                blockerLatch.await();
-                return ConsumerRecords.empty();
-            }
-        });
-
-        doAnswer(new Answer<Void>() {
-            @Override
-            public Void answer(InvocationOnMock invocation) {
-                blockerLatch.trigger();
-                return null;
-            }
-        }).when(mockConsumer).wakeup();
-
-        // make sure the fetcher creates the mock consumer
-        whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
-
-        // ----- create the test fetcher -----
-
-        @SuppressWarnings("unchecked")
-        SourceContext<String> sourceContext = mock(SourceContext.class);
-        List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
-        KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
-
-        final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
-        		sourceContext,
+	@Test
+	public void testCommitDoesNotBlock() throws Exception {
+
+		// test data
+		final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
+		final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
+		testCommitData.put(testPartition, 11L);
+
+		// to synchronize when the consumer is in its blocking method
+		final OneShotLatch sync = new OneShotLatch();
+
+		// ----- the mock consumer with blocking poll calls ----
+		final MultiShotLatch blockerLatch = new MultiShotLatch();
+
+		KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+
+			@Override
+			public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
+				sync.trigger();
+				blockerLatch.await();
+				return ConsumerRecords.empty();
+			}
+		});
+
+		doAnswer(new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) {
+				blockerLatch.trigger();
+				return null;
+			}
+		}).when(mockConsumer).wakeup();
+
+		// make sure the fetcher creates the mock consumer
+		whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+		// ----- create the test fetcher -----
+
+		@SuppressWarnings("unchecked")
+		SourceContext<String> sourceContext = mock(SourceContext.class);
+		List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
+		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+
+		final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
+				sourceContext,
 				topics,
 				null, /* no restored state */
 				null, /* periodic assigner */
@@ -139,128 +139,128 @@ public class Kafka010FetcherTest {
 				StartupMode.GROUP_OFFSETS,
 				false);
 
-        // ----- run the fetcher -----
-
-        final AtomicReference<Throwable> error = new AtomicReference<>();
-        final Thread fetcherRunner = new Thread("fetcher runner") {
-
-            @Override
-            public void run() {
-                try {
-                    fetcher.runFetchLoop();
-                } catch (Throwable t) {
-                    error.set(t);
-                }
-            }
-        };
-        fetcherRunner.start();
-
-        // wait until the fetcher has reached the method of interest
-        sync.await();
-
-        // ----- trigger the offset commit -----
-
-        final AtomicReference<Throwable> commitError = new AtomicReference<>();
-        final Thread committer = new Thread("committer runner") {
-            @Override
-            public void run() {
-                try {
-                    fetcher.commitInternalOffsetsToKafka(testCommitData);
-                } catch (Throwable t) {
-                    commitError.set(t);
-                }
-            }
-        };
-        committer.start();
-
-        // ----- ensure that the committer finishes in time  -----
-        committer.join(30000);
-        assertFalse("The committer did not finish in time", committer.isAlive());
-
-        // ----- test done, wait till the fetcher is done for a clean shutdown -----
-        fetcher.cancel();
-        fetcherRunner.join();
-
-        // check that there were no errors in the fetcher
-        final Throwable fetcherError = error.get();
-        if (fetcherError != null && !(fetcherError instanceof Handover.ClosedException)) {
-            throw new Exception("Exception in the fetcher", fetcherError);
-        }
-        final Throwable committerError = commitError.get();
-        if (committerError != null) {
-            throw new Exception("Exception in the committer", committerError);
-        }
-    }
-
-    @Test
-    public void ensureOffsetsGetCommitted() throws Exception {
-
-        // test data
-        final KafkaTopicPartition testPartition1 = new KafkaTopicPartition("test", 42);
-        final KafkaTopicPartition testPartition2 = new KafkaTopicPartition("another", 99);
-
-        final Map<KafkaTopicPartition, Long> testCommitData1 = new HashMap<>();
-        testCommitData1.put(testPartition1, 11L);
-        testCommitData1.put(testPartition2, 18L);
-
-        final Map<KafkaTopicPartition, Long> testCommitData2 = new HashMap<>();
-        testCommitData2.put(testPartition1, 19L);
-        testCommitData2.put(testPartition2, 28L);
-
-        final BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> commitStore = new LinkedBlockingQueue<>();
-
-
-        // ----- the mock consumer with poll(), wakeup(), and commit(A)sync calls ----
-
-        final MultiShotLatch blockerLatch = new MultiShotLatch();
-
-        KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
-
-        when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
-            @Override
-            public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
-                blockerLatch.await();
-                return ConsumerRecords.empty();
-            }
-        });
-
-        doAnswer(new Answer<Void>() {
-            @Override
-            public Void answer(InvocationOnMock invocation) {
-                blockerLatch.trigger();
-                return null;
-            }
-        }).when(mockConsumer).wakeup();
-
-        doAnswer(new Answer<Void>() {
-            @Override
-            public Void answer(InvocationOnMock invocation) {
-                @SuppressWarnings("unchecked")
-                Map<TopicPartition, OffsetAndMetadata> offsets =
-                        (Map<TopicPartition, OffsetAndMetadata>) invocation.getArguments()[0];
-
-                OffsetCommitCallback callback = (OffsetCommitCallback) invocation.getArguments()[1];
-
-                commitStore.add(offsets);
-                callback.onComplete(offsets, null);
-
-                return null;
-            }
-        }).when(mockConsumer).commitAsync(
-                Mockito.<Map<TopicPartition, OffsetAndMetadata>>any(), any(OffsetCommitCallback.class));
-
-        // make sure the fetcher creates the mock consumer
-        whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
-
-        // ----- create the test fetcher -----
-
-        @SuppressWarnings("unchecked")
-        SourceContext<String> sourceContext = mock(SourceContext.class);
-        List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
-        KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
-
-        final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
-        		sourceContext,
+		// ----- run the fetcher -----
+
+		final AtomicReference<Throwable> error = new AtomicReference<>();
+		final Thread fetcherRunner = new Thread("fetcher runner") {
+
+			@Override
+			public void run() {
+				try {
+					fetcher.runFetchLoop();
+				} catch (Throwable t) {
+					error.set(t);
+				}
+			}
+		};
+		fetcherRunner.start();
+
+		// wait until the fetcher has reached the method of interest
+		sync.await();
+
+		// ----- trigger the offset commit -----
+
+		final AtomicReference<Throwable> commitError = new AtomicReference<>();
+		final Thread committer = new Thread("committer runner") {
+			@Override
+			public void run() {
+				try {
+					fetcher.commitInternalOffsetsToKafka(testCommitData);
+				} catch (Throwable t) {
+					commitError.set(t);
+				}
+			}
+		};
+		committer.start();
+
+		// ----- ensure that the committer finishes in time  -----
+		committer.join(30000);
+		assertFalse("The committer did not finish in time", committer.isAlive());
+
+		// ----- test done, wait till the fetcher is done for a clean shutdown -----
+		fetcher.cancel();
+		fetcherRunner.join();
+
+		// check that there were no errors in the fetcher
+		final Throwable fetcherError = error.get();
+		if (fetcherError != null && !(fetcherError instanceof Handover.ClosedException)) {
+			throw new Exception("Exception in the fetcher", fetcherError);
+		}
+
+		final Throwable committerError = commitError.get();
+		if (committerError != null) {
+			throw new Exception("Exception in the committer", committerError);
+		}
+	}
+
+	@Test
+	public void ensureOffsetsGetCommitted() throws Exception {
+
+		// test data
+		final KafkaTopicPartition testPartition1 = new KafkaTopicPartition("test", 42);
+		final KafkaTopicPartition testPartition2 = new KafkaTopicPartition("another", 99);
+
+		final Map<KafkaTopicPartition, Long> testCommitData1 = new HashMap<>();
+		testCommitData1.put(testPartition1, 11L);
+		testCommitData1.put(testPartition2, 18L);
+
+		final Map<KafkaTopicPartition, Long> testCommitData2 = new HashMap<>();
+		testCommitData2.put(testPartition1, 19L);
+		testCommitData2.put(testPartition2, 28L);
+
+		final BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> commitStore = new LinkedBlockingQueue<>();
+
+		// ----- the mock consumer with poll(), wakeup(), and commit(A)sync calls ----
+
+		final MultiShotLatch blockerLatch = new MultiShotLatch();
+
+		KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+
+		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+			@Override
+			public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
+				blockerLatch.await();
+				return ConsumerRecords.empty();
+			}
+		});
+
+		doAnswer(new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) {
+				blockerLatch.trigger();
+				return null;
+			}
+		}).when(mockConsumer).wakeup();
+
+		doAnswer(new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) {
+				@SuppressWarnings("unchecked")
+				Map<TopicPartition, OffsetAndMetadata> offsets =
+						(Map<TopicPartition, OffsetAndMetadata>) invocation.getArguments()[0];
+
+				OffsetCommitCallback callback = (OffsetCommitCallback) invocation.getArguments()[1];
+
+				commitStore.add(offsets);
+				callback.onComplete(offsets, null);
+
+				return null;
+			}
+		}).when(mockConsumer).commitAsync(
+				Mockito.<Map<TopicPartition, OffsetAndMetadata>>any(), any(OffsetCommitCallback.class));
+
+		// make sure the fetcher creates the mock consumer
+		whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+		// ----- create the test fetcher -----
+
+		@SuppressWarnings("unchecked")
+		SourceContext<String> sourceContext = mock(SourceContext.class);
+		List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
+		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+
+		final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
+				sourceContext,
 				topics,
 				null, /* no restored state */
 				null, /* periodic assigner */
@@ -277,106 +277,105 @@ public class Kafka010FetcherTest {
 				StartupMode.GROUP_OFFSETS,
 				false);
 
+		// ----- run the fetcher -----
+
+		final AtomicReference<Throwable> error = new AtomicReference<>();
+		final Thread fetcherRunner = new Thread("fetcher runner") {
+
+			@Override
+			public void run() {
+				try {
+					fetcher.runFetchLoop();
+				} catch (Throwable t) {
+					error.set(t);
+				}
+			}
+		};
+		fetcherRunner.start();
+
+		// ----- trigger the first offset commit -----
+
+		fetcher.commitInternalOffsetsToKafka(testCommitData1);
+		Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
+
+		for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
+			TopicPartition partition = entry.getKey();
+			if (partition.topic().equals("test")) {
+				assertEquals(42, partition.partition());
+				assertEquals(12L, entry.getValue().offset());
+			}
+			else if (partition.topic().equals("another")) {
+				assertEquals(99, partition.partition());
+				assertEquals(18L, entry.getValue().offset());
+			}
+		}
+
+		// ----- trigger the second offset commit -----
+
+		fetcher.commitInternalOffsetsToKafka(testCommitData2);
+		Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
+
+		for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {
+			TopicPartition partition = entry.getKey();
+			if (partition.topic().equals("test")) {
+				assertEquals(42, partition.partition());
+				assertEquals(20L, entry.getValue().offset());
+			}
+			else if (partition.topic().equals("another")) {
+				assertEquals(99, partition.partition());
+				assertEquals(28L, entry.getValue().offset());
+			}
+		}
+
+		// ----- test done, wait till the fetcher is done for a clean shutdown -----
+		fetcher.cancel();
+		fetcherRunner.join();
+
+		// check that there were no errors in the fetcher
+		final Throwable caughtError = error.get();
+		if (caughtError != null && !(caughtError instanceof Handover.ClosedException)) {
+			throw new Exception("Exception in the fetcher", caughtError);
+		}
+	}
+
+	@Test
+	public void testCancellationWhenEmitBlocks() throws Exception {
+
+		// ----- some test data -----
+
+		final String topic = "test-topic";
+		final int partition = 3;
+		final byte[] payload = new byte[] {1, 2, 3, 4};
+
+		final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
+				new ConsumerRecord<byte[], byte[]>(topic, partition, 15, payload, payload),
+				new ConsumerRecord<byte[], byte[]>(topic, partition, 16, payload, payload),
+				new ConsumerRecord<byte[], byte[]>(topic, partition, 17, payload, payload));
+
+		final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
+		data.put(new TopicPartition(topic, partition), records);
+
+		final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data);
 
-        // ----- run the fetcher -----
-
-        final AtomicReference<Throwable> error = new AtomicReference<>();
-        final Thread fetcherRunner = new Thread("fetcher runner") {
-
-            @Override
-            public void run() {
-                try {
-                    fetcher.runFetchLoop();
-                } catch (Throwable t) {
-                    error.set(t);
-                }
-            }
-        };
-        fetcherRunner.start();
-
-        // ----- trigger the first offset commit -----
-
-        fetcher.commitInternalOffsetsToKafka(testCommitData1);
-        Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
-
-        for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
-            TopicPartition partition = entry.getKey();
-            if (partition.topic().equals("test")) {
-                assertEquals(42, partition.partition());
-                assertEquals(12L, entry.getValue().offset());
-            }
-            else if (partition.topic().equals("another")) {
-                assertEquals(99, partition.partition());
-                assertEquals(18L, entry.getValue().offset());
-            }
-        }
-
-        // ----- trigger the second offset commit -----
-
-        fetcher.commitInternalOffsetsToKafka(testCommitData2);
-        Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
-
-        for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {
-            TopicPartition partition = entry.getKey();
-            if (partition.topic().equals("test")) {
-                assertEquals(42, partition.partition());
-                assertEquals(20L, entry.getValue().offset());
-            }
-            else if (partition.topic().equals("another")) {
-                assertEquals(99, partition.partition());
-                assertEquals(28L, entry.getValue().offset());
-            }
-        }
-
-        // ----- test done, wait till the fetcher is done for a clean shutdown -----
-        fetcher.cancel();
-        fetcherRunner.join();
-
-        // check that there were no errors in the fetcher
-        final Throwable caughtError = error.get();
-        if (caughtError != null && !(caughtError instanceof Handover.ClosedException)) {
-            throw new Exception("Exception in the fetcher", caughtError);
-        }
-    }
-
-    @Test
-    public void testCancellationWhenEmitBlocks() throws Exception {
-
-        // ----- some test data -----
-
-        final String topic = "test-topic";
-        final int partition = 3;
-        final byte[] payload = new byte[] {1, 2, 3, 4};
-
-        final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
-                new ConsumerRecord<byte[], byte[]>(topic, partition, 15, payload, payload),
-                new ConsumerRecord<byte[], byte[]>(topic, partition, 16, payload, payload),
-                new ConsumerRecord<byte[], byte[]>(topic, partition, 17, payload, payload));
-
-        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
-        data.put(new TopicPartition(topic, partition), records);
-
-        final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data);
-
-        // ----- the test consumer -----
-
-        final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
-        when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
-            @Override
-            public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) {
-                return consumerRecords;
-            }
-        });
-
-        whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
-
-        // ----- build a fetcher -----
-
-        BlockingSourceContext<String> sourceContext = new BlockingSourceContext<>();
-        List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition(topic, partition));
-        KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
-
-        final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
+		// ----- the test consumer -----
+
+		final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+			@Override
+			public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) {
+				return consumerRecords;
+			}
+		});
+
+		whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+		// ----- build a fetcher -----
+
+		BlockingSourceContext<String> sourceContext = new BlockingSourceContext<>();
+		List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition(topic, partition));
+		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+
+		final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
 				sourceContext,
 				topics,
 				null, /* no restored state */
@@ -394,58 +393,57 @@ public class Kafka010FetcherTest {
 				StartupMode.GROUP_OFFSETS,
 				false);
 
+		// ----- run the fetcher -----
 
-        // ----- run the fetcher -----
-
-        final AtomicReference<Throwable> error = new AtomicReference<>();
-        final Thread fetcherRunner = new Thread("fetcher runner") {
+		final AtomicReference<Throwable> error = new AtomicReference<>();
+		final Thread fetcherRunner = new Thread("fetcher runner") {
 
-            @Override
-            public void run() {
-                try {
-                    fetcher.runFetchLoop();
-                } catch (Throwable t) {
-                    error.set(t);
-                }
-            }
-        };
-        fetcherRunner.start();
+			@Override
+			public void run() {
+				try {
+					fetcher.runFetchLoop();
+				} catch (Throwable t) {
+					error.set(t);
+				}
+			}
+		};
+		fetcherRunner.start();
 
-        // wait until the thread started to emit records to the source context
-        sourceContext.waitTillHasBlocker();
+		// wait until the thread started to emit records to the source context
+		sourceContext.waitTillHasBlocker();
 
-        // now we try to cancel the fetcher, including the interruption usually done on the task thread
-        // once it has finished, there must be no more thread blocked on the source context
-        fetcher.cancel();
-        fetcherRunner.interrupt();
-        fetcherRunner.join();
+		// now we try to cancel the fetcher, including the interruption usually done on the task thread
+		// once it has finished, there must be no more thread blocked on the source context
+		fetcher.cancel();
+		fetcherRunner.interrupt();
+		fetcherRunner.join();
 
-        assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking());
-    }
+		assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking());
+	}
 
-    // ------------------------------------------------------------------------
-    //  test utilities
-    // ------------------------------------------------------------------------
+	// ------------------------------------------------------------------------
+	//  test utilities
+	// ------------------------------------------------------------------------
 
-    private static final class BlockingSourceContext<T> implements SourceContext<T> {
+	private static final class BlockingSourceContext<T> implements SourceContext<T> {
 
-        private final ReentrantLock lock = new ReentrantLock();
-        private final OneShotLatch inBlocking = new OneShotLatch();
+		private final ReentrantLock lock = new ReentrantLock();
+		private final OneShotLatch inBlocking = new OneShotLatch();
 
-        @Override
-        public void collect(T element) {
-            block();
-        }
+		@Override
+		public void collect(T element) {
+			block();
+		}
 
-        @Override
-        public void collectWithTimestamp(T element, long timestamp) {
-            block();
-        }
+		@Override
+		public void collectWithTimestamp(T element, long timestamp) {
+			block();
+		}
 
-        @Override
-        public void emitWatermark(Watermark mark) {
-            block();
-        }
+		@Override
+		public void emitWatermark(Watermark mark) {
+			block();
+		}
 
 		@Override
 		public void markAsTemporarilyIdle() {
@@ -453,42 +451,42 @@ public class Kafka010FetcherTest {
 		}
 
 		@Override
-        public Object getCheckpointLock() {
-            return new Object();
-        }
-
-        @Override
-        public void close() {}
-
-        public void waitTillHasBlocker() throws InterruptedException {
-            inBlocking.await();
-        }
-
-        public boolean isStillBlocking() {
-            return lock.isLocked();
-        }
-
-        @SuppressWarnings({"InfiniteLoopStatement", "SynchronizationOnLocalVariableOrMethodParameter"})
-        private void block() {
-            lock.lock();
-            try {
-                inBlocking.trigger();
-
-                // put this thread to sleep indefinitely
-                final Object o = new Object();
-                while (true) {
-                    synchronized (o) {
-                        o.wait();
-                    }
-                }
-            }
-            catch (InterruptedException e) {
-                // exit cleanly, simply reset the interruption flag
-                Thread.currentThread().interrupt();
-            }
-            finally {
-                lock.unlock();
-            }
-        }
-    }
+		public Object getCheckpointLock() {
+			return new Object();
+		}
+
+		@Override
+		public void close() {}
+
+		public void waitTillHasBlocker() throws InterruptedException {
+			inBlocking.await();
+		}
+
+		public boolean isStillBlocking() {
+			return lock.isLocked();
+		}
+
+		@SuppressWarnings({"InfiniteLoopStatement", "SynchronizationOnLocalVariableOrMethodParameter"})
+		private void block() {
+			lock.lock();
+			try {
+				inBlocking.trigger();
+
+				// put this thread to sleep indefinitely
+				final Object o = new Object();
+				while (true) {
+					synchronized (o) {
+						o.wait();
+					}
+				}
+			}
+			catch (InterruptedException e) {
+				// exit cleanly, simply reset the interruption flag
+				Thread.currentThread().interrupt();
+			}
+			finally {
+				lock.unlock();
+			}
+		}
+	}
 }
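
All three tests in this file lean on the same synchronization trick: a Mockito mock of KafkaConsumer whose poll() parks on a latch, and whose wakeup() releases that latch, so the test can hold the fetcher inside its blocking call at a well-defined point (PowerMock's whenNew(...) injects the mock in place of the real consumer). Distilled from the diff above, the pattern looks roughly like the following; this is a minimal sketch using java.util.concurrent.CountDownLatch in place of Flink's OneShotLatch/MultiShotLatch test utilities, with illustrative class and variable names:

	import java.util.concurrent.CountDownLatch;

	import org.apache.kafka.clients.consumer.ConsumerRecords;
	import org.apache.kafka.clients.consumer.KafkaConsumer;
	import org.mockito.invocation.InvocationOnMock;
	import org.mockito.stubbing.Answer;

	import static org.mockito.Mockito.anyLong;
	import static org.mockito.Mockito.doAnswer;
	import static org.mockito.Mockito.mock;
	import static org.mockito.Mockito.when;

	public class BlockingPollMockSketch {

		/**
		 * Builds a mock KafkaConsumer whose poll() blocks until wakeup() is called.
		 * The test awaits insidePoll to know the caller has entered poll().
		 */
		public static KafkaConsumer<?, ?> blockingMockConsumer(
				final CountDownLatch insidePoll,
				final CountDownLatch release) {

			KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);

			when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
				@Override
				public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
					insidePoll.countDown(); // signal: the caller is now inside the blocking poll()
					release.await();        // park here until wakeup() fires
					return ConsumerRecords.empty();
				}
			});

			doAnswer(new Answer<Void>() {
				@Override
				public Void answer(InvocationOnMock invocation) {
					release.countDown();    // unblock the parked poll(), mirroring the real wakeup() contract
					return null;
				}
			}).when(mockConsumer).wakeup();

			return mockConsumer;
		}
	}

With this in place, a test can start the fetch loop on its own thread, call insidePoll.await() to be sure the loop is parked inside poll(), exercise the code under test (for instance an offset commit), and finally cancel the fetcher, whose wakeup() call lets the mocked poll() return empty records and the loop wind down cleanly.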