You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/09/16 08:05:02 UTC
[flink] 01/04: [FLINK-17393][connectors] (follow-up) Wakeup the
SplitFetchers more elegantly.
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 12261c6b7ed6478a9b9f6a69cb58246b83cab9b7
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Sep 15 17:30:28 2020 +0200
[FLINK-17393][connectors] (follow-up) Wakeup the SplitFetchers more elegantly.
- Remove the InterruptedException from methods where we now do not expect an InterruptedException
to be thrown any more.
- Remove the code that was previously clearing the interruption flag for "benign interruptions".
- Adjust comments to reflect the actual code model
- Remove unused method
- Adjust tests to not use arbitrary interruptions, but only local ones (within the wakeup scope)
---
.../base/source/reader/fetcher/AddSplitsTask.java | 2 +-
.../base/source/reader/fetcher/FetchTask.java | 5 ++-
.../base/source/reader/fetcher/SplitFetcher.java | 45 ++++------------------
.../source/reader/fetcher/SplitFetcherTask.java | 3 +-
.../source/reader/splitreader/SplitReader.java | 3 +-
.../source/reader/fetcher/SplitFetcherTest.java | 14 +++----
.../base/source/reader/mocks/MockSplitReader.java | 36 +++++++++++++----
.../source/reader/mocks/TestingSplitReader.java | 8 +++-
.../connector/source/mocks/MockSourceSplit.java | 3 ++
9 files changed, 59 insertions(+), 60 deletions(-)
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java
index 19b15b5..82e529a 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java
@@ -50,7 +50,7 @@ class AddSplitsTask<SplitT extends SourceSplit> implements SplitFetcherTask {
}
@Override
- public boolean run() throws InterruptedException {
+ public boolean run() {
if (!splitsChangesAdded) {
splitsChanges.add(new SplitsAddition<>(splitsToAdd));
splitsToAdd.forEach(s -> assignedSplits.put(s.splitId(), s));
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
index 530add1..743e763 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
@@ -52,7 +52,7 @@ class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask {
}
@Override
- public boolean run() throws InterruptedException, IOException {
+ public boolean run() throws IOException {
try {
if (!isWakenUp() && lastRecords == null) {
lastRecords = splitReader.fetch();
@@ -67,6 +67,9 @@ class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask {
lastRecords = null;
}
}
+ } catch (InterruptedException e) {
+ // this should only happen on shutdown
+ throw new IOException("Source fetch execution was interrupted", e);
} finally {
// clean up the potential wakeup effect. It is possible that the fetcher is waken up
// after the clean up. In that case, either the wakeup flag will be set or the
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
index 3beb0da..db5a203 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
@@ -27,7 +27,6 @@ import org.apache.flink.connector.base.source.reader.synchronization.FutureCompl
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -98,8 +97,6 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
runOnce();
}
} finally {
- // Reset the interrupted flag so the shutdown hook do not got interrupted.
- Thread.interrupted();
shutdownHook.run();
LOG.info("Split fetcher {} exited.", id);
}
@@ -116,38 +113,21 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
} else {
runningTask = taskQueue.take();
}
- // Now the running task is not null. If wakeUp() is called after this point, the fetcher
- // thread will not be interrupted. Instead task.wakeUp() will be called. On the other hand,
- // If the wakeUp() call was make before this point, the wakeUp flag must have already been
- // have been set, and the fetcher thread may or may not be interrupted, depending on
- // whether the wakeUp() call was before or after the runningTask assignment. So the
- // code does the following:
- // 1. check and clear the interrupt flag on the fetcher thread to avoid interruption in
- // later code.
- // 2. check the wakeUp flag to avoid unnecessary task run.
+ // Now the running task is not null. If wakeUp() is called after this point,
+ // task.wakeUp() will be called. On the other hand, if the wakeUp() call was made before
+ // this point, the wakeUp flag must have already been set. The code hence checks the wakeUp
+ // flag first to avoid an unnecessary task run.
// Note that the runningTask may still encounter the case that the task is waken up before
// the it starts running.
LOG.debug("Prepare to run {}", runningTask);
- if (!Thread.interrupted() && !wakeUp.get() && runningTask.run()) {
+ if (!wakeUp.get() && runningTask.run()) {
LOG.debug("Finished running task {}", runningTask);
// the task has finished running. Set it to null so it won't be enqueued.
runningTask = null;
}
- } catch (InterruptedException ie) {
- if (closed.get()) {
- // The fetcher is closed, just return;
- return;
- } else if (wakeUp.get()) {
- // The fetcher thread has just been waken up. So ignore the interrupted exception
- // and continue;
- LOG.debug("Split fetcher has been waken up.");
- } else {
- throw new RuntimeException(String.format(
- "SplitFetcher thread %d interrupted while polling the records", id), ie);
- }
- } catch (IOException ioe) {
+ } catch (Exception e) {
throw new RuntimeException(String.format(
- "SplitFetcher thread %d received unexpected exception while polling the records", id), ioe);
+ "SplitFetcher thread %d received unexpected exception while polling the records", id), e);
}
// If the task is not null that means this task needs to be re-executed. This only
// happens when the task is the fetching task or the task was interrupted.
@@ -156,9 +136,6 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
// Set the running task to null. It is necessary for the shutdown method to avoid
// unnecessarily interrupt the running task.
runningTask = null;
- // Clean the interrupt flag in case the running task was interrupted after it finishes
- // running but before it was set to null.
- Thread.interrupted();
// Set the wakeUp flag to false.
wakeUp.set(false);
LOG.debug("Cleaned wakeup flag.");
@@ -290,12 +267,6 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
return task != null && task != WAKEUP_TASK;
}
- private void removeAssignedSplit(String splitId) {
- assignedSplits.remove(splitId);
- LOG.debug("Removed {} split from assigned splits. The assigned splits now are {}", splitId, assignedSplits);
-
- }
-
private void checkAndSetIdle() {
final boolean nowIdle = assignedSplits.isEmpty() && taskQueue.isEmpty() && splitChanges.isEmpty();
if (nowIdle) {
@@ -317,7 +288,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
}
@Override
- public boolean run() throws InterruptedException {
+ public boolean run() {
return false;
}
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java
index 999601a..ebbc0a6 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java
@@ -32,10 +32,9 @@ public interface SplitFetcherTask {
* invocation is needed.
*
* @return whether the runnable has successfully finished running.
- * @throws InterruptedException when interrupted.
* @throws IOException when the performed I/O operation fails.
*/
- boolean run() throws InterruptedException, IOException;
+ boolean run() throws IOException;
/**
* Wake up the running thread.
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
index b980f7b..9114614 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
@@ -42,10 +42,9 @@ public interface SplitReader<E, SplitT extends SourceSplit> {
*
* @return the Ids of the finished splits.
*
- * @throws InterruptedException when interrupted
* @throws IOException when encountered IO errors, such as deserialization failures.
*/
- RecordsWithSplitIds<E> fetch() throws InterruptedException, IOException;
+ RecordsWithSplitIds<E> fetch() throws IOException;
/**
* Handle the split changes. This call should be non-blocking.
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
index c25490b..e5f5faf 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
@@ -157,7 +157,7 @@ public class SplitFetcherTest {
public void testWakeup() throws InterruptedException {
final int numSplits = 3;
final int numRecordsPerSplit = 10_000;
- final int interruptRecordsInterval = 10;
+ final int wakeupRecordsInterval = 10;
final int numTotalRecords = numRecordsPerSplit * numSplits;
FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementQueue =
@@ -189,16 +189,16 @@ public class SplitFetcherTest {
// A thread waking up the split fetcher frequently.
AtomicInteger wakeupTimes = new AtomicInteger(0);
AtomicBoolean stop = new AtomicBoolean(false);
- Thread interrupter = new Thread("Interrupter") {
+ Thread wakeUpCaller = new Thread("Wakeup Caller") {
@Override
public void run() {
- int lastInterrupt = 0;
+ int lastWakeup = 0;
while (recordsRead.size() < numTotalRecords && !stop.get()) {
int numRecordsRead = recordsRead.size();
- if (numRecordsRead >= lastInterrupt + interruptRecordsInterval) {
+ if (numRecordsRead >= lastWakeup + wakeupRecordsInterval) {
fetcher.wakeUp(false);
wakeupTimes.incrementAndGet();
- lastInterrupt = numRecordsRead;
+ lastWakeup = numRecordsRead;
}
}
}
@@ -206,7 +206,7 @@ public class SplitFetcherTest {
try {
fetcherThread.start();
- interrupter.start();
+ wakeUpCaller.start();
while (recordsRead.size() < numSplits * numRecordsPerSplit) {
final RecordsWithSplitIds<int[]> nextBatch = elementQueue.take();
@@ -226,7 +226,7 @@ public class SplitFetcherTest {
stop.set(true);
fetcher.shutdown();
fetcherThread.join();
- interrupter.join();
+ wakeUpCaller.join();
}
}
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
index 00d4d71..4cb738a 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
@@ -43,7 +43,10 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> {
private final int numRecordsPerSplitPerFetch;
private final boolean blockingFetch;
private final boolean handleSplitsInOneShot;
- private volatile Thread runningThread;
+
+ private final Object wakeupLock = new Object();
+ private volatile Thread threadInBlocking;
+ private boolean wokenUp;
public MockSplitReader(
int numRecordsPerSplitPerFetch,
@@ -52,14 +55,10 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> {
this.numRecordsPerSplitPerFetch = numRecordsPerSplitPerFetch;
this.blockingFetch = blockingFetch;
this.handleSplitsInOneShot = handleSplitsInOneShot;
- this.runningThread = null;
}
@Override
- public RecordsWithSplitIds<int[]> fetch() throws InterruptedException {
- if (runningThread == null) {
- runningThread = Thread.currentThread();
- }
+ public RecordsWithSplitIds<int[]> fetch() {
return getRecords();
}
@@ -75,14 +74,26 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> {
@Override
public void wakeUp() {
- if (blockingFetch && runningThread != null) {
- runningThread.interrupt();
+ synchronized (wakeupLock) {
+ wokenUp = true;
+ if (threadInBlocking != null) {
+ threadInBlocking.interrupt();
+ }
}
}
private RecordsBySplits<int[]> getRecords() {
final RecordsBySplits.Builder<int[]> records = new RecordsBySplits.Builder<>();
+ // after this locked section, the thread might be interrupted
+ synchronized (wakeupLock) {
+ if (wokenUp) {
+ wokenUp = false;
+ return records.build();
+ }
+ threadInBlocking = Thread.currentThread();
+ }
+
try {
for (Map.Entry<String, MockSourceSplit> entry : splits.entrySet()) {
MockSourceSplit split = entry.getValue();
@@ -102,7 +113,16 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> {
if (!blockingFetch) {
throw new RuntimeException("Caught unexpected interrupted exception.");
}
+ } finally {
+ // after this locked section, the thread may not be interrupted any more
+ synchronized (wakeupLock) {
+ wokenUp = false;
+ //noinspection ResultOfMethodCallIgnored
+ Thread.interrupted();
+ threadInBlocking = null;
+ }
}
+
return records.build();
}
}
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
index 0d202f7..5643088 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
@@ -42,13 +42,17 @@ public class TestingSplitReader<E, SplitT extends SourceSplit> implements SplitR
}
@Override
- public RecordsWithSplitIds<E> fetch() throws InterruptedException, IOException {
+ public RecordsWithSplitIds<E> fetch() throws IOException {
if (!fetches.isEmpty()) {
return fetches.removeFirst();
} else {
// block until woken up
synchronized (fetches) {
- fetches.wait();
+ try {
+ fetches.wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
return null;
}
}
diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplit.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplit.java
index dc8ce80..878d674 100644
--- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplit.java
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplit.java
@@ -34,6 +34,9 @@ import java.util.concurrent.LinkedBlockingQueue;
* polled out of the queue since the creation of this split.
*/
public class MockSourceSplit implements SourceSplit, Serializable {
+
+ private static final long serialVersionUID = 1L;
+
private final int id;
private final BlockingQueue<Integer> records;
private final int endIndex;