You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/09/16 08:05:01 UTC

[flink] branch master updated (b43075b -> 5abef56)

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from b43075b  [hotfix] Add missing log4j2-test.properties files
     new 12261c6  [FLINK-17393][connectors] (follow-up) Wakeup the SplitFetchers more elegantly.
     new 4700bb5  [FLINK-19225][connectors] Various small improvements to SourceReaderBase (part 2)
     new 8fcca83  [hotfix][connectors] Improve JavaDocs for SingleThreadFetcherManager
     new 5abef56  [FLINK-19250][connectors] Fix error propagation in connector base (SplitFetcherManager).

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../base/source/reader/SourceReaderBase.java       |  10 +-
 .../base/source/reader/fetcher/AddSplitsTask.java  |   2 +-
 .../base/source/reader/fetcher/FetchTask.java      |   5 +-
 .../reader/fetcher/SingleThreadFetcherManager.java |  16 ++-
 .../base/source/reader/fetcher/SplitFetcher.java   |  45 ++----
 .../source/reader/fetcher/SplitFetcherManager.java |   4 +-
 .../source/reader/fetcher/SplitFetcherTask.java    |   3 +-
 .../source/reader/splitreader/SplitReader.java     |   3 +-
 .../reader/fetcher/SplitFetcherManagerTest.java    | 159 +++++++++++++++++++++
 .../source/reader/fetcher/SplitFetcherTest.java    |  14 +-
 .../base/source/reader/mocks/MockSplitReader.java  |  36 +++--
 .../source/reader/mocks/TestingSplitReader.java    |   8 +-
 .../connector/source/mocks/MockSourceSplit.java    |   3 +
 13 files changed, 241 insertions(+), 67 deletions(-)
 create mode 100644 flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java


[flink] 01/04: [FLINK-17393][connectors] (follow-up) Wakeup the SplitFetchers more elegantly.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 12261c6b7ed6478a9b9f6a69cb58246b83cab9b7
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Sep 15 17:30:28 2020 +0200

    [FLINK-17393][connectors] (follow-up) Wakeup the SplitFetchers more elegantly.
    
      - Remove the InterruptedException from methods where we now do not expect an InterruptedException
        to be thrown any more.
      - Removes the code that was previously clearing the interruption flag for "benign interruptions".
      - Adjust comments to reflect actual code model
      - Remove unused method
      - Adjust tests to not use arbitrary interruptions but local (in the wakeup scope only)
---
 .../base/source/reader/fetcher/AddSplitsTask.java  |  2 +-
 .../base/source/reader/fetcher/FetchTask.java      |  5 ++-
 .../base/source/reader/fetcher/SplitFetcher.java   | 45 ++++------------------
 .../source/reader/fetcher/SplitFetcherTask.java    |  3 +-
 .../source/reader/splitreader/SplitReader.java     |  3 +-
 .../source/reader/fetcher/SplitFetcherTest.java    | 14 +++----
 .../base/source/reader/mocks/MockSplitReader.java  | 36 +++++++++++++----
 .../source/reader/mocks/TestingSplitReader.java    |  8 +++-
 .../connector/source/mocks/MockSourceSplit.java    |  3 ++
 9 files changed, 59 insertions(+), 60 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java
index 19b15b5..82e529a 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java
@@ -50,7 +50,7 @@ class AddSplitsTask<SplitT extends SourceSplit> implements SplitFetcherTask {
 	}
 
 	@Override
