You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/09/16 08:05:05 UTC

[flink] 04/04: [FLINK-19250][connectors] Fix error propagation in connector base (SplitFetcherManager).

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5abef56b2bf85bcac786f6b16b6899b6cced7176
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Sep 15 17:43:16 2020 +0200

    [FLINK-19250][connectors] Fix error propagation in connector base (SplitFetcherManager).
    
    This makes sure that the reader is notified / woken up when the fetcher encounters an error.
---
 .../source/reader/fetcher/SplitFetcherManager.java |   4 +-
 .../reader/fetcher/SplitFetcherManagerTest.java    | 159 +++++++++++++++++++++
 2 files changed, 161 insertions(+), 2 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
index ffac523..7a20a59 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
@@ -92,9 +92,9 @@ public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {
 				if (!uncaughtFetcherException.compareAndSet(null, t)) {
 					// Add the exception to the exception list.
 					uncaughtFetcherException.get().addSuppressed(t);
-					// Wake up the main thread to let it know the exception.
-					elementsQueue.notifyAvailable();
 				}
+				// Wake up the main thread to let it know the exception.
+				elementsQueue.notifyAvailable();
 			}
 		};
 		this.splitReaderFactory = splitReaderFactory;
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java
new file mode 100644
index 0000000..3ff25e0
--- /dev/null
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.reader.fetcher;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.mocks.TestingRecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.core.testutils.OneShotLatch;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Queue;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.fail;
+
+/**
+ * Unit tests for the {@link SplitFetcherManager}: a reader failure must complete the queue's availability future and surface via {@code checkErrors()}.
+ */
+public class SplitFetcherManagerTest {
+	// Case 1: the reader has no prepared fetches, so the first fetch() call is the one that throws.
+	@Test
+	public void testExceptionPropagationFirstFetch() throws Exception {
+		testExceptionPropagation();
+	}
+	// Case 2: the reader has two record batches to hand out before the fetch() call that throws.
+	@Test
+	public void testExceptionPropagationSuccessiveFetch() throws Exception {
+		testExceptionPropagation(
+				new TestingRecordsWithSplitIds<>("testSplit", 1, 2, 3, 4),
+				new TestingRecordsWithSplitIds<>("testSplit", 5, 6, 7, 8)
+		);
+	}
+
+	// the final modifier is important so that '@SafeVarargs' is accepted on Java 8
+	@SuppressWarnings("FinalPrivateMethod")
+	@SafeVarargs
+	private final void testExceptionPropagation(final RecordsWithSplitIds<Integer>... fetchesBeforeError) throws Exception {
+		final IOException testingException = new IOException("test");
+
+		final FutureCompletingBlockingQueue<RecordsWithSplitIds<Integer>> queue = new FutureCompletingBlockingQueue<>(10);
+		final AwaitingReader<Integer, TestingSourceSplit> reader = new AwaitingReader<>(testingException, fetchesBeforeError);
+		final SplitFetcherManager<Integer, TestingSourceSplit> fetcher = createFetcher("testSplit", queue, reader);
+		// wait until the reader has handed out all prepared fetches and is parked inside fetch(), then empty the queue
+		reader.awaitAllRecordsReturned();
+		drainQueue(queue);
+		// precondition: with the queue drained and no error yet, the availability future is expected to be incomplete
+		assertFalse(queue.getAvailabilityFuture().isDone());
+		reader.triggerThrowException();
+
+		// await the error propagation; get() has no timeout, so it returns only once the availability future completes
+		queue.getAvailabilityFuture().get();
+		// checkErrors() is expected to throw, with the reader's original exception two cause-levels down
+		try {
+			fetcher.checkErrors();
+			fail("expected exception");
+		} catch (Exception e) {
+			assertSame(testingException, e.getCause().getCause());
+		} finally {
+			fetcher.close(20_000L); // NOTE(review): presumably a timeout in milliseconds — confirm against SplitFetcherManager#close
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  test helpers
+	// ------------------------------------------------------------------------
+	// Builds a SingleThreadFetcherManager on 'queue' whose reader supplier always returns 'reader', and adds one split named 'splitId'.
+	private static <E> SplitFetcherManager<E, TestingSourceSplit> createFetcher(
+			final String splitId,
+			final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> queue,
+			final SplitReader<E, TestingSourceSplit> reader) {
+
+		final SingleThreadFetcherManager<E, TestingSourceSplit> fetcher =
+				new SingleThreadFetcherManager<>(queue, () -> reader);
+		fetcher.addSplits(Collections.singletonList(new TestingSourceSplit(splitId)));
+		return fetcher;
+	}
+	// Polls the queue until poll() returns null, discarding every element returned.
+	private static void drainQueue(FutureCompletingBlockingQueue<?> queue) {
+		//noinspection StatementWithEmptyBody
+		while (queue.poll() != null) {}
+	}
+
+	// ------------------------------------------------------------------------
+	//  test mocks
+	// ------------------------------------------------------------------------
+	// SplitReader mock: serves the prepared fetches in order, then blocks in fetch() until the test tells it to throw 'testError'.
+	private static final class AwaitingReader<E, SplitT extends SourceSplit> implements SplitReader <E, SplitT> {
+
+		private final Queue<RecordsWithSplitIds<E>> fetches;
+		private final IOException testError;
+		// inBlocking: triggered by fetch() once no prepared fetches remain; throwError: triggered by the test to let fetch() throw
+		private final OneShotLatch inBlocking = new OneShotLatch();
+		private final OneShotLatch throwError = new OneShotLatch();
+
+		@SafeVarargs
+		AwaitingReader(IOException testError, RecordsWithSplitIds<E>... fetches) {
+			this.testError = testError;
+			this.fetches = new ArrayDeque<>(Arrays.asList(fetches));
+		}
+		// Returns the next prepared fetch; once exhausted, triggers 'inBlocking', waits on 'throwError', then throws 'testError'.
+		@Override
+		public RecordsWithSplitIds<E> fetch() throws IOException {
+			if (!fetches.isEmpty()) {
+				return fetches.poll();
+			} else {
+				inBlocking.trigger();
+				try {
+					throwError.await();
+				} catch (InterruptedException e) {
+					Thread.currentThread().interrupt();
+					throw new IOException("interrupted"); // NOTE(review): the InterruptedException is not attached as the cause
+				}
+				throw testError;
+			}
+		}
+		// Clears the pending split changes without acting on them.
+		@Override
+		public void handleSplitsChanges(Queue<SplitsChange<SplitT>> splitsChanges) {
+			splitsChanges.clear();
+		}
+		// Empty body: wakeUp() does nothing in this mock, so a fetch() blocked on 'throwError' is not released by it.
+		@Override
+		public void wakeUp() {}
+		// Blocks until fetch() has been entered with no prepared fetches remaining.
+		public void awaitAllRecordsReturned() throws InterruptedException {
+			inBlocking.await();
+		}
+		// Triggers 'throwError', releasing the fetch() call waiting on it so that it throws 'testError'.
+		public void triggerThrowException() {
+			throwError.trigger();
+		}
+	}
+}