You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/06/18 03:33:19 UTC
[flink] branch master updated: [FLINK-18272][table-runtime-blink] Add retry logic to FileSystemLookupFunction
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 40621c7 [FLINK-18272][table-runtime-blink] Add retry logic to FileSystemLookupFunction
40621c7 is described below
commit 40621c733c1647c5745d2795eaa32b6cc7dae3ae
Author: Rui Li <li...@apache.org>
AuthorDate: Thu Jun 18 11:32:21 2020 +0800
[FLINK-18272][table-runtime-blink] Add retry logic to FileSystemLookupFunction
This closes #12651
---
.../table/filesystem/FileSystemLookupFunction.java | 57 +++++++++++++++-------
1 file changed, 39 insertions(+), 18 deletions(-)
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemLookupFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemLookupFunction.java
index e3bed30..d8b8633 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemLookupFunction.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemLookupFunction.java
@@ -59,6 +59,11 @@ public class FileSystemLookupFunction<T extends InputSplit> extends TableFunctio
private static final Logger LOG = LoggerFactory.getLogger(FileSystemLookupFunction.class);
+ // the max number of retries before throwing exception, in case of failure to load the table into cache
+ private static final int MAX_RETRIES = 3;
+ // interval between retries
+ private static final Duration RETRY_INTERVAL = Duration.ofSeconds(10);
+
private final InputFormat<RowData, T> inputFormat;
// names and types of the records returned by the input format
private final String[] producedNames;
@@ -143,26 +148,42 @@ public class FileSystemLookupFunction<T extends InputSplit> extends TableFunctio
} else {
LOG.info("Populating lookup join cache");
}
- cache.clear();
- try {
- T[] inputSplits = inputFormat.createInputSplits(1);
- GenericRowData reuse = new GenericRowData(producedNames.length);
- long count = 0;
- for (T split : inputSplits) {
- inputFormat.open(split);
- while (!inputFormat.reachedEnd()) {
- RowData row = inputFormat.nextRecord(reuse);
- count++;
- Row key = extractKey(row);
- List<RowData> rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
- rows.add(serializer.copy(row));
+ int numRetry = 0;
+ while (true) {
+ cache.clear();
+ try {
+ T[] inputSplits = inputFormat.createInputSplits(1);
+ GenericRowData reuse = new GenericRowData(producedNames.length);
+ long count = 0;
+ for (T split : inputSplits) {
+ inputFormat.open(split);
+ while (!inputFormat.reachedEnd()) {
+ RowData row = inputFormat.nextRecord(reuse);
+ count++;
+ Row key = extractKey(row);
+ List<RowData> rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
+ rows.add(serializer.copy(row));
+ }
+ inputFormat.close();
+ }
+ nextLoadTime = System.currentTimeMillis() + getCacheTTL().toMillis();
+ LOG.info("Loaded {} row(s) into lookup join cache", count);
+ return;
+ } catch (IOException e) {
+ if (numRetry >= MAX_RETRIES) {
+ throw new FlinkRuntimeException(
+ String.format("Failed to load table into cache after %d retries", numRetry), e);
+ }
+ numRetry++;
+ long toSleep = numRetry * RETRY_INTERVAL.toMillis();
+ LOG.warn(String.format("Failed to load table into cache, will retry in %d seconds", toSleep / 1000), e);
+ try {
+ Thread.sleep(toSleep);
+ } catch (InterruptedException ex) {
+ LOG.warn("Interrupted while waiting to retry failed cache load, aborting");
+ throw new FlinkRuntimeException(ex);
}
- inputFormat.close();
}
- nextLoadTime = System.currentTimeMillis() + getCacheTTL().toMillis();
- LOG.info("Loaded {} row(s) into lookup join cache", count);
- } catch (IOException e) {
- throw new FlinkRuntimeException("Failed to load table into cache", e);
}
}