-	public boolean run() throws InterruptedException {
+	public boolean run() {
 		if (!splitsChangesAdded) {
 			splitsChanges.add(new SplitsAddition<>(splitsToAdd));
 			splitsToAdd.forEach(s -> assignedSplits.put(s.splitId(), s));
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
index 530add1..743e763 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
@@ -52,7 +52,7 @@ class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask {
 	}
 
 	@Override
-	public boolean run() throws InterruptedException, IOException {
+	public boolean run() throws IOException {
 		try {
 			if (!isWakenUp() && lastRecords == null) {
 				lastRecords = splitReader.fetch();
@@ -67,6 +67,9 @@ class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask {
 					lastRecords = null;
 				}
 			}
+		} catch (InterruptedException e) {
+			// this should only happen on shutdown
+			throw new IOException("Source fetch execution was interrupted", e);
 		} finally {
 			// clean up the potential wakeup effect. It is possible that the fetcher is waken up
 			// after the clean up. In that case, either the wakeup flag will be set or the
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
index 3beb0da..db5a203 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
@@ -27,7 +27,6 @@ import org.apache.flink.connector.base.source.reader.synchronization.FutureCompl
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -98,8 +97,6 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
 				runOnce();
 			}
 		} finally {
-			// Reset the interrupted flag so the shutdown hook do not got interrupted.
-			Thread.interrupted();
 			shutdownHook.run();
 			LOG.info("Split fetcher {} exited.", id);
 		}
@@ -116,38 +113,21 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
 			} else {
 				runningTask = taskQueue.take();
 			}
-			// Now the running task is not null. If wakeUp() is called after this point, the fetcher
-			// thread will not be interrupted. Instead task.wakeUp() will be called. On the other hand,
-			// If the wakeUp() call was make before this point, the wakeUp flag must have already been
-			// have been set, and the fetcher thread may or may not be interrupted, depending on
-			// whether the wakeUp() call was before or after the runningTask assignment. So the
-			// code does the following:
-			// 1. check and clear the interrupt flag on the fetcher thread to avoid interruption in
-			//    later code.
-			// 2. check the wakeUp flag to avoid unnecessary task run.
+			// Now the running task is not null. If wakeUp() is called after this point,
+			// task.wakeUp() will be called. On the other hand, if the wakeUp() call was made before
+			// this point, the wakeUp flag must have already been set. The code hence checks the wakeUp
+			// flag first to avoid an unnecessary task run.
 			// Note that the runningTask may still encounter the case that the task is waken up before
 			// the it starts running.
 			LOG.debug("Prepare to run {}", runningTask);
-			if (!Thread.interrupted() && !wakeUp.get() && runningTask.run()) {
+			if (!wakeUp.get() && runningTask.run()) {
 				LOG.debug("Finished running task {}", runningTask);
 				// the task has finished running. Set it to null so it won't be enqueued.
 				runningTask = null;
 			}
-		} catch (InterruptedException ie) {
-			if (closed.get()) {
-				// The fetcher is closed, just return;
-				return;
-			} else if (wakeUp.get()) {
-				// The fetcher thread has just been waken up. So ignore the interrupted exception
-				// and continue;
-				LOG.debug("Split fetcher has been waken up.");
-			} else {
-				throw new RuntimeException(String.format(
-					"SplitFetcher thread %d interrupted while polling the records", id), ie);
-			}
-		} catch (IOException ioe) {
+		} catch (Exception e) {
 			throw new RuntimeException(String.format(
-				"SplitFetcher thread %d received unexpected exception while polling the records", id), ioe);
+				"SplitFetcher thread %d received unexpected exception while polling the records", id), e);
 		}
 		// If the task is not null that means this task needs to be re-executed. This only
 		// happens when the task is the fetching task or the task was interrupted.
@@ -156,9 +136,6 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
 			// Set the running task to null. It is necessary for the shutdown method to avoid
 			// unnecessarily interrupt the running task.
 			runningTask = null;
-			// Clean the interrupt flag in case the running task was interrupted after it finishes
-			// running but before it was set to null.
-			Thread.interrupted();
 			// Set the wakeUp flag to false.
 			wakeUp.set(false);
 			LOG.debug("Cleaned wakeup flag.");
@@ -290,12 +267,6 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
 		return task != null && task != WAKEUP_TASK;
 	}
 
