You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/09/16 08:05:02 UTC

[flink] 01/04: [FLINK-17393][connectors] (follow-up) Wakeup the SplitFetchers more elegantly.

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 12261c6b7ed6478a9b9f6a69cb58246b83cab9b7
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Sep 15 17:30:28 2020 +0200

    [FLINK-17393][connectors] (follow-up) Wakeup the SplitFetchers more elegantly.
    
      - Remove the InterruptedException from the signatures of methods that are no longer expected
        to throw it.
      - Remove the code that previously cleared the interruption flag for "benign interruptions".
      - Adjust comments to reflect the actual code model.
      - Remove an unused method.
      - Adjust tests to use local interruptions (within the wakeup scope only) instead of arbitrary ones.
---
 .../base/source/reader/fetcher/AddSplitsTask.java  |  2 +-
 .../base/source/reader/fetcher/FetchTask.java      |  5 ++-
 .../base/source/reader/fetcher/SplitFetcher.java   | 45 ++++------------------
 .../source/reader/fetcher/SplitFetcherTask.java    |  3 +-
 .../source/reader/splitreader/SplitReader.java     |  3 +-
 .../source/reader/fetcher/SplitFetcherTest.java    | 14 +++----
 .../base/source/reader/mocks/MockSplitReader.java  | 36 +++++++++++++----
 .../source/reader/mocks/TestingSplitReader.java    |  8 +++-
 .../connector/source/mocks/MockSourceSplit.java    |  3 ++
 9 files changed, 59 insertions(+), 60 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java
index 19b15b5..82e529a 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java
@@ -50,7 +50,7 @@ class AddSplitsTask<SplitT extends SourceSplit> implements SplitFetcherTask {
 	}
 
 	@Override
-	public boolean run() throws InterruptedException {
+	public boolean run() {
 		if (!splitsChangesAdded) {
 			splitsChanges.add(new SplitsAddition<>(splitsToAdd));
 			splitsToAdd.forEach(s -> assignedSplits.put(s.splitId(), s));
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
index 530add1..743e763 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
@@ -52,7 +52,7 @@ class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask {
 	}
 
 	@Override
-	public boolean run() throws InterruptedException, IOException {
+	public boolean run() throws IOException {
 		try {
 			if (!isWakenUp() && lastRecords == null) {
 				lastRecords = splitReader.fetch();
@@ -67,6 +67,9 @@ class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask {
 					lastRecords = null;
 				}
 			}
+		} catch (InterruptedException e) {
+			// this should only happen on shutdown
+			throw new IOException("Source fetch execution was interrupted", e);
 		} finally {
 			// clean up the potential wakeup effect. It is possible that the fetcher is waken up
 			// after the clean up. In that case, either the wakeup flag will be set or the
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
index 3beb0da..db5a203 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
@@ -27,7 +27,6 @@ import org.apache.flink.connector.base.source.reader.synchronization.FutureCompl
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -98,8 +97,6 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
 				runOnce();
 			}
 		} finally {
-			// Reset the interrupted flag so the shutdown hook do not got interrupted.
-			Thread.interrupted();
 			shutdownHook.run();
 			LOG.info("Split fetcher {} exited.", id);
 		}
@@ -116,38 +113,21 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
 			} else {
 				runningTask = taskQueue.take();
 			}
-			// Now the running task is not null. If wakeUp() is called after this point, the fetcher
-			// thread will not be interrupted. Instead task.wakeUp() will be called. On the other hand,
-			// If the wakeUp() call was make before this point, the wakeUp flag must have already been
-			// have been set, and the fetcher thread may or may not be interrupted, depending on
-			// whether the wakeUp() call was before or after the runningTask assignment. So the
-			// code does the following:
-			// 1. check and clear the interrupt flag on the fetcher thread to avoid interruption in
-			//    later code.
-			// 2. check the wakeUp flag to avoid unnecessary task run.
+			// Now the running task is not null. If wakeUp() is called after this point,
+			// task.wakeUp() will be called. On the other hand, if the wakeUp() call was make before
+			// this point, the wakeUp flag must have already been set. The code hence checks the wakeUp
+			// flag first to avoid an unnecessary task run.
 			// Note that the runningTask may still encounter the case that the task is waken up before
 			// the it starts running.
 			LOG.debug("Prepare to run {}", runningTask);
-			if (!Thread.interrupted() && !wakeUp.get() && runningTask.run()) {
+			if (!wakeUp.get() && runningTask.run()) {
 				LOG.debug("Finished running task {}", runningTask);
 				// the task has finished running. Set it to null so it won't be enqueued.
 				runningTask = null;
 			}
-		} catch (InterruptedException ie) {
-			if (closed.get()) {
-				// The fetcher is closed, just return;
-				return;
-			} else if (wakeUp.get()) {
-				// The fetcher thread has just been waken up. So ignore the interrupted exception
-				// and continue;
-				LOG.debug("Split fetcher has been waken up.");
-			} else {
-				throw new RuntimeException(String.format(
-					"SplitFetcher thread %d interrupted while polling the records", id), ie);
-			}
-		} catch (IOException ioe) {
+		} catch (Exception e) {
 			throw new RuntimeException(String.format(
-				"SplitFetcher thread %d received unexpected exception while polling the records", id), ioe);
+				"SplitFetcher thread %d received unexpected exception while polling the records", id), e);
 		}
 		// If the task is not null that means this task needs to be re-executed. This only
 		// happens when the task is the fetching task or the task was interrupted.
@@ -156,9 +136,6 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
 			// Set the running task to null. It is necessary for the shutdown method to avoid
 			// unnecessarily interrupt the running task.
 			runningTask = null;
-			// Clean the interrupt flag in case the running task was interrupted after it finishes
-			// running but before it was set to null.
-			Thread.interrupted();
 			// Set the wakeUp flag to false.
 			wakeUp.set(false);
 			LOG.debug("Cleaned wakeup flag.");
@@ -290,12 +267,6 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
 		return task != null && task != WAKEUP_TASK;
 	}
 
