You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/06/18 03:33:19 UTC

[flink] branch master updated: [FLINK-18272][table-runtime-blink] Add retry logic to FileSystemLookupFunction

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 40621c7  [FLINK-18272][table-runtime-blink] Add retry logic to FileSystemLookupFunction
40621c7 is described below

commit 40621c733c1647c5745d2795eaa32b6cc7dae3ae
Author: Rui Li <li...@apache.org>
AuthorDate: Thu Jun 18 11:32:21 2020 +0800

    [FLINK-18272][table-runtime-blink] Add retry logic to FileSystemLookupFunction
    
    This closes #12651
---
 .../table/filesystem/FileSystemLookupFunction.java | 57 +++++++++++++++-------
 1 file changed, 39 insertions(+), 18 deletions(-)

diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemLookupFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemLookupFunction.java
index e3bed30..d8b8633 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemLookupFunction.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemLookupFunction.java
@@ -59,6 +59,11 @@ public class FileSystemLookupFunction<T extends InputSplit> extends TableFunctio
 
 	private static final Logger LOG = LoggerFactory.getLogger(FileSystemLookupFunction.class);
 
+	// the max number of retries before throwing exception, in case of failure to load the table into cache
+	private static final int MAX_RETRIES = 3;
+	// base interval between retries; the wait before the n-th retry is n times this value
+	private static final Duration RETRY_INTERVAL = Duration.ofSeconds(10);
+
 	private final InputFormat<RowData, T> inputFormat;
 	// names and types of the records returned by the input format
 	private final String[] producedNames;
@@ -143,26 +148,42 @@ public class FileSystemLookupFunction<T extends InputSplit> extends TableFunctio
 		} else {
 			LOG.info("Populating lookup join cache");
 		}
-		cache.clear();
-		try {
-			T[] inputSplits = inputFormat.createInputSplits(1);
-			GenericRowData reuse = new GenericRowData(producedNames.length);
-			long count = 0;
-			for (T split : inputSplits) {
-				inputFormat.open(split);
-				while (!inputFormat.reachedEnd()) {
-					RowData row = inputFormat.nextRecord(reuse);
-					count++;
-					Row key = extractKey(row);
-					List<RowData> rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
-					rows.add(serializer.copy(row));
+		int numRetry = 0;
+		while (true) {
+			cache.clear();
+			try {
+				T[] inputSplits = inputFormat.createInputSplits(1);
+				GenericRowData reuse = new GenericRowData(producedNames.length);
+				long count = 0;
+				for (T split : inputSplits) {
+					inputFormat.open(split);
+					while (!inputFormat.reachedEnd()) {
+						RowData row = inputFormat.nextRecord(reuse);
+						count++;
+						Row key = extractKey(row);
+						List<RowData> rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
+						rows.add(serializer.copy(row));
+					}
+					inputFormat.close();
+				}
+				nextLoadTime = System.currentTimeMillis() + getCacheTTL().toMillis();
+				LOG.info("Loaded {} row(s) into lookup join cache", count);
+				return;
+			} catch (IOException e) {
+				if (numRetry >= MAX_RETRIES) {
+					throw new FlinkRuntimeException(
+							String.format("Failed to load table into cache after %d retries", numRetry), e);
+				}
+				numRetry++;
+				long toSleep = numRetry * RETRY_INTERVAL.toMillis();
+				LOG.warn(String.format("Failed to load table into cache, will retry in %d seconds", toSleep / 1000), e);
+				try {
+					Thread.sleep(toSleep);
+				} catch (InterruptedException ex) {
+					LOG.warn("Interrupted while waiting to retry failed cache load, aborting");
+					throw new FlinkRuntimeException(ex);
 				}
-				inputFormat.close();
 			}
-			nextLoadTime = System.currentTimeMillis() + getCacheTTL().toMillis();
-			LOG.info("Loaded {} row(s) into lookup join cache", count);
-		} catch (IOException e) {
-			throw new FlinkRuntimeException("Failed to load table into cache", e);
 		}
 	}