Posted to commits@flink.apache.org by di...@apache.org on 2020/08/11 08:30:47 UTC

[flink] branch master updated: [FLINK-18848][python] Fix to_pandas to handle retraction data properly

This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2333d48  [FLINK-18848][python] Fix to_pandas to handle retraction data properly
2333d48 is described below

commit 2333d48a0638299f6dcb1b7d866b0e9e37939f4c
Author: Dian Fu <di...@apache.org>
AuthorDate: Mon Aug 10 14:03:13 2020 +0800

    [FLINK-18848][python] Fix to_pandas to handle retraction data properly
    
    This closes #13099.
---
 docs/dev/table/python/conversion_of_pandas.md      |  3 +-
 docs/dev/table/python/conversion_of_pandas.zh.md   |  3 +-
 flink-python/pyflink/table/table.py                |  4 +-
 .../pyflink/table/tests/test_pandas_conversion.py  | 14 ++++-
 .../flink/table/runtime/arrow/ArrowUtils.java      | 70 ++++++++++++++++++++--
 5 files changed, 85 insertions(+), 9 deletions(-)

diff --git a/docs/dev/table/python/conversion_of_pandas.md b/docs/dev/table/python/conversion_of_pandas.md
index c772a73..b3de762 100644
--- a/docs/dev/table/python/conversion_of_pandas.md
+++ b/docs/dev/table/python/conversion_of_pandas.md
@@ -63,7 +63,8 @@ table = t_env.from_pandas(pdf,
 It also supports converting a PyFlink Table to a Pandas DataFrame. Internally, it will materialize the results of the 
 table and serialize them into multiple Arrow batches of Arrow columnar format at client side. The maximum Arrow batch size
 is determined by the config option [python.fn-execution.arrow.batch.size]({{ site.baseurl }}/dev/table/python/python_config.html#python-fn-execution-arrow-batch-size).
-The serialized data will then be converted to Pandas DataFrame.
+The serialized data will then be converted to a Pandas DataFrame. Note that this collects the content of the
+table to the client side, so please make sure that the content of the table fits in memory before calling this method.
 
 The following example shows how to convert a PyFlink Table to a Pandas DataFrame:
 
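The example referenced in the docs is outside this hunk. As a rough sketch of the usage the new warning applies to (the environment setup below is an illustrative assumption, not taken from this commit):

    import numpy as np
    import pandas as pd

    from pyflink.table import EnvironmentSettings, StreamTableEnvironment

    # Assumed setup: a streaming TableEnvironment using the blink planner.
    env_settings = EnvironmentSettings.new_instance() \
        .in_streaming_mode().use_blink_planner().build()
    t_env = StreamTableEnvironment.create(environment_settings=env_settings)

    # Build a small table from a Pandas DataFrame ...
    pdf = pd.DataFrame(np.random.rand(10, 2), columns=["a", "b"])
    table = t_env.from_pandas(pdf)

    # ... and collect it back. to_pandas() pulls the full table content to
    # the client side, so the result must fit in client memory.
    result_pdf = table.to_pandas()
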
diff --git a/docs/dev/table/python/conversion_of_pandas.zh.md b/docs/dev/table/python/conversion_of_pandas.zh.md
index 38684a7..1fa95c5 100644
--- a/docs/dev/table/python/conversion_of_pandas.zh.md
+++ b/docs/dev/table/python/conversion_of_pandas.zh.md
@@ -63,7 +63,8 @@ table = t_env.from_pandas(pdf,
 It also supports converting a PyFlink Table to a Pandas DataFrame. Internally, it will materialize the results of the 
 table and serialize them into multiple Arrow batches of Arrow columnar format at client side. The maximum Arrow batch size
 is determined by the config option [python.fn-execution.arrow.batch.size]({{ site.baseurl }}/zh/dev/table/python/python_config.html#python-fn-execution-arrow-batch-size).
-The serialized data will then be converted to Pandas DataFrame.
+The serialized data will then be converted to a Pandas DataFrame. Note that this collects the content of the
+table to the client side, so please make sure that the content of the table fits in memory before calling this method.
 
 The following example shows how to convert a PyFlink Table to a Pandas DataFrame:
 
diff --git a/flink-python/pyflink/table/table.py b/flink-python/pyflink/table/table.py
index 2f55680..80937c3 100644
--- a/flink-python/pyflink/table/table.py
+++ b/flink-python/pyflink/table/table.py
@@ -707,7 +707,9 @@ class Table(object):
 
     def to_pandas(self):
         """
-        Converts the table to a pandas DataFrame.
+        Converts the table to a pandas DataFrame. Note that this collects the content of the
+        table to the client side, so please make sure that the content of the table fits in
+        memory before calling this method.
 
         Example:
         ::
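Since the docstring's own example falls outside this hunk, here is a rough sketch of the scenario this fix enables: calling to_pandas on an aggregated (retract) table. The table and column names are illustrative assumptions:

    # Assumes a table with columns "id" and "v" obtained from t_env.
    agg = table.group_by("id").select("max(v) as v")

    # The aggregation emits a changelog with retraction rows; with this fix,
    # to_pandas folds them so the DataFrame reflects the final snapshot.
    pdf = agg.to_pandas()
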
diff --git a/flink-python/pyflink/table/tests/test_pandas_conversion.py b/flink-python/pyflink/table/tests/test_pandas_conversion.py
index 5bb20f8..ffbc04e 100644
--- a/flink-python/pyflink/table/tests/test_pandas_conversion.py
+++ b/flink-python/pyflink/table/tests/test_pandas_conversion.py
@@ -37,7 +37,7 @@ class PandasConversionTestBase(object):
                      datetime.datetime(1970, 1, 1, 0, 0, 0, 123000), ['hello', '中文'],
                      Row(a=1, b='hello', c=datetime.datetime(1970, 1, 1, 0, 0, 0, 123000),
                          d=[1, 2])),
-                    (2, 2, 2, 2, False, 2.1, 2.2, 'world', bytearray(b"bbb"),
+                    (1, 2, 2, 2, False, 2.1, 2.2, 'world', bytearray(b"bbb"),
                      decimal.Decimal('1000000000000000000.02'), datetime.date(2014, 9, 13),
                      datetime.time(hour=1, minute=0, second=1),
                      datetime.datetime(1970, 1, 1, 0, 0, 0, 123000), ['hello', '中文'],
@@ -118,7 +118,7 @@ class PandasConversionITTests(PandasConversionTestBase):
         table = self.t_env.from_pandas(self.pdf, self.data_type, 5)
         self.assertEqual(self.data_type, table.get_schema().to_row_data_type())
 
-        table = table.filter("f1 < 2")
+        table = table.filter("f2 < 2")
         table_sink = source_sink_utils.TestAppendSink(
             self.data_type.field_names(),
             self.data_type.field_types())
@@ -143,6 +143,16 @@ class PandasConversionITTests(PandasConversionTestBase):
         pdf = table.filter("f1 < 0").to_pandas()
         self.assertTrue(pdf.empty)
 
+    def test_to_pandas_for_retract_table(self):
+        table = self.t_env.from_pandas(self.pdf, self.data_type)
+        result_pdf = table.group_by("f1").select("max(f2) as f2").to_pandas()
+        import pandas as pd
+        import numpy as np
+        assert_frame_equal(result_pdf, pd.DataFrame(data={'f2': np.int16([2])}))
+
+        result_pdf = table.group_by("f2").select("max(f1) as f2").to_pandas()
+        assert_frame_equal(result_pdf, pd.DataFrame(data={'f2': np.int8([1, 1])}))
+
 
 class StreamPandasConversionTests(PandasConversionITTests,
                                   PyFlinkStreamTableTestCase):
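
The new test exercises exactly the case the fix targets: a group-by aggregation over an append stream emits a changelog rather than an append-only stream. A rough illustration (not part of the commit) of the changelog produced by group_by("f1").select("max(f2) as f2") for the two rows sharing f1=1, and how folding it yields the expected snapshot:

    # The two matching rows have f2=1 and f2=2, so the aggregate is first
    # emitted as 1 and then updated to 2 via a retraction pair.
    changelog = [
        ("+I", 1),  # insert: max(f2) = 1 after the first row
        ("-U", 1),  # update-before: retract the old aggregate ...
        ("+U", 2),  # update-after: ... and emit the new one
    ]

    snapshot = []
    for kind, value in changelog:
        if kind in ("+I", "+U"):
            snapshot.append(value)
        else:  # "-U" / "-D" retract a previously emitted row
            snapshot.remove(value)

    assert snapshot == [2]  # matches pd.DataFrame(data={'f2': np.int16([2])})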
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java
index aa2fcb3..e286c18 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java
@@ -23,6 +23,9 @@ import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.BatchTableEnvImpl;
 import org.apache.flink.table.api.internal.TableEnvImpl;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.api.internal.TableImpl;
@@ -121,6 +124,9 @@ import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
 import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.LinkedHashMultiset;
 
 import org.apache.arrow.flatbuf.MessageHeader;
 import org.apache.arrow.memory.BufferAllocator;
@@ -611,6 +617,21 @@ public final class ArrowUtils {
 	 */
 	public static CustomIterator<byte[]> collectAsPandasDataFrame(Table table, int maxArrowBatchSize) throws Exception {
 		checkArrowUsable();
+		boolean isRetractTable = false;
+		if (isStreamingMode(table)) {
+			StreamTableEnvironment tableEnv = (StreamTableEnvironment) ((TableImpl) table).getTableEnvironment();
+			try {
+				tableEnv.toAppendStream(table, Row.class);
+			} catch (Throwable t) {
+				if (t.getMessage().contains("toAppendStream doesn't support consuming update changes") ||
+						t.getMessage().contains("Table is not an append-only table")) {
+					isRetractTable = true;
+				} else {
+					throw new RuntimeException("Failed to determine whether the given table is append only.", t);
+				}
+			}
+		}
+
 		BufferAllocator allocator = getRootAllocator().newChildAllocator("collectAsPandasDataFrame", 0, Long.MAX_VALUE);
 		RowType rowType = (RowType) table.getSchema().toRowDataType().getLogicalType();
 		VectorSchemaRoot root = VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator);
@@ -620,13 +641,20 @@ public final class ArrowUtils {
 
 		ArrowWriter arrowWriter;
 		Iterator<Row> results = table.execute().collect();
+		Iterator<Row> appendOnlyResults;
+		if (isRetractTable) {
+			appendOnlyResults = filterOutRetractRows(results);
+		} else {
+			appendOnlyResults = results;
+		}
+
 		Iterator convertedResults;
 		if (isBlinkPlanner(table)) {
 			arrowWriter = createRowDataArrowWriter(root, rowType);
 			convertedResults = new Iterator<RowData>() {
 				@Override
 				public boolean hasNext() {
-					return results.hasNext();
+					return appendOnlyResults.hasNext();
 				}
 
 				@Override
@@ -637,12 +665,12 @@ public final class ArrowUtils {
 						SelectTableSinkSchemaConverter.changeDefaultConversionClass(table.getSchema());
 					DataFormatConverters.DataFormatConverter converter =
 						DataFormatConverters.getConverterForDataType(convertedTableSchema.toRowDataType());
-					return (RowData) converter.toInternal(results.next());
+					return (RowData) converter.toInternal(appendOnlyResults.next());
 				}
 			};
 		} else {
 			arrowWriter = createRowArrowWriter(root, rowType);
-			convertedResults = results;
+			convertedResults = appendOnlyResults;
 		}
 
 		return new CustomIterator<byte[]>() {
@@ -679,6 +707,24 @@ public final class ArrowUtils {
 		};
 	}
 
+	private static Iterator<Row> filterOutRetractRows(Iterator<Row> data) {
+		LinkedHashMultiset<Row> result = LinkedHashMultiset.create();
+		while (data.hasNext()) {
+			Row element = data.next();
+			if (element.getKind() == RowKind.INSERT || element.getKind() == RowKind.UPDATE_AFTER) {
+				element.setKind(RowKind.INSERT);
+				result.add(element);
+			} else {
+				element.setKind(RowKind.INSERT);
+				if (!result.remove(element)) {
+					throw new RuntimeException(
+						String.format("Could not remove element '%s', should never happen.", element));
+				}
+			}
+		}
+		return result.iterator();
+	}
+
 	private static boolean isBlinkPlanner(Table table) {
 		TableEnvironment tableEnv = ((TableImpl) table).getTableEnvironment();
 		if (tableEnv instanceof TableEnvImpl) {
@@ -688,7 +734,23 @@ public final class ArrowUtils {
 			return planner instanceof PlannerBase;
 		} else {
 			throw new RuntimeException(String.format(
-				"Could not determine the planner type for table environment class %s", tableEnv.getClass()));
+				"Could not determine the planner type for table environment class %s.", tableEnv.getClass()));
+		}
+	}
+
+	private static boolean isStreamingMode(Table table) throws Exception {
+		TableEnvironment tableEnv = ((TableImpl) table).getTableEnvironment();
+		if (tableEnv instanceof BatchTableEnvironment || tableEnv instanceof BatchTableEnvImpl) {
+			return false;
+		} else if (tableEnv instanceof StreamTableEnvironment) {
+			return true;
+		} else if (tableEnv instanceof TableEnvironmentImpl) {
+			java.lang.reflect.Field isStreamingModeField = TableEnvironmentImpl.class.getDeclaredField("isStreamingMode");
+			isStreamingModeField.setAccessible(true);
+			return (boolean) isStreamingModeField.get(tableEnv);
+		} else {
+			throw new RuntimeException(String.format(
+				"Could not determine the streaming mode for table environment class %s.", tableEnv.getClass()));
 		}
 	}
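
The folding in filterOutRetractRows above amounts to maintaining an insertion-ordered multiset of rows: INSERT and UPDATE_AFTER add an occurrence, UPDATE_BEFORE and DELETE remove one. A compact Python sketch of the same semantics (an illustration of the algorithm, not the shipped implementation; the function name is made up):

    def filter_out_retract_rows(rows):
        """Fold a changelog into its final snapshot.

        rows: iterable of (kind, record) pairs, where kind is one of
        "+I", "+U" (add) or "-U", "-D" (retract).
        """
        # A plain dict keeps insertion order, mirroring Guava's LinkedHashMultiset.
        counts = {}
        for kind, record in rows:
            if kind in ("+I", "+U"):
                counts[record] = counts.get(record, 0) + 1
            else:
                if counts.get(record, 0) == 0:
                    raise RuntimeError(
                        "Could not remove element %r, should never happen." % (record,))
                counts[record] -= 1
                if counts[record] == 0:
                    del counts[record]
        # Expand multiplicities back into individual rows.
        return [record for record, n in counts.items() for _ in range(n)]

    # filter_out_retract_rows([("+I", (1, 1)), ("-U", (1, 1)), ("+U", (1, 2))])
    # returns [(1, 2)]

Note that the Java code resets each row's kind to INSERT before adding or removing it so that rows compare equal regardless of kind; the sketch gets the same effect by keeping the kind separate from the record.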