Posted to commits@flink.apache.org by tw...@apache.org on 2018/08/28 08:50:18 UTC

[flink] branch master updated: [FLINK-10192] [sql-client] Fix SQL Client table visualization mode

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d78bb60  [FLINK-10192] [sql-client] Fix SQL Client table visualization mode
d78bb60 is described below

commit d78bb60715044077eb4267ad4b171616e94d90e3
Author: Timo Walther <tw...@apache.org>
AuthorDate: Fri Aug 24 12:13:45 2018 +0200

    [FLINK-10192] [sql-client] Fix SQL Client table visualization mode
    
    Fixes the wrong materialization for the debugging visualization
    in table mode. Reworks the caching mechanism in MaterializedCollectStreamResult.
    
    This closes #6617.
---
 .../flink/table/client/gateway/TypedResult.java    |  19 ++++
 .../result/MaterializedCollectStreamResult.java    |  51 +++++----
 .../MaterializedCollectStreamResultTest.java       | 114 +++++++++++++++++++++
 3 files changed, 162 insertions(+), 22 deletions(-)
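
For readers who only want the gist of the reworked caching mechanism, the
following standalone sketch mirrors the logic of the patched processRecord()
shown below, minus the Flink plumbing (result lock, change flags, page
bookkeeping). The class and method names are invented for this illustration
and are not part of the Flink code base.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    /** Illustration only: a materialized table with a per-row position hint. */
    public class RowPositionSketch<R> {

        private final List<R> table = new ArrayList<>();

        /** Last known position per row; a hint for deletes, not an exact index. */
        private final Map<R, Integer> positionCache = new HashMap<>();

        public void insert(R row) {
            table.add(row);
            positionCache.put(row, table.size() - 1);
        }

        public void delete(R row) {
            // Start at the cached position if available, otherwise at the end.
            // The cached index may be stale, so clamp it to the current size.
            final Integer cached = positionCache.get(row);
            final int start = (cached != null)
                ? Math.min(cached, table.size() - 1)
                : table.size() - 1;

            // Scan backwards so the newest duplicate is removed first.
            for (int i = start; i >= 0; i--) {
                if (table.get(i).equals(row)) {
                    table.remove(i);
                    positionCache.remove(row);
                    return;
                }
            }
        }

        public List<R> snapshot() {
            return new ArrayList<>(table);
        }
    }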

diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/TypedResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/TypedResult.java
index ee4e8d3..6ef8ef3 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/TypedResult.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/TypedResult.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.client.gateway;
 
+import java.util.Objects;
+
 /**
  * Result with an attached type (actual payload, EOS, etc.).
  *
@@ -55,6 +57,23 @@ public class TypedResult<P> {
 		return "TypedResult<" + type + ">";
 	}
 
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		TypedResult<?> that = (TypedResult<?>) o;
+		return type == that.type && Objects.equals(payload, that.payload);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(type, payload);
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	public static <T> TypedResult<T> empty() {
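
The equals()/hashCode() pair added above gives TypedResult value semantics:
two results are equal when their type and payload match. The new test further
down relies on this, e.g.

    assertEquals(TypedResult.payload(4), result.snapshot(1));

which would otherwise fall back to identity comparison and fail.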
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
index 7321bd0..45c4f75 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
@@ -39,10 +39,20 @@ import java.util.Map;
 public class MaterializedCollectStreamResult<C> extends CollectStreamResult<C> implements MaterializedResult<C> {
 
 	private final List<Row> materializedTable;
-	private final Map<Row, List<Integer>> rowPositions; // positions of rows in table for faster access
+
+	/**
+	 * Caches the last row position for faster access. The position might not be exact (if rows
+	 * with smaller position are deleted) nor complete (for deletes of duplicates). However, the
+	 * cache narrows the search in the materialized table.
+	 */
+	private final Map<Row, Integer> rowPositionCache;
+
 	private final List<Row> snapshot;
+
 	private int pageCount;
+
 	private int pageSize;
+
 	private boolean isLastSnapshot;
 
 	public MaterializedCollectStreamResult(TypeInformation<Row> outputType, ExecutionConfig config,
@@ -51,7 +61,7 @@ public class MaterializedCollectStreamResult<C> extends CollectStreamResult<C> i
 
 		// prepare for materialization
 		materializedTable = new ArrayList<>();
-		rowPositions = new HashMap<>();
+		rowPositionCache = new HashMap<>();
 		snapshot = new ArrayList<>();
 		isLastSnapshot = false;
 		pageCount = 0;
@@ -101,32 +111,29 @@ public class MaterializedCollectStreamResult<C> extends CollectStreamResult<C> i
 
 	@Override
 	protected void processRecord(Tuple2<Boolean, Row> change) {
-		// we track the position of rows for faster access and in order to return consistent
-		// snapshots where new rows are appended at the end
 		synchronized (resultLock) {
-			final List<Integer> positions = rowPositions.get(change.f1);
-
+			final Row row = change.f1;
 			// insert
 			if (change.f0) {
-				materializedTable.add(change.f1);
-				if (positions == null) {
-					// new row
-					final ArrayList<Integer> pos = new ArrayList<>(1);
-					pos.add(materializedTable.size() - 1);
-					rowPositions.put(change.f1, pos);
-				} else {
-					// row exists already, only add position
-					positions.add(materializedTable.size() - 1);
-				}
+				materializedTable.add(row);
+				rowPositionCache.put(row, materializedTable.size() - 1);
 			}
 			// delete
 			else {
-				if (positions != null) {
-					// delete row position and row itself
-					final int pos = positions.remove(positions.size() - 1);
-					materializedTable.remove(pos);
-					if (positions.isEmpty()) {
-						rowPositions.remove(change.f1);
+				// delete the newest record first to minimize per-page changes
+				final Integer cachedPos = rowPositionCache.get(row);
+				final int startSearchPos;
+				if (cachedPos != null) {
+					startSearchPos = Math.min(cachedPos, materializedTable.size() - 1);
+				} else {
+					startSearchPos = materializedTable.size() - 1;
+				}
+
+				for (int i = startSearchPos; i >= 0; i--) {
+					if (materializedTable.get(i).equals(row)) {
+						materializedTable.remove(i);
+						rowPositionCache.remove(row);
+						break;
 					}
 				}
 			}
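
To see why the cached position is only a hint, walk through the sequence used
in the new test below: after inserting A, B, A, C the table is [A, B, A, C]
and the cache maps A -> 2, B -> 1, C -> 3. Deleting A removes index 2, so C
now actually sits at index 2 while the cache still says 3; the later delete of
C therefore starts its backward scan at min(3, size - 1) = 2 and finds the row
immediately. Because inserts always record the newest position and deletes
only shift rows towards smaller indices, a cached position can overshoot but
never undershoot a row's true position, so the backward scan stays correct.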
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java
new file mode 100644
index 0000000..c7e41ff
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway.local.result;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.client.gateway.TypedResult;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link MaterializedCollectStreamResult}.
+ */
+public class MaterializedCollectStreamResultTest {
+
+	@Test
+	public void testSnapshot() throws UnknownHostException {
+		final TypeInformation<Row> type = Types.ROW(Types.STRING, Types.LONG);
+
+		TestMaterializedCollectStreamResult result = null;
+		try {
+			result = new TestMaterializedCollectStreamResult(
+				type,
+				new ExecutionConfig(),
+				InetAddress.getLocalHost(),
+				0);
+
+			result.isRetrieving = true;
+
+			result.processRecord(Tuple2.of(true, Row.of("A", 1)));
+			result.processRecord(Tuple2.of(true, Row.of("B", 1)));
+			result.processRecord(Tuple2.of(true, Row.of("A", 1)));
+			result.processRecord(Tuple2.of(true, Row.of("C", 2)));
+
+			assertEquals(TypedResult.payload(4), result.snapshot(1));
+
+			assertEquals(Collections.singletonList(Row.of("A", 1)), result.retrievePage(1));
+			assertEquals(Collections.singletonList(Row.of("B", 1)), result.retrievePage(2));
+			assertEquals(Collections.singletonList(Row.of("A", 1)), result.retrievePage(3));
+			assertEquals(Collections.singletonList(Row.of("C", 2)), result.retrievePage(4));
+
+			result.processRecord(Tuple2.of(false, Row.of("A", 1)));
+
+			assertEquals(TypedResult.payload(3), result.snapshot(1));
+
+			assertEquals(Collections.singletonList(Row.of("A", 1)), result.retrievePage(1));
+			assertEquals(Collections.singletonList(Row.of("B", 1)), result.retrievePage(2));
+			assertEquals(Collections.singletonList(Row.of("C", 2)), result.retrievePage(3));
+
+			result.processRecord(Tuple2.of(false, Row.of("C", 2)));
+			result.processRecord(Tuple2.of(false, Row.of("A", 1)));
+
+			assertEquals(TypedResult.payload(1), result.snapshot(1));
+
+			assertEquals(Collections.singletonList(Row.of("B", 1)), result.retrievePage(1));
+		} finally {
+			if (result != null) {
+				result.close();
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Helper classes
+	// --------------------------------------------------------------------------------------------
+
+	private static class TestMaterializedCollectStreamResult extends MaterializedCollectStreamResult {
+
+		public boolean isRetrieving;
+
+		public TestMaterializedCollectStreamResult(
+				TypeInformation<Row> outputType,
+				ExecutionConfig config,
+				InetAddress gatewayAddress,
+				int gatewayPort) {
+
+			super(
+				outputType,
+				config,
+				gatewayAddress,
+				gatewayPort);
+		}
+
+		@Override
+		protected boolean isRetrieving() {
+			return isRetrieving;
+		}
+	}
+}
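
The test drives the result with a page size of 1, so snapshot(1) reports one
page per materialized row and retrievePage(i) returns exactly the i-th row
(pages are 1-indexed). A client consuming such a result might poll it roughly
as follows; the accessors getType() and getPayload() and the ResultType
constant are assumed here, since they are not shown in this diff:

    // Hypothetical polling sketch, for illustration only.
    TypedResult<Integer> snapshot = result.snapshot(pageSize);
    if (snapshot.getType() == TypedResult.ResultType.PAYLOAD) {
        int pageCount = snapshot.getPayload();
        for (int page = 1; page <= pageCount; page++) {
            List<Row> rows = result.retrievePage(page);
            // render the page ...
        }
    }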