-	private void removeAssignedSplit(String splitId) {
-		assignedSplits.remove(splitId);
-		LOG.debug("Removed {} split from assigned splits. The assigned splits now are {}", splitId, assignedSplits);
-
-	}
-
 	private void checkAndSetIdle() {
 		final boolean nowIdle = assignedSplits.isEmpty() && taskQueue.isEmpty() && splitChanges.isEmpty();
 		if (nowIdle) {
@@ -317,7 +288,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
 		}
 
 		@Override
-		public boolean run() throws InterruptedException {
+		public boolean run() {
 			return false;
 		}
 
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java
index 999601a..ebbc0a6 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java
@@ -32,10 +32,9 @@ public interface SplitFetcherTask {
 	 * invocation is needed.
 	 *
 	 * @return whether the runnable has successfully finished running.
-	 * @throws InterruptedException when interrupted.
 	 * @throws IOException when the performed I/O operation fails.
 	 */
-	boolean run() throws InterruptedException, IOException;
+	boolean run() throws IOException;
 
 	/**
 	 * Wake up the running thread.
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
index b980f7b..9114614 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
@@ -42,10 +42,9 @@ public interface SplitReader<E, SplitT extends SourceSplit> {
 	 *
 	 * @return the Ids of the finished splits.
 	 *
-	 * @throws InterruptedException when interrupted
 	 * @throws IOException when encountered IO errors, such as deserialization failures.
 	 */
-	RecordsWithSplitIds<E> fetch() throws InterruptedException, IOException;
+	RecordsWithSplitIds<E> fetch() throws IOException;
 
 	/**
 	 * Handle the split changes. This call should be non-blocking.
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
index c25490b..e5f5faf 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
@@ -157,7 +157,7 @@ public class SplitFetcherTest {
 	public void testWakeup() throws InterruptedException {
 		final int numSplits = 3;
 		final int numRecordsPerSplit = 10_000;
-		final int interruptRecordsInterval = 10;
+		final int wakeupRecordsInterval = 10;
 		final int numTotalRecords = numRecordsPerSplit * numSplits;
 
 		FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementQueue =
@@ -189,16 +189,16 @@ public class SplitFetcherTest {
 		// A thread waking up the split fetcher frequently.
 		AtomicInteger wakeupTimes = new AtomicInteger(0);
 		AtomicBoolean stop = new AtomicBoolean(false);
-		Thread interrupter = new Thread("Interrupter") {
+		Thread wakeUpCaller = new Thread("Wakeup Caller") {
 			@Override
 			public void run() {
-				int lastInterrupt = 0;
+				int lastWakeup = 0;
 				while (recordsRead.size() < numTotalRecords && !stop.get()) {
 					int numRecordsRead = recordsRead.size();
-					if (numRecordsRead >= lastInterrupt + interruptRecordsInterval) {
+					if (numRecordsRead >= lastWakeup + wakeupRecordsInterval) {
 						fetcher.wakeUp(false);
 						wakeupTimes.incrementAndGet();
-						lastInterrupt = numRecordsRead;
+						lastWakeup = numRecordsRead;
 					}
 				}
 			}
@@ -206,7 +206,7 @@ public class SplitFetcherTest {
 
 		try {
 			fetcherThread.start();
-			interrupter.start();
+			wakeUpCaller.start();
 
 			while (recordsRead.size() < numSplits * numRecordsPerSplit) {
 				final RecordsWithSplitIds<int[]> nextBatch = elementQueue.take();
@@ -226,7 +226,7 @@ public class SplitFetcherTest {
 			stop.set(true);
 			fetcher.shutdown();
 			fetcherThread.join();
-			interrupter.join();
+			wakeUpCaller.join();
 		}
 	}
 
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
index 00d4d71..4cb738a 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
@@ -43,7 +43,10 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> {
 	private final int numRecordsPerSplitPerFetch;
 	private final boolean blockingFetch;
 	private final boolean handleSplitsInOneShot;
-	private volatile Thread runningThread;
+
+	private final Object wakeupLock = new Object();
+	private volatile Thread threadInBlocking;
+	private boolean wokenUp;
 
 	public MockSplitReader(
 			int numRecordsPerSplitPerFetch,
@@ -52,14 +55,10 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> {
 		this.numRecordsPerSplitPerFetch = numRecordsPerSplitPerFetch;
 		this.blockingFetch = blockingFetch;
 		this.handleSplitsInOneShot = handleSplitsInOneShot;
-		this.runningThread = null;
 	}
 
 	@Override
-	public RecordsWithSplitIds<int[]> fetch() throws InterruptedException {
-		if (runningThread == null) {
-			runningThread = Thread.currentThread();
-		}
+	public RecordsWithSplitIds<int[]> fetch() {
 		return getRecords();
 	}
 
@@ -75,14 +74,26 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> {
 
 	@Override
 	public void wakeUp() {
-		if (blockingFetch && runningThread != null) {
-			runningThread.interrupt();
+		synchronized (wakeupLock) {
+			wokenUp = true;
+			if (threadInBlocking != null) {
+				threadInBlocking.interrupt();
+			}
 		}
 	}
 
 	private RecordsBySplits<int[]> getRecords() {
 		final RecordsBySplits.Builder<int[]> records = new RecordsBySplits.Builder<>();
 
+		// after this locked section, the thread might be interrupted
+		synchronized (wakeupLock) {
+			if (wokenUp) {
+				wokenUp = false;
+				return records.build();
+			}
+			threadInBlocking = Thread.currentThread();
+		}
+
 		try {
 			for (Map.Entry<String, MockSourceSplit> entry : splits.entrySet()) {
 				MockSourceSplit split = entry.getValue();
@@ -102,7 +113,16 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> {
 			if (!blockingFetch) {
 				throw new RuntimeException("Caught unexpected interrupted exception.");
 			}
+		} finally {
+			// after this locked section, the thread may not be interrupted any more
+			synchronized (wakeupLock) {
+				wokenUp = false;
+				//noinspection ResultOfMethodCallIgnored
+				Thread.interrupted();
+				threadInBlocking = null;
+			}
 		}
+
 		return records.build();
 	}
 }
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
index 0d202f7..5643088 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
@@ -42,13 +42,17 @@ public class TestingSplitReader<E, SplitT extends SourceSplit> implements SplitR
 	}
 
 	@Override
-	public RecordsWithSplitIds<E> fetch() throws InterruptedException, IOException {
+	public RecordsWithSplitIds<E> fetch() throws IOException {
 		if (!fetches.isEmpty()) {
 			return fetches.removeFirst();
 		} else {
 			// block until woken up
 			synchronized (fetches) {
-				fetches.wait();
+				try {
+					fetches.wait();
+				} catch (InterruptedException e) {
+					Thread.currentThread().interrupt();
+				}
 				return null;
 			}
 		}
diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplit.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplit.java
index dc8ce80..878d674 100644
--- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplit.java
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplit.java
@@ -34,6 +34,9 @@ import java.util.concurrent.LinkedBlockingQueue;
  * polled out of the queue since the creation of this split.
  */
 public class MockSourceSplit implements SourceSplit, Serializable {
+
+	private static final long serialVersionUID = 1L;
+
 	private final int id;
 	private final BlockingQueue<Integer> records;
 	private final int endIndex;


[flink] 04/04: [FLINK-19250][connectors] Fix error propagation in connector base (SplitFetcherManager).

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5abef56b2bf85bcac786f6b16b6899b6cced7176
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Sep 15 17:43:16 2020 +0200

    [FLINK-19250][connectors] Fix error propagation in connector base (SplitFetcherManager).
    
    This makes sure that the reader is notified / woken up when the fetcher encounters an error.
---
 .../source/reader/fetcher/SplitFetcherManager.java |   4 +-
 .../reader/fetcher/SplitFetcherManagerTest.java    | 159 +++++++++++++++++++++
 2 files changed, 161 insertions(+), 2 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
index ffac523..7a20a59 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
@@ -92,9 +92,9 @@ public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {
 				if (!uncaughtFetcherException.compareAndSet(null, t)) {
 					// Add the exception to the exception list.
 					uncaughtFetcherException.get().addSuppressed(t);
-					// Wake up the main thread to let it know the exception.
-					elementsQueue.notifyAvailable();
 				}
+				// Wake up the main thread to let it know the exception.
+				elementsQueue.notifyAvailable();
 			}
 		};
 		this.splitReaderFactory = splitReaderFactory;
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java
new file mode 100644
index 0000000..3ff25e0
--- /dev/null
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.reader.fetcher;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.mocks.TestingRecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.core.testutils.OneShotLatch;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Queue;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.fail;
+
+/**
+ * Unit tests for the {@link SplitFetcherManager}.
+ */
+public class SplitFetcherManagerTest {
+
+	@Test
+	public void testExceptionPropagationFirstFetch() throws Exception {
+		testExceptionPropagation();
+	}
+
+	@Test
+	public void testExceptionPropagationSuccessiveFetch() throws Exception {
+		testExceptionPropagation(
+				new TestingRecordsWithSplitIds<>("testSplit", 1, 2, 3, 4),
+				new TestingRecordsWithSplitIds<>("testSplit", 5, 6, 7, 8)
+		);
+	}
+
+	// the final modifier is important so that '@SafeVarargs' is accepted on Java 8
+	@SuppressWarnings("FinalPrivateMethod")
+	@SafeVarargs
+	private final void testExceptionPropagation(final RecordsWithSplitIds<Integer>... fetchesBeforeError) throws Exception {
+		final IOException testingException = new IOException("test");
+
+		final FutureCompletingBlockingQueue<RecordsWithSplitIds<Integer>> queue = new FutureCompletingBlockingQueue<>(10);
+		final AwaitingReader<Integer, TestingSourceSplit> reader = new AwaitingReader<>(testingException, fetchesBeforeError);
+		final SplitFetcherManager<Integer, TestingSourceSplit> fetcher = createFetcher("testSplit", queue, reader);
+
+		reader.awaitAllRecordsReturned();
+		drainQueue(queue);
+
+		assertFalse(queue.getAvailabilityFuture().isDone());
+		reader.triggerThrowException();
+
+		// await the error propagation
+		queue.getAvailabilityFuture().get();
+
+		try {
+			fetcher.checkErrors();
+			fail("expected exception");
+		} catch (Exception e) {
+			assertSame(testingException, e.getCause().getCause());
+		} finally {
+			fetcher.close(20_000L);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  test helpers
+	// ------------------------------------------------------------------------
+
+	private static <E> SplitFetcherManager<E, TestingSourceSplit> createFetcher(
+			final String splitId,
+			final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> queue,
+			final SplitReader<E, TestingSourceSplit> reader) {
+
+		final SingleThreadFetcherManager<E, TestingSourceSplit> fetcher =
+				new SingleThreadFetcherManager<>(queue, () -> reader);
+		fetcher.addSplits(Collections.singletonList(new TestingSourceSplit(splitId)));
+		return fetcher;
+	}
+
+	private static void drainQueue(FutureCompletingBlockingQueue<?> queue) {
+		//noinspection StatementWithEmptyBody
+		while (queue.poll() != null) {}
+	}
+
+	// ------------------------------------------------------------------------
+	//  test mocks
+	// ------------------------------------------------------------------------
+
+	private static final class AwaitingReader<E, SplitT extends SourceSplit> implements SplitReader <E, SplitT> {
+
+		private final Queue<RecordsWithSplitIds<E>> fetches;
+		private final IOException testError;
+
+		private final OneShotLatch inBlocking = new OneShotLatch();
+		private final OneShotLatch throwError = new OneShotLatch();
+
+		@SafeVarargs
+		AwaitingReader(IOException testError, RecordsWithSplitIds<E>... fetches) {
+			this.testError = testError;
+			this.fetches = new ArrayDeque<>(Arrays.asList(fetches));
+		}
+
+		@Override
+		public RecordsWithSplitIds<E> fetch() throws IOException {
+			if (!fetches.isEmpty()) {
+				return fetches.poll();
+			} else {
+				inBlocking.trigger();
+				try {
+					throwError.await();
+				} catch (InterruptedException e) {
+					Thread.currentThread().interrupt();
+					throw new IOException("interrupted");
+				}
+				throw testError;
+			}
+		}
+
+		@Override
+		public void handleSplitsChanges(Queue<SplitsChange<SplitT>> splitsChanges) {
+			splitsChanges.clear();
+		}
+
+		@Override
+		public void wakeUp() {}
+
+		public void awaitAllRecordsReturned() throws InterruptedException {
+			inBlocking.await();
+		}
+
+		public void triggerThrowException() {
+			throwError.trigger();
+		}
+	}
+}


[flink] 03/04: [hotfix][connectors] Improve JavaDocs for SingleThreadFetcherManager

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8fcca837c55a9216595ee4c03038b52747098dbb
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Sep 15 21:29:41 2020 +0200

    [hotfix][connectors] Improve JavaDocs for SingleThreadFetcherManager
---
 .../reader/fetcher/SingleThreadFetcherManager.java       | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
index 339c533..de50e6a 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.base.source.reader.fetcher;
 
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.SourceReaderBase;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 
@@ -27,11 +28,24 @@ import java.util.List;
 import java.util.function.Supplier;
 
 /**
- * A Fetcher manager with a single fetcher and assign all the splits to it.
+ * A Fetcher Manager with a single fetching thread (I/O thread) that handles all splits concurrently.
+ *
+ * <p>This pattern is, for example, useful for connectors like File Readers, Apache Kafka Readers, etc.
+ * In the example of Kafka, there is a single thread that reads all splits (topic partitions) via the
+ * same client. In the example of the file source, there is a single thread that reads the files
+ * one after another.
  */
 public class SingleThreadFetcherManager<E, SplitT extends SourceSplit>
 		extends SplitFetcherManager<E, SplitT> {
 
+	/**
+	 * Creates a new SplitFetcherManager with a single I/O thread.
+	 *
+	 * @param elementsQueue The queue that is used to hand over data from the I/O thread (the fetchers)
+	 *                      to the reader (which emits the records and book-keeps the state).
+	 *                      This must be the same queue instance that is also passed to the {@link SourceReaderBase}.
+	 * @param splitReaderSupplier The factory for the split reader that connects to the source system.
+	 */
 	public SingleThreadFetcherManager(
 			FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
 			Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {


[flink] 02/04: [FLINK-19225][connectors] Various small improvements to SourceReaderBase (part 2)

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4700bb5dde3303cbe98882f6beb7379425717b01
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Sep 15 21:27:00 2020 +0200

    [FLINK-19225][connectors] Various small improvements to SourceReaderBase (part 2)
    
      - SourceReaderBase avoids not emitting an element (and exiting to caller / mailbox) when
        transitioning between fetches
    
      - Avoid eager checking of queue empty condition (requires lock acquisition) when determining
        whether end of input is reached. Check that expensive condition last instead.
---
 .../flink/connector/base/source/reader/SourceReaderBase.java   | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index fb4e6df9..97a6a95 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -136,7 +136,10 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 				return trace(InputStatus.MORE_AVAILABLE);
 			}
 			else if (!moveToNextSplit(recordsWithSplitId, output)) {
-				return trace(finishedOrAvailableLater());
+				// The fetch is done and we just discovered that and have not emitted anything, yet.
+				// We need to move to the next fetch. As a shortcut, we call pollNext() here again,
+				// rather than emitting nothing and waiting for the caller to call us again.
+				return pollNext(output);
 			}
 			// else fall through the loop
 		}
@@ -258,9 +261,8 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 	// ------------------ private helper methods ---------------------
 
 	private InputStatus finishedOrAvailableLater() {
-		boolean allFetchersHaveShutdown = splitFetcherManager.maybeShutdownFinishedFetchers();
-		boolean allElementsEmitted = elementsQueue.isEmpty();
-		if (noMoreSplitsAssignment && allFetchersHaveShutdown && allElementsEmitted) {
+		final boolean allFetchersHaveShutdown = splitFetcherManager.maybeShutdownFinishedFetchers();
+		if (noMoreSplitsAssignment && allFetchersHaveShutdown && elementsQueue.isEmpty()) {
 			return InputStatus.END_OF_INPUT;
 		} else {
 			return InputStatus.NOTHING_AVAILABLE;