You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/08/28 08:50:18 UTC
[flink] branch master updated: [FLINK-10192] [sql-client] Fix SQL
Client table visualization mode
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d78bb60 [FLINK-10192] [sql-client] Fix SQL Client table visualization mode
d78bb60 is described below
commit d78bb60715044077eb4267ad4b171616e94d90e3
Author: Timo Walther <tw...@apache.org>
AuthorDate: Fri Aug 24 12:13:45 2018 +0200
[FLINK-10192] [sql-client] Fix SQL Client table visualization mode
Fixes the wrong materialization for the debugging visualization
in table mode. Reworks the caching mechanism in MaterializedCollectStreamResult.
This closes #6617.
---
.../flink/table/client/gateway/TypedResult.java | 19 ++++
.../result/MaterializedCollectStreamResult.java | 51 +++++----
.../MaterializedCollectStreamResultTest.java | 114 +++++++++++++++++++++
3 files changed, 162 insertions(+), 22 deletions(-)
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/TypedResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/TypedResult.java
index ee4e8d3..6ef8ef3 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/TypedResult.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/TypedResult.java
@@ -18,6 +18,8 @@
package org.apache.flink.table.client.gateway;
+import java.util.Objects;
+
/**
* Result with an attached type (actual payload, EOS, etc.).
*
@@ -55,6 +57,23 @@ public class TypedResult<P> {
return "TypedResult<" + type + ">";
}
+ @Override
+ public boolean equals(Object o) { // value equality based on the type and payload fields
+ if (this == o) { // same reference: equal without inspecting fields
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) { // null or a different runtime class is never equal
+ return false;
+ }
+ TypedResult<?> that = (TypedResult<?>) o; // cast cannot fail: runtime classes were compared above
+ return type == that.type && Objects.equals(payload, that.payload); // Objects.equals tolerates null payloads
+ }
+
+ @Override
+ public int hashCode() { // kept consistent with equals: combines the same two fields
+ return Objects.hash(type, payload);
+ }
+
// --------------------------------------------------------------------------------------------
public static <T> TypedResult<T> empty() {
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
index 7321bd0..45c4f75 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
@@ -39,10 +39,20 @@ import java.util.Map;
public class MaterializedCollectStreamResult<C> extends CollectStreamResult<C> implements MaterializedResult<C> {
private final List<Row> materializedTable;
- private final Map<Row, List<Integer>> rowPositions; // positions of rows in table for faster access
+
+ /**
+ * Caches the last known position of each row for faster access. A cached position may be stale
+ * (if rows at smaller positions are deleted) or missing (after a duplicate is deleted). Even so,
+ * the cache narrows the search in the materialized table.
+ */
+ private final Map<Row, Integer> rowPositionCache;
+
private final List<Row> snapshot;
+
private int pageCount;
+
private int pageSize;
+
private boolean isLastSnapshot;
public MaterializedCollectStreamResult(TypeInformation<Row> outputType, ExecutionConfig config,
@@ -51,7 +61,7 @@ public class MaterializedCollectStreamResult<C> extends CollectStreamResult<C> i
// prepare for materialization
materializedTable = new ArrayList<>();
- rowPositions = new HashMap<>();
+ rowPositionCache = new HashMap<>();
snapshot = new ArrayList<>();
isLastSnapshot = false;
pageCount = 0;
@@ -101,32 +111,29 @@ public class MaterializedCollectStreamResult<C> extends CollectStreamResult<C> i
@Override
protected void processRecord(Tuple2<Boolean, Row> change) {
- // we track the position of rows for faster access and in order to return consistent
- // snapshots where new rows are appended at the end
synchronized (resultLock) {
- final List<Integer> positions = rowPositions.get(change.f1);
-
+ final Row row = change.f1;
// insert
if (change.f0) {
- materializedTable.add(change.f1);
- if (positions == null) {
- // new row
- final ArrayList<Integer> pos = new ArrayList<>(1);
- pos.add(materializedTable.size() - 1);
- rowPositions.put(change.f1, pos);
- } else {
- // row exists already, only add position
- positions.add(materializedTable.size() - 1);
- }
+ materializedTable.add(row);
+ rowPositionCache.put(row, materializedTable.size() - 1);
}
// delete
else {
- if (positions != null) {
- // delete row position and row itself
- final int pos = positions.remove(positions.size() - 1);
- materializedTable.remove(pos);
- if (positions.isEmpty()) {
- rowPositions.remove(change.f1);
+ // delete the newest record first to minimize per-page changes
+ final Integer cachedPos = rowPositionCache.get(row);
+ final int startSearchPos;
+ if (cachedPos != null) {
+ startSearchPos = Math.min(cachedPos, materializedTable.size() - 1);
+ } else {
+ startSearchPos = materializedTable.size() - 1;
+ }
+
+ for (int i = startSearchPos; i >= 0; i--) {
+ if (materializedTable.get(i).equals(row)) {
+ materializedTable.remove(i);
+ rowPositionCache.remove(row);
+ break;
}
}
}
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java
new file mode 100644
index 0000000..c7e41ff
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway.local.result;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.client.gateway.TypedResult;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link MaterializedCollectStreamResult}.
+ */
+public class MaterializedCollectStreamResultTest {
+
+ @Test
+ public void testSnapshot() throws UnknownHostException { // inserts and retractions with a duplicate row
+ final TypeInformation<Row> type = Types.ROW(Types.STRING, Types.LONG);
+
+ TestMaterializedCollectStreamResult result = null;
+ try {
+ result = new TestMaterializedCollectStreamResult(
+ type,
+ new ExecutionConfig(),
+ InetAddress.getLocalHost(),
+ 0);
+
+ result.isRetrieving = true; // value handed back by the overridden isRetrieving() of the helper class
+
+ result.processRecord(Tuple2.of(true, Row.of("A", 1))); // f0 == true: insert
+ result.processRecord(Tuple2.of(true, Row.of("B", 1)));
+ result.processRecord(Tuple2.of(true, Row.of("A", 1))); // duplicate of the first row
+ result.processRecord(Tuple2.of(true, Row.of("C", 2)));
+
+ assertEquals(TypedResult.payload(4), result.snapshot(1)); // NOTE(review): argument is presumably the page size and the payload the page count - confirm
+
+ assertEquals(Collections.singletonList(Row.of("A", 1)), result.retrievePage(1)); // pages are expected in insertion order
+ assertEquals(Collections.singletonList(Row.of("B", 1)), result.retrievePage(2));
+ assertEquals(Collections.singletonList(Row.of("A", 1)), result.retrievePage(3));
+ assertEquals(Collections.singletonList(Row.of("C", 2)), result.retrievePage(4));
+
+ result.processRecord(Tuple2.of(false, Row.of("A", 1))); // f0 == false: delete; the later "A" is expected to go first
+
+ assertEquals(TypedResult.payload(3), result.snapshot(1)); // one row fewer after the retraction
+
+ assertEquals(Collections.singletonList(Row.of("A", 1)), result.retrievePage(1)); // the earlier "A" is still present
+ assertEquals(Collections.singletonList(Row.of("B", 1)), result.retrievePage(2));
+ assertEquals(Collections.singletonList(Row.of("C", 2)), result.retrievePage(3));
+
+ result.processRecord(Tuple2.of(false, Row.of("C", 2)));
+ result.processRecord(Tuple2.of(false, Row.of("A", 1))); // deletes the remaining "A"
+
+ assertEquals(TypedResult.payload(1), result.snapshot(1)); // only "B" is left
+
+ assertEquals(Collections.singletonList(Row.of("B", 1)), result.retrievePage(1));
+ } finally {
+ if (result != null) { // null when the constructor itself threw
+ result.close();
+ }
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Helper classes
+ // --------------------------------------------------------------------------------------------
+
+ private static class TestMaterializedCollectStreamResult extends MaterializedCollectStreamResult { // NOTE(review): extends the raw generic type; consider supplying the type argument
+
+ public boolean isRetrieving; // set directly by the test
+
+ public TestMaterializedCollectStreamResult(
+ TypeInformation<Row> outputType,
+ ExecutionConfig config,
+ InetAddress gatewayAddress,
+ int gatewayPort) {
+
+ super(
+ outputType,
+ config,
+ gatewayAddress,
+ gatewayPort);
+ }
+
+ @Override
+ protected boolean isRetrieving() { // returns the test-controlled flag
+ return isRetrieving;
+ }
+ }
+}