-	private void removeAssignedSplit(String splitId) {
-		assignedSplits.remove(splitId);
-		LOG.debug("Removed {} split from assigned splits. The assigned splits now are {}", splitId, assignedSplits);
-
-	}
-
 	private void checkAndSetIdle() {
 		final boolean nowIdle = assignedSplits.isEmpty() && taskQueue.isEmpty() && splitChanges.isEmpty();
 		if (nowIdle) {
@@ -317,7 +288,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
 		}
 
 		@Override
-		public boolean run() throws InterruptedException {
+		public boolean run() {
 			return false;
 		}
 
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java
index 999601a..ebbc0a6 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java
@@ -32,10 +32,9 @@ public interface SplitFetcherTask {
 	 * invocation is needed.
 	 *
 	 * @return whether the runnable has successfully finished running.
-	 * @throws InterruptedException when interrupted.
 	 * @throws IOException when the performed I/O operation fails.
 	 */
-	boolean run() throws InterruptedException, IOException;
+	boolean run() throws IOException;
 
 	/**
 	 * Wake up the running thread.
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
index b980f7b..9114614 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
@@ -42,10 +42,9 @@ public interface SplitReader<E, SplitT extends SourceSplit> {
 	 *
 	 * @return the Ids of the finished splits.
 	 *
-	 * @throws InterruptedException when interrupted
 	 * @throws IOException when encountered IO errors, such as deserialization failures.
 	 */
-	RecordsWithSplitIds<E> fetch() throws InterruptedException, IOException;
+	RecordsWithSplitIds<E> fetch() throws IOException;
 
 	/**
 	 * Handle the split changes. This call should be non-blocking.
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
index c25490b..e5f5faf 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
@@ -157,7 +157,7 @@ public class SplitFetcherTest {
 	public void testWakeup() throws InterruptedException {
 		final int numSplits = 3;
 		final int numRecordsPerSplit = 10_000;
-		final int interruptRecordsInterval = 10;
+		final int wakeupRecordsInterval = 10;
 		final int numTotalRecords = numRecordsPerSplit * numSplits;
 
 		FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementQueue =
@@ -189,16 +189,16 @@ public class SplitFetcherTest {
 		// A thread waking up the split fetcher frequently.
 		AtomicInteger wakeupTimes = new AtomicInteger(0);
 		AtomicBoolean stop = new AtomicBoolean(false);
-		Thread interrupter = new Thread("Interrupter") {
+		Thread wakeUpCaller = new Thread("Wakeup Caller") {
 			@Override
 			public void run() {
-				int lastInterrupt = 0;
+				int lastWakeup = 0;
 				while (recordsRead.size() < numTotalRecords && !stop.get()) {
 					int numRecordsRead = recordsRead.size();
-					if (numRecordsRead >= lastInterrupt + interruptRecordsInterval) {
+					if (numRecordsRead >= lastWakeup + wakeupRecordsInterval) {
 						fetcher.wakeUp(false);
 						wakeupTimes.incrementAndGet();
-						lastInterrupt = numRecordsRead;
+						lastWakeup = numRecordsRead;
 					}
 				}
 			}
@@ -206,7 +206,7 @@ public class SplitFetcherTest {
 
 		try {
 			fetcherThread.start();
-			interrupter.start();
+			wakeUpCaller.start();
 
 			while (recordsRead.size() < numSplits * numRecordsPerSplit) {
 				final RecordsWithSplitIds<int[]> nextBatch = elementQueue.take();
@@ -226,7 +226,7 @@ public class SplitFetcherTest {
 			stop.set(true);
 			fetcher.shutdown();
 			fetcherThread.join();
-			interrupter.join();
+			wakeUpCaller.join();
 		}
 	}
 
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
index 00d4d71..4cb738a 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
@@ -43,7 +43,10 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> {
 	private final int numRecordsPerSplitPerFetch;
 	private final boolean blockingFetch;
 	private final boolean handleSplitsInOneShot;
-	private volatile Thread runningThread;
+
+	private final Object wakeupLock = new Object();
+	private volatile Thread threadInBlocking;
+	private boolean wokenUp;
 
 	public MockSplitReader(
 			int numRecordsPerSplitPerFetch,
@@ -52,14 +55,10 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> {
 		this.numRecordsPerSplitPerFetch = numRecordsPerSplitPerFetch;
 		this.blockingFetch = blockingFetch;
 		this.handleSplitsInOneShot = handleSplitsInOneShot;
-		this.runningThread = null;
 	}
 
 	@Override
-	public RecordsWithSplitIds<int[]> fetch() throws InterruptedException {
-		if (runningThread == null) {
-			runningThread = Thread.currentThread();
-		}
+	public RecordsWithSplitIds<int[]> fetch() {
 		return getRecords();
 	}
 
@@ -75,14 +74,26 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> {
 
 	@Override
 	public void wakeUp() {
-		if (blockingFetch && runningThread != null) {
-			runningThread.interrupt();
+		synchronized (wakeupLock) {
+			wokenUp = true;
+			if (threadInBlocking != null) {
+				threadInBlocking.interrupt();
+			}
 		}
 	}
 
 	private RecordsBySplits<int[]> getRecords() {
 		final RecordsBySplits.Builder<int[]> records = new RecordsBySplits.Builder<>();
 
+		// after this locked section, the thread might be interrupted
+		synchronized (wakeupLock) {
+			if (wokenUp) {
+				wokenUp = false;
+				return records.build();
+			}
+			threadInBlocking = Thread.currentThread();
+		}
+
 		try {
 			for (Map.Entry<String, MockSourceSplit> entry : splits.entrySet()) {
 				MockSourceSplit split = entry.getValue();
@@ -102,7 +113,16 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> {
 			if (!blockingFetch) {
 				throw new RuntimeException("Caught unexpected interrupted exception.");
 			}
+		} finally {
+			// after this locked section, the thread may not be interrupted any more
+			synchronized (wakeupLock) {
+				wokenUp = false;
+				//noinspection ResultOfMethodCallIgnored
+				Thread.interrupted();
+				threadInBlocking = null;
+			}
 		}
+
 		return records.build();
 	}
 }
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
index 0d202f7..5643088 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
@@ -42,13 +42,17 @@ public class TestingSplitReader<E, SplitT extends SourceSplit> implements SplitR
 	}
 
 	@Override
-	public RecordsWithSplitIds<E> fetch() throws InterruptedException, IOException {
+	public RecordsWithSplitIds<E> fetch() throws IOException {
 		if (!fetches.isEmpty()) {
 			return fetches.removeFirst();
 		} else {
 			// block until woken up
 			synchronized (fetches) {
-				fetches.wait();
+				try {
+					fetches.wait();
+				} catch (InterruptedException e) {
+					Thread.currentThread().interrupt();
+				}
 				return null;
 			}
 		}
diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplit.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplit.java
index dc8ce80..878d674 100644
--- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplit.java
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplit.java
@@ -34,6 +34,9 @@ import java.util.concurrent.LinkedBlockingQueue;
  * polled out of the queue since the creation of this split.
  */
 public class MockSourceSplit implements SourceSplit, Serializable {
+
+	private static final long serialVersionUID = 1L;
+
 	private final int id;
 	private final BlockingQueue<Integer> records;
 	private final int endIndex;