Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/08/29 10:45:53 UTC

[GitHub] asfgit closed pull request #6621: [FLINK-8686] [sql-client] Limit result size for prototyping modes

asfgit closed pull request #6621: [FLINK-8686] [sql-client] Limit result size for prototyping modes
URL: https://github.com/apache/flink/pull/6621

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md
index f16f1a5561a..8c4ba83c6dc 100644
--- a/docs/dev/table/sqlClient.md
+++ b/docs/dev/table/sqlClient.md
@@ -106,7 +106,7 @@ Alice, 1
 Greg, 1
 {% endhighlight %}
 
-Both result modes can be useful during the prototyping of SQL queries.
+Both result modes can be useful during the prototyping of SQL queries. In both modes, results are stored in the Java heap memory of the SQL Client. In order to keep the CLI responsive, the changelog mode shows only the latest 1000 changes. The table mode allows for navigating through larger results that are only limited by the available main memory and the configured [maximum number of rows](sqlClient.html#configuration) (`max-table-result-rows`).
 
 <span class="label label-danger">Attention</span> Queries that are executed in a batch environment can only be retrieved using the `table` result mode.
 
@@ -167,6 +167,7 @@ Every environment file is a regular [YAML file](http://yaml.org/). An example of
 tables:
   - name: MyTableSource
     type: source
+    update-mode: append
     connector:
       type: filesystem
       path: "/path/to/something.csv"
@@ -206,6 +207,8 @@ functions:
 execution:
   type: streaming                   # required: execution mode either 'batch' or 'streaming'
   result-mode: table                # required: either 'table' or 'changelog'
+  max-table-result-rows: 1000000    # optional: maximum number of maintained rows in
+                                    #   'table' mode (1000000 by default, smaller than 1 means unlimited)
   time-characteristic: event-time   # optional: 'processing-time' or 'event-time' (default)
   parallelism: 1                    # optional: Flink's parallelism (1 by default)
   periodic-watermarks-interval: 200 # optional: interval for periodic watermarks (200 ms by default)
@@ -213,7 +216,7 @@ execution:
   min-idle-state-retention: 0       # optional: table program's minimum idle state time
   max-idle-state-retention: 0       # optional: table program's maximum idle state time
   restart-strategy:                 # optional: restart strategy
-    type: fallback                  #           "fallback" to global restart strategy by default
+    type: fallback                  #   "fallback" to global restart strategy by default
 
 # Deployment properties allow for describing the cluster to which table programs are submitted.
 
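
For reference, the two settings introduced here combine in an environment file as follows; this fragment is illustrative, with the row limit lowered from its default:

execution:
  type: streaming
  result-mode: table
  max-table-result-rows: 10000   # cap the materialized result at 10000 rows
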
diff --git a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
index 302651a78aa..97e89fd6459 100644
--- a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
+++ b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
@@ -76,7 +76,9 @@ execution:
   # interval in ms for emitting periodic watermarks
   periodic-watermarks-interval: 200
   # 'changelog' or 'table' presentation of results
-  result-mode: changelog
+  result-mode: table
+  # maximum number of maintained rows in 'table' presentation of results
+  max-table-result-rows: 1000000
   # parallelism of the program
   parallelism: 1
   # maximum parallelism
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliChangelogResultView.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliChangelogResultView.java
index cf17933d977..d70af807a30 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliChangelogResultView.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliChangelogResultView.java
@@ -34,6 +34,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 
 import static org.apache.flink.table.client.cli.CliUtils.TIME_FORMATTER;
@@ -50,6 +51,7 @@
  */
 public class CliChangelogResultView extends CliResultView<CliChangelogResultView.ResultChangelogOperation> {
 
+	private static final int DEFAULT_MAX_ROW_COUNT = 1000;
 	private static final int DEFAULT_REFRESH_INTERVAL = 0; // as fast as possible
 	private static final int DEFAULT_REFRESH_INTERVAL_PLAIN = 3; // every 1s
 	private static final int MIN_REFRESH_INTERVAL = 0; // every 100ms
@@ -66,7 +68,8 @@ public CliChangelogResultView(CliClient client, ResultDescriptor resultDescripto
 			refreshInterval = DEFAULT_REFRESH_INTERVAL;
 		}
 		previousResults = null;
-		results = new ArrayList<>();
+		// rows are always appended at the tail and deleted from the head of the list
+		results = new LinkedList<>();
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -133,6 +136,13 @@ protected void refresh() {
 					}
 
 					// update results
+
+					// formatting and printing of rows is expensive in the current implementation,
+					// therefore we limit the maximum number of lines shown in changelog mode to
+					// keep the CLI responsive
+					if (results.size() >= DEFAULT_MAX_ROW_COUNT) {
+						results.remove(0);
+					}
 					results.add(changeRow);
 
 					scrolling++;
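
The change above effectively turns the changelog view's backing list into a bounded FIFO: rows are appended at the tail and, once DEFAULT_MAX_ROW_COUNT is reached, the oldest row is evicted from the head, which is O(1) on a LinkedList (but O(n) on the previous ArrayList). A standalone sketch of the same pattern, with illustrative names that are not part of the PR:

import java.util.LinkedList;

public class BoundedChangelog {

	private static final int MAX_ROW_COUNT = 1000;

	// oldest change at the head, newest at the tail
	private final LinkedList<String[]> rows = new LinkedList<>();

	public void add(String[] changeRow) {
		// evict the oldest row before appending once the cap is reached
		if (rows.size() >= MAX_ROW_COUNT) {
			rows.removeFirst();
		}
		rows.addLast(changeRow);
	}
}
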
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java
index df42edd63b3..78f3a565554 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java
@@ -28,7 +28,6 @@
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.stream.IntStream;
 
 import static org.apache.flink.table.client.cli.CliUtils.normalizeColumn;
 
@@ -182,9 +181,8 @@ protected void init() {
 	protected List<AttributedString> computeMainLines() {
 		final List<AttributedString> lines = new ArrayList<>();
 
-		IntStream.range(0, results.size()).forEach(lineIdx -> {
-			final String[] line = results.get(lineIdx);
-
+		int lineIdx = 0;
+		for (String[] line : results) {
 			final AttributedStringBuilder row = new AttributedStringBuilder();
 
 			// highlight selected row
@@ -192,7 +190,7 @@ protected void init() {
 				row.style(AttributedStyle.DEFAULT.inverse());
 			}
 
-			IntStream.range(0, line.length).forEach(colIdx -> {
+			for (int colIdx = 0; colIdx < line.length; colIdx++) {
 				final String col = line[colIdx];
 				final int columnWidth = computeColumnWidth(colIdx);
 
@@ -208,9 +206,11 @@ protected void init() {
 				} else {
 					normalizeColumn(row, col, columnWidth);
 				}
-			});
+			}
 			lines.add(row.toAttributedString());
-		});
+
+			lineIdx++;
+		}
 
 		return lines;
 	}
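
Dropping the IntStream/indexed-get pattern matters now that results may be a LinkedList: results.get(lineIdx) walks the list from the head on every call, which would make rendering quadratic in the number of rows, while the plain for-each loop above advances an iterator one node at a time. A small self-contained illustration of the difference (hypothetical, not from the PR):

import java.util.LinkedList;
import java.util.List;

public class IterationCost {

	public static void main(String[] args) {
		final List<String> rows = new LinkedList<>();
		for (int i = 0; i < 50_000; i++) {
			rows.add("row-" + i);
		}

		// quadratic: each get(i) traverses from the head of the linked list
		long slow = 0;
		for (int i = 0; i < rows.size(); i++) {
			slow += rows.get(i).length();
		}

		// linear: the iterator moves one node per element
		long fast = 0;
		for (String row : rows) {
			fast += row.length();
		}

		System.out.println(slow == fast); // true, but the first loop is far slower
	}
}
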
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
index b7c28938121..0518dfce74d 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
@@ -92,6 +92,10 @@ public int getMaxParallelism() {
 		return Integer.parseInt(properties.getOrDefault(PropertyStrings.EXECUTION_MAX_PARALLELISM, Integer.toString(128)));
 	}
 
+	public int getMaxTableResultRows() {
+		return Integer.parseInt(properties.getOrDefault(PropertyStrings.EXECUTION_MAX_TABLE_RESULT_ROWS, Integer.toString(1_000_000)));
+	}
+
 	public RestartStrategies.RestartStrategyConfiguration getRestartStrategy() {
 		final String restartStrategy = properties.getOrDefault(
 			PropertyStrings.EXECUTION_RESTART_STRATEGY_TYPE,
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java
index 2a6b001d5f4..2067c5a971b 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java
@@ -57,6 +57,8 @@ private PropertyStrings() {
 
 	public static final String EXECUTION_RESULT_MODE_VALUE_TABLE = "table";
 
+	public static final String EXECUTION_MAX_TABLE_RESULT_ROWS = "max-table-result-rows";
+
 	public static final String EXECUTION_RESTART_STRATEGY_TYPE = "restart-strategy.type";
 
 	public static final String EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FALLBACK = "fallback";
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
index a54160f24b8..93fdd483d44 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
@@ -71,7 +71,12 @@ public ResultStore(Configuration flinkConfig) {
 			if (env.getExecution().isChangelogMode()) {
 				return new ChangelogCollectStreamResult<>(outputType, config, gatewayAddress, gatewayPort);
 			} else {
-				return new MaterializedCollectStreamResult<>(outputType, config, gatewayAddress, gatewayPort);
+				return new MaterializedCollectStreamResult<>(
+					outputType,
+					config,
+					gatewayAddress,
+					gatewayPort,
+					env.getExecution().getMaxTableResultRows());
 			}
 
 		} else {
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
index 45c4f7553c6..0beabe94d2c 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.client.gateway.local.result;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -38,6 +39,31 @@
  */
 public class MaterializedCollectStreamResult<C> extends CollectStreamResult<C> implements MaterializedResult<C> {
 
+	/** Maximum initial capacity of the materialized table. */
+	public static final int MATERIALIZED_TABLE_MAX_INITIAL_CAPACITY = 1_000_000;
+
+	/** Maximum overcommitment of the materialized table. */
+	public static final int MATERIALIZED_TABLE_MAX_OVERCOMMIT = 1_000_000;
+
+	/** Factor for the initial capacity of the materialized table. */
+	public static final double MATERIALIZED_TABLE_CAPACITY_FACTOR = 0.05;
+
+	/** Factor for cleaning up deleted rows in the materialized table. */
+	public static final double MATERIALIZED_TABLE_OVERCOMMIT_FACTOR = 0.01;
+
+	/**
+	 * Maximum number of materialized rows to be stored. After the count is reached, the
+	 * oldest rows are dropped.
+	 */
+	private final int maxRowCount;
+
+	/** Threshold for cleaning up deleted rows in the materialized table. */
+	private final int overcommitThreshold;
+
+	/**
+	 * Materialized table that is continuously updated by inserts and deletes. Deletes at
+	 * the beginning are lazily cleaned up when the threshold is reached.
+	 */
 	private final List<Row> materializedTable;
 
 	/**
@@ -47,26 +73,65 @@
 	 */
 	private final Map<Row, Integer> rowPositionCache;
 
+	/** Current snapshot of the materialized table. */
 	private final List<Row> snapshot;
 
+	/** Position of the first valid row in the materialized table; slots before it hold deleted rows awaiting batch clean-up. */
+	private int validRowPosition;
+
+	/** Page count of the snapshot (always >= 1). */
 	private int pageCount;
 
+	/** Page size of the snapshot (always >= 1). */
 	private int pageSize;
 
+	/** Indicator that this is the last snapshot possible (EOS afterwards). */
 	private boolean isLastSnapshot;
 
-	public MaterializedCollectStreamResult(TypeInformation<Row> outputType, ExecutionConfig config,
-			InetAddress gatewayAddress, int gatewayPort) {
+	@VisibleForTesting
+	public MaterializedCollectStreamResult(
+			TypeInformation<Row> outputType,
+			ExecutionConfig config,
+			InetAddress gatewayAddress,
+			int gatewayPort,
+			int maxRowCount,
+			int overcommitThreshold) {
 		super(outputType, config, gatewayAddress, gatewayPort);
 
+		if (maxRowCount <= 0) {
+			this.maxRowCount = Integer.MAX_VALUE;
+		} else {
+			this.maxRowCount = maxRowCount;
+		}
+
+		this.overcommitThreshold = overcommitThreshold;
+
 		// prepare for materialization
-		materializedTable = new ArrayList<>();
-		rowPositionCache = new HashMap<>();
+		final int initialCapacity = computeMaterializedTableCapacity(maxRowCount); // avoid frequent resizing
+		materializedTable = new ArrayList<>(initialCapacity);
+		rowPositionCache = new HashMap<>(initialCapacity);
 		snapshot = new ArrayList<>();
+		validRowPosition = 0;
 		isLastSnapshot = false;
 		pageCount = 0;
 	}
 
+	public MaterializedCollectStreamResult(
+			TypeInformation<Row> outputType,
+			ExecutionConfig config,
+			InetAddress gatewayAddress,
+			int gatewayPort,
+			int maxRowCount) {
+
+		this(
+			outputType,
+			config,
+			gatewayAddress,
+			gatewayPort,
+			maxRowCount,
+			computeMaterializedTableOvercommit(maxRowCount));
+	}
+
 	@Override
 	public boolean isMaterialized() {
 		return true;
@@ -74,6 +139,10 @@ public boolean isMaterialized() {
 
 	@Override
 	public TypedResult<Integer> snapshot(int pageSize) {
+		if (pageSize < 1) {
+			throw new SqlExecutionException("Page size must be greater than 0.");
+		}
+
 		synchronized (resultLock) {
 			// retrieval thread is dead and there are no results anymore
 			// or program failed
@@ -87,7 +156,9 @@ else if (!isRetrieving()) {
 
 			this.pageSize = pageSize;
 			snapshot.clear();
-			snapshot.addAll(materializedTable);
+			for (int i = validRowPosition; i < materializedTable.size(); i++) {
+				snapshot.add(materializedTable.get(i));
+			}
 
 			// at least one page
 			pageCount = Math.max(1, (int) Math.ceil(((double) snapshot.size() / pageSize)));
@@ -112,31 +183,82 @@ else if (!isRetrieving()) {
 	@Override
 	protected void processRecord(Tuple2<Boolean, Row> change) {
 		synchronized (resultLock) {
-			final Row row = change.f1;
 			// insert
 			if (change.f0) {
-				materializedTable.add(row);
-				rowPositionCache.put(row, materializedTable.size() - 1);
+				processInsert(change.f1);
 			}
 			// delete
 			else {
-				// delete the newest record first to minimize per-page changes
-				final Integer cachedPos = rowPositionCache.get(row);
-				final int startSearchPos;
-				if (cachedPos != null) {
-					startSearchPos = Math.min(cachedPos, materializedTable.size() - 1);
-				} else {
-					startSearchPos = materializedTable.size() - 1;
-				}
-
-				for (int i = startSearchPos; i >= 0; i--) {
-					if (materializedTable.get(i).equals(row)) {
-						materializedTable.remove(i);
-						rowPositionCache.remove(row);
-						break;
-					}
-				}
+				processDelete(change.f1);
 			}
 		}
 	}
+
+	@VisibleForTesting
+	protected List<Row> getMaterializedTable() {
+		return materializedTable;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private void processInsert(Row row) {
+		// limit the materialized table
+		if (materializedTable.size() - validRowPosition >= maxRowCount) {
+			cleanUp();
+		}
+		materializedTable.add(row);
+		rowPositionCache.put(row, materializedTable.size() - 1);
+	}
+
+	private void processDelete(Row row) {
+		// delete the newest record first to minimize per-page changes
+		final Integer cachedPos = rowPositionCache.get(row);
+		final int startSearchPos;
+		if (cachedPos != null) {
+			startSearchPos = Math.min(cachedPos, materializedTable.size() - 1);
+		} else {
+			startSearchPos = materializedTable.size() - 1;
+		}
+
+		for (int i = startSearchPos; i >= validRowPosition; i--) {
+			if (materializedTable.get(i).equals(row)) {
+				materializedTable.remove(i);
+				rowPositionCache.remove(row);
+				break;
+			}
+		}
+	}
+
+	private void cleanUp() {
+		// invalidate row
+		final Row deleteRow = materializedTable.get(validRowPosition);
+		if (rowPositionCache.get(deleteRow) == validRowPosition) {
+			// this row has no duplicates in the materialized table,
+			// it can be removed from the cache
+			rowPositionCache.remove(deleteRow);
+		}
+		materializedTable.set(validRowPosition, null);
+
+		validRowPosition++;
+
+		// perform clean up in batches
+		if (validRowPosition >= overcommitThreshold) {
+			materializedTable.subList(0, validRowPosition).clear();
+			// adjust all cached indexes
+			rowPositionCache.replaceAll((k, v) -> v - validRowPosition);
+			validRowPosition = 0;
+		}
+	}
+
+	private static int computeMaterializedTableCapacity(int maxRowCount) {
+		return Math.min(
+			MATERIALIZED_TABLE_MAX_INITIAL_CAPACITY,
+			Math.max(1, (int) (maxRowCount * MATERIALIZED_TABLE_CAPACITY_FACTOR)));
+	}
+
+	private static int computeMaterializedTableOvercommit(int maxRowCount) {
+		return Math.min(
+			MATERIALIZED_TABLE_MAX_OVERCOMMIT,
+			(int) (maxRowCount * MATERIALIZED_TABLE_OVERCOMMIT_FACTOR));
+	}
 }
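
The cleanUp() logic above avoids an O(n) ArrayList head removal per evicted row: a stale slot is merely set to null and validRowPosition advances, and only once overcommitThreshold stale slots have accumulated is the whole prefix cleared in a single subList(...).clear() call, with all cached indexes rebased. A self-contained sketch of this amortized strategy (simplified to String rows instead of Row; class and method names are illustrative, not from the PR):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LazyCompactingTable {

	private final int maxRowCount;
	private final int overcommitThreshold;

	private final List<String> table = new ArrayList<>();
	private final Map<String, Integer> positionCache = new HashMap<>();
	private int validRowPosition = 0;

	public LazyCompactingTable(int maxRowCount, int overcommitThreshold) {
		this.maxRowCount = maxRowCount;
		this.overcommitThreshold = overcommitThreshold;
	}

	public void insert(String row) {
		// enforce the row limit before appending
		if (table.size() - validRowPosition >= maxRowCount) {
			evictOldest();
		}
		table.add(row);
		positionCache.put(row, table.size() - 1);
	}

	private void evictOldest() {
		// invalidate lazily: null the slot instead of shifting the ArrayList
		final String oldest = table.get(validRowPosition);
		final Integer cachedPos = positionCache.get(oldest);
		if (cachedPos != null && cachedPos == validRowPosition) {
			// no duplicate of this row remains, drop it from the cache
			positionCache.remove(oldest);
		}
		table.set(validRowPosition, null);
		validRowPosition++;

		// compact in batches once enough stale slots have accumulated
		if (validRowPosition >= overcommitThreshold) {
			table.subList(0, validRowPosition).clear();
			final int shift = validRowPosition;
			positionCache.replaceAll((row, pos) -> pos - shift);
			validRowPosition = 0;
		}
	}

	public List<String> visibleRows() {
		return table.subList(validRowPosition, table.size());
	}
}

With maxRowCount = 2 and overcommitThreshold = 3, inserting "D", "A", "B", "A" leaves the backing list as [null, null, "B", "A"], which is the same shape the testLimitedSnapshot case later in this diff asserts against the real implementation.
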
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 76648d08e1a..495f5e0d314 100644
--- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -198,6 +198,7 @@ public void testGetSessionProperties() throws Exception {
 		expectedProperties.put("execution.max-idle-state-retention", "0");
 		expectedProperties.put("execution.min-idle-state-retention", "0");
 		expectedProperties.put("execution.result-mode", "table");
+		expectedProperties.put("execution.max-table-result-rows", "100");
 		expectedProperties.put("execution.restart-strategy.type", "failure-rate");
 		expectedProperties.put("execution.restart-strategy.max-failures-per-interval", "10");
 		expectedProperties.put("execution.restart-strategy.failure-rate-interval", "99000");
@@ -264,38 +265,47 @@ public void testStreamQueryExecutionChangelog() throws Exception {
 	public void testStreamQueryExecutionTable() throws Exception {
 		final URL url = getClass().getClassLoader().getResource("test-data.csv");
 		Objects.requireNonNull(url);
+
 		final Map<String, String> replaceVars = new HashMap<>();
 		replaceVars.put("$VAR_0", url.getPath());
 		replaceVars.put("$VAR_1", "/");
 		replaceVars.put("$VAR_2", "streaming");
 		replaceVars.put("$VAR_3", "table");
 		replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
+		replaceVars.put("$VAR_MAX_ROWS", "100");
 
-		final Executor executor = createModifiedExecutor(clusterClient, replaceVars);
-		final SessionContext session = new SessionContext("test-session", new Environment());
+		final String query = "SELECT scalarUDF(IntegerField1), StringField1 FROM TableNumber1";
 
-		try {
-			// start job and retrieval
-			final ResultDescriptor desc = executor.executeQuery(
-				session,
-				"SELECT scalarUDF(IntegerField1), StringField1 FROM TableNumber1");
+		final List<String> expectedResults = new ArrayList<>();
+		expectedResults.add("47,Hello World");
+		expectedResults.add("27,Hello World");
+		expectedResults.add("37,Hello World");
+		expectedResults.add("37,Hello World");
+		expectedResults.add("47,Hello World");
+		expectedResults.add("57,Hello World!!!!");
+
+		executeStreamQueryTable(replaceVars, query, expectedResults);
+	}
 
-			assertTrue(desc.isMaterialized());
+	@Test(timeout = 30_000L)
+	public void testStreamQueryExecutionLimitedTable() throws Exception {
+		final URL url = getClass().getClassLoader().getResource("test-data.csv");
+		Objects.requireNonNull(url);
 
-			final List<String> actualResults = retrieveTableResult(executor, session, desc.getResultId());
+		final Map<String, String> replaceVars = new HashMap<>();
+		replaceVars.put("$VAR_0", url.getPath());
+		replaceVars.put("$VAR_1", "/");
+		replaceVars.put("$VAR_2", "streaming");
+		replaceVars.put("$VAR_3", "table");
+		replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
+		replaceVars.put("$VAR_MAX_ROWS", "1");
 
-			final List<String> expectedResults = new ArrayList<>();
-			expectedResults.add("47,Hello World");
-			expectedResults.add("27,Hello World");
-			expectedResults.add("37,Hello World");
-			expectedResults.add("37,Hello World");
-			expectedResults.add("47,Hello World");
-			expectedResults.add("57,Hello World!!!!");
+		final String query = "SELECT COUNT(*), StringField1 FROM TableNumber1 GROUP BY StringField1";
 
-			TestBaseUtils.compareResultCollections(expectedResults, actualResults, Comparator.naturalOrder());
-		} finally {
-			executor.stop(session);
-		}
+		final List<String> expectedResults = new ArrayList<>();
+		expectedResults.add("1,Hello World!!!!");
+
+		executeStreamQueryTable(replaceVars, query, expectedResults);
 	}
 
 	@Test(timeout = 30_000L)
@@ -375,6 +385,28 @@ public void testStreamQueryExecutionSink() throws Exception {
 		}
 	}
 
+	private void executeStreamQueryTable(
+			Map<String, String> replaceVars,
+			String query,
+			List<String> expectedResults) throws Exception {
+
+		final Executor executor = createModifiedExecutor(clusterClient, replaceVars);
+		final SessionContext session = new SessionContext("test-session", new Environment());
+
+		try {
+			// start job and retrieval
+			final ResultDescriptor desc = executor.executeQuery(session, query);
+
+			assertTrue(desc.isMaterialized());
+
+			final List<String> actualResults = retrieveTableResult(executor, session, desc.getResultId());
+
+			TestBaseUtils.compareResultCollections(expectedResults, actualResults, Comparator.naturalOrder());
+		} finally {
+			executor.stop(session);
+		}
+	}
+
 	private void verifySinkResult(String path) throws IOException {
 		final List<String> actualResults = new ArrayList<>();
 		TestBaseUtils.readAllResultLines(actualResults, path);
@@ -392,6 +424,7 @@ private void verifySinkResult(String path) throws IOException {
 		final Map<String, String> replaceVars = new HashMap<>();
 		replaceVars.put("$VAR_2", "batch");
 		replaceVars.put("$VAR_UPDATE_MODE", "");
+		replaceVars.put("$VAR_MAX_ROWS", "100");
 		return new LocalExecutor(
 			EnvironmentFileUtil.parseModified(DEFAULTS_ENVIRONMENT_FILE, replaceVars),
 			Collections.emptyList(),
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java
index c7e41ffe111..ba8c9245cc9 100644
--- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java
@@ -29,7 +29,9 @@
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 
@@ -42,13 +44,14 @@
 	public void testSnapshot() throws UnknownHostException {
 		final TypeInformation<Row> type = Types.ROW(Types.STRING, Types.LONG);
 
-		TestMaterializedCollectStreamResult result = null;
+		TestMaterializedCollectStreamResult<?> result = null;
 		try {
-			result = new TestMaterializedCollectStreamResult(
+			result = new TestMaterializedCollectStreamResult<>(
 				type,
 				new ExecutionConfig(),
 				InetAddress.getLocalHost(),
-				0);
+				0,
+				Integer.MAX_VALUE);
 
 			result.isRetrieving = true;
 
@@ -85,11 +88,59 @@ public void testSnapshot() throws UnknownHostException {
 		}
 	}
 
+	@Test
+	public void testLimitedSnapshot() throws UnknownHostException {
+		final TypeInformation<Row> type = Types.ROW(Types.STRING, Types.LONG);
+
+		TestMaterializedCollectStreamResult<?> result = null;
+		try {
+			result = new TestMaterializedCollectStreamResult<>(
+				type,
+				new ExecutionConfig(),
+				InetAddress.getLocalHost(),
+				0,
+				2,  // limit the materialized table to 2 rows
+				3); // with 3 rows overcommitment
+
+			result.isRetrieving = true;
+
+			result.processRecord(Tuple2.of(true, Row.of("D", 1)));
+			result.processRecord(Tuple2.of(true, Row.of("A", 1)));
+			result.processRecord(Tuple2.of(true, Row.of("B", 1)));
+			result.processRecord(Tuple2.of(true, Row.of("A", 1)));
+
+			assertEquals(
+				Arrays.asList(null, null, Row.of("B", 1), Row.of("A", 1)), // two over-committed rows
+				result.getMaterializedTable());
+
+			assertEquals(TypedResult.payload(2), result.snapshot(1));
+
+			assertEquals(Collections.singletonList(Row.of("B", 1)), result.retrievePage(1));
+			assertEquals(Collections.singletonList(Row.of("A", 1)), result.retrievePage(2));
+
+			result.processRecord(Tuple2.of(true, Row.of("C", 1)));
+
+			assertEquals(
+				Arrays.asList(Row.of("A", 1), Row.of("C", 1)), // limit clean up has taken place
+				result.getMaterializedTable());
+
+			result.processRecord(Tuple2.of(false, Row.of("A", 1)));
+
+			assertEquals(
+				Collections.singletonList(Row.of("C", 1)), // regular clean up has taken place
+				result.getMaterializedTable());
+		} finally {
+			if (result != null) {
+				result.close();
+			}
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------
 	// Helper classes
 	// --------------------------------------------------------------------------------------------
 
-	private static class TestMaterializedCollectStreamResult extends MaterializedCollectStreamResult {
+	private static class TestMaterializedCollectStreamResult<T> extends MaterializedCollectStreamResult<T> {
 
 		public boolean isRetrieving;
 
@@ -97,18 +148,42 @@ public TestMaterializedCollectStreamResult(
 				TypeInformation<Row> outputType,
 				ExecutionConfig config,
 				InetAddress gatewayAddress,
-				int gatewayPort) {
+				int gatewayPort,
+				int maxRowCount,
+				int overcommitThreshold) {
 
 			super(
 				outputType,
 				config,
 				gatewayAddress,
-				gatewayPort);
+				gatewayPort,
+				maxRowCount,
+				overcommitThreshold);
+		}
+
+		public TestMaterializedCollectStreamResult(
+				TypeInformation<Row> outputType,
+				ExecutionConfig config,
+				InetAddress gatewayAddress,
+				int gatewayPort,
+				int maxRowCount) {
+
+			super(
+				outputType,
+				config,
+				gatewayAddress,
+				gatewayPort,
+				maxRowCount);
 		}
 
 		@Override
 		protected boolean isRetrieving() {
 			return isRetrieving;
 		}
+
+		@Override
+		public List<Row> getMaterializedTable() {
+			return super.getMaterializedTable();
+		}
 	}
 }
diff --git a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
index cd5257e611c..e9c6d5bf32d 100644
--- a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
+++ b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
@@ -123,6 +123,7 @@ execution:
   min-idle-state-retention: 0
   max-idle-state-retention: 0
   result-mode: "$VAR_3"
+  max-table-result-rows: "$VAR_MAX_ROWS"
   restart-strategy:
     type: failure-rate
     max-failures-per-interval: 10


 
