You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/06/23 15:03:26 UTC
[flink] branch release-1.11 updated: [FLINK-18399][table-api-java]
Fix TableResult#print can not print the result of unbounded stream query
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 8d9f6df [FLINK-18399][table-api-java] Fix TableResult#print can not print the result of unbounded stream query
8d9f6df is described below
commit 8d9f6df60d4c01d3410c763ff3508d50f78658ec
Author: godfrey he <go...@163.com>
AuthorDate: Tue Jun 23 23:02:11 2020 +0800
[FLINK-18399][table-api-java] Fix TableResult#print can not print the result of unbounded stream query
This closes #12752
---
flink-python/pyflink/table/table_result.py | 6 +-
.../table/client/cli/CliTableauResultView.java | 109 +-----------
.../table/client/cli/CliTableauResultViewTest.java | 2 +-
.../org/apache/flink/table/api/TableResult.java | 19 ++-
.../table/api/internal/TableEnvironmentImpl.java | 4 +-
.../flink/table/api/internal/TableResultImpl.java | 35 +++-
.../org/apache/flink/table/utils/PrintUtils.java | 185 ++++++++++++++++++---
.../apache/flink/table/utils/PrintUtilsTest.java | 100 +++++++++--
.../flink/table/api/internal/TableEnvImpl.scala | 5 +-
9 files changed, 305 insertions(+), 160 deletions(-)
diff --git a/flink-python/pyflink/table/table_result.py b/flink-python/pyflink/table/table_result.py
index 2dfb02f..e9f2abe 100644
--- a/flink-python/pyflink/table/table_result.py
+++ b/flink-python/pyflink/table/table_result.py
@@ -134,8 +134,10 @@ class TableResult(object):
"""
Print the result contents as tableau form to client console.
- NOTE: please make sure the result data to print should be small.
- Because all data will be collected to local first, and then print them to console.
+ For streaming mode, this method guarantees end-to-end exactly-once record delivery
+ which requires the checkpointing mechanism to be enabled.
+ By default, checkpointing is disabled. To enable checkpointing, set checkpointing properties
+ (see ExecutionCheckpointingOptions) through `TableConfig#getConfiguration()`.
.. versionadded:: 1.11.0
"""
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java
index dc81872..74126c7 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java
@@ -25,15 +25,6 @@ import org.apache.flink.table.client.gateway.Executor;
import org.apache.flink.table.client.gateway.ResultDescriptor;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.client.gateway.TypedResult;
-import org.apache.flink.table.types.logical.BigIntType;
-import org.apache.flink.table.types.logical.DecimalType;
-import org.apache.flink.table.types.logical.IntType;
-import org.apache.flink.table.types.logical.LocalZonedTimestampType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.SmallIntType;
-import org.apache.flink.table.types.logical.TimeType;
-import org.apache.flink.table.types.logical.TimestampType;
-import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.utils.PrintUtils;
import org.apache.flink.types.Row;
@@ -54,7 +45,6 @@ import java.util.stream.Stream;
*/
public class CliTableauResultView implements AutoCloseable {
- private static final int NULL_COLUMN_WIDTH = CliStrings.NULL_COLUMN.length();
private static final int DEFAULT_COLUMN_WIDTH = 20;
private static final String CHANGEFLAG_COLUMN_NAME = "+/-";
@@ -183,11 +173,12 @@ public class CliTableauResultView implements AutoCloseable {
List<TableColumn> columns = resultDescriptor.getResultSchema().getTableColumns();
String[] fieldNames =
Stream.concat(
- Stream.of("+/-"),
+ Stream.of(CHANGEFLAG_COLUMN_NAME),
columns.stream().map(TableColumn::getName)
).toArray(String[]::new);
- int[] colWidths = columnWidthsByType(columns, true);
+ int[] colWidths = PrintUtils.columnWidthsByType(
+ columns, DEFAULT_COLUMN_WIDTH, CliStrings.NULL_COLUMN, CHANGEFLAG_COLUMN_NAME);
String borderline = PrintUtils.genBorderLine(colWidths);
// print filed names
@@ -227,98 +218,4 @@ public class CliTableauResultView implements AutoCloseable {
}
}
}
-
- /**
- * Try to infer column width based on column types. In streaming case, we will have an
- * endless result set, thus couldn't determine column widths based on column values.
- */
- private int[] columnWidthsByType(List<TableColumn> columns, boolean includeChangeflag) {
- // fill width with field names first
- int[] colWidths = columns.stream()
- .mapToInt(col -> col.getName().length())
- .toArray();
-
- // determine proper column width based on types
- for (int i = 0; i < columns.size(); ++i) {
- LogicalType type = columns.get(i).getType().getLogicalType();
- int len;
- switch (type.getTypeRoot()) {
- case TINYINT:
- len = TinyIntType.PRECISION + 1; // extra for negative value
- break;
- case SMALLINT:
- len = SmallIntType.PRECISION + 1; // extra for negative value
- break;
- case INTEGER:
- len = IntType.PRECISION + 1; // extra for negative value
- break;
- case BIGINT:
- len = BigIntType.PRECISION + 1; // extra for negative value
- break;
- case DECIMAL:
- len = ((DecimalType) type).getPrecision() + 2; // extra for negative value and decimal point
- break;
- case BOOLEAN:
- len = 5; // "true" or "false"
- break;
- case DATE:
- len = 10; // e.g. 9999-12-31
- break;
- case TIME_WITHOUT_TIME_ZONE:
- int precision = ((TimeType) type).getPrecision();
- len = precision == 0 ? 8 : precision + 9; // 23:59:59[.999999999]
- break;
- case TIMESTAMP_WITHOUT_TIME_ZONE:
- precision = ((TimestampType) type).getPrecision();
- len = timestampTypeColumnWidth(precision);
- break;
- case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
- precision = ((LocalZonedTimestampType) type).getPrecision();
- len = timestampTypeColumnWidth(precision);
- break;
- default:
- len = DEFAULT_COLUMN_WIDTH;
- }
-
- // adjust column width with potential null values
- colWidths[i] = Math.max(colWidths[i], Math.max(len, NULL_COLUMN_WIDTH));
- }
-
- // add an extra column for change flag if necessary
- if (includeChangeflag) {
- int[] ret = new int[columns.size() + 1];
- ret[0] = CHANGEFLAG_COLUMN_NAME.length();
- System.arraycopy(colWidths, 0, ret, 1, columns.size());
- return ret;
- } else {
- return colWidths;
- }
- }
-
- /**
- * Here we consider two popular class for timestamp: LocalDateTime and java.sql.Timestamp.
- *
- * <p>According to LocalDateTime's comment, the string output will be one of the following
- * ISO-8601 formats:
- * <li>{@code uuuu-MM-dd'T'HH:mm:ss}</li>
- * <li>{@code uuuu-MM-dd'T'HH:mm:ss.SSS}</li>
- * <li>{@code uuuu-MM-dd'T'HH:mm:ss.SSSSSS}</li>
- * <li>{@code uuuu-MM-dd'T'HH:mm:ss.SSSSSSSSS}</li>
- *
- * <p>And for java.sql.Timestamp, the number of digits after point will be precision except
- * when precision is 0. In that case, the format would be 'uuuu-MM-dd HH:mm:ss.0'
- */
- int timestampTypeColumnWidth(int precision) {
- int base = 19; // length of uuuu-MM-dd HH:mm:ss
- if (precision == 0) {
- return base + 2; // consider java.sql.Timestamp
- } else if (precision <= 3) {
- return base + 4;
- } else if (precision <= 6) {
- return base + 7;
- } else {
- return base + 10;
- }
- }
-
}
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliTableauResultViewTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliTableauResultViewTest.java
index 0a56458..1e7d7ef 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliTableauResultViewTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliTableauResultViewTest.java
@@ -310,7 +310,7 @@ public class CliTableauResultViewTest {
"| + | true | 2147483647 | (NULL) | abcdefg | 1234567890 | 2020-03-01 18:39:14.12 |" + System.lineSeparator() +
"| - | false | -2147483648 | 9223372036854775807 | (NULL) | 12345.06789 | 2020-03-01 18:39:14.123 |" + System.lineSeparator() +
"| + | true | 100 | -9223372036854775808 | abcdefg111 | (NULL) | 2020-03-01 18:39:14.123456 |" + System.lineSeparator() +
- "| - | (NULL) | -1 | -1 | abcdefghijklmnop... | -12345.06789 | (NULL) |" + System.lineSeparator() +
+ "| - | (NULL) | -1 | -1 | abcdefghijklmnopq... | -12345.06789 | (NULL) |" + System.lineSeparator() +
"| + | (NULL) | -1 | -1 | 这是一段中文 | -12345.06789 | 2020-03-04 18:39:14.0 |" + System.lineSeparator() +
"| - | (NULL) | -1 | -1 | これは日本語をテ... | -12345.06789 | 2020-03-04 18:39:14.0 |" + System.lineSeparator() +
"+-----+---------+-------------+----------------------+----------------------+----------------+----------------------------+" + System.lineSeparator() +
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableResult.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableResult.java
index f6c1a79..17b03cb 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableResult.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableResult.java
@@ -131,14 +131,29 @@ public interface TableResult {
* it... // collect same data
* }
* }</pre>
+ *
+ * <p>For streaming mode, this method guarantees end-to-end exactly-once record delivery
+ * which requires the checkpointing mechanism to be enabled.
+ * By default, checkpointing is disabled. To enable checkpointing, set checkpointing properties
+ * (see ExecutionCheckpointingOptions) through {@link TableConfig#getConfiguration()}.
+ *
+ * <p>In order to fetch the result locally, you can call either {@link #collect()} or {@link #print()}.
+ * However, they cannot both be called on the same {@link TableResult} instance,
+ * because the result can only be accessed once.
*/
CloseableIterator<Row> collect();
/**
* Print the result contents as tableau form to client console.
*
- * <p><strong>NOTE:</strong> please make sure the result data to print should be small.
- * Because all data will be collected to local first, and then print them to console.
+ * <p>For streaming mode, this method guarantees end-to-end exactly-once record delivery
+ * which requires the checkpointing mechanism to be enabled.
+ * By default, checkpointing is disabled. To enable checkpointing, set checkpointing properties
+ * (see ExecutionCheckpointingOptions) through {@link TableConfig#getConfiguration()}.
+ *
+ * <p>In order to fetch the result locally, you can call either {@link #collect()} or {@link #print()}.
+ * However, they cannot both be called on the same {@link TableResult} instance,
+ * because the result can only be accessed once.
*/
void print();
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 95deb43..08409b9 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -735,7 +735,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
.tableSchema(tableSchema)
.data(tableSink.getResultIterator())
.setPrintStyle(TableResultImpl.PrintStyle.tableau(
- PrintUtils.MAX_COLUMN_WIDTH, PrintUtils.NULL_COLUMN))
+ PrintUtils.MAX_COLUMN_WIDTH, PrintUtils.NULL_COLUMN, true))
.build();
} catch (Exception e) {
throw new TableException("Failed to execute sql", e);
@@ -1121,7 +1121,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
headers,
types).build())
.data(Arrays.stream(rows).map(Row::of).collect(Collectors.toList()))
- .setPrintStyle(TableResultImpl.PrintStyle.tableau(Integer.MAX_VALUE, ""))
+ .setPrintStyle(TableResultImpl.PrintStyle.tableau(Integer.MAX_VALUE, "", false))
.build();
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java
index 07cb9a0..9e2c035 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java
@@ -94,8 +94,14 @@ class TableResultImpl implements TableResult {
if (printStyle instanceof TableauStyle) {
int maxColumnWidth = ((TableauStyle) printStyle).getMaxColumnWidth();
String nullColumn = ((TableauStyle) printStyle).getNullColumn();
+ boolean deriveColumnWidthByType = ((TableauStyle) printStyle).isDeriveColumnWidthByType();
PrintUtils.printAsTableauForm(
- getTableSchema(), it, new PrintWriter(System.out), maxColumnWidth, nullColumn);
+ getTableSchema(),
+ it,
+ new PrintWriter(System.out),
+ maxColumnWidth,
+ nullColumn,
+ deriveColumnWidthByType);
} else if (printStyle instanceof RawContentStyle) {
while (it.hasNext()) {
System.out.println(String.join(",", PrintUtils.rowToString(it.next())));
@@ -117,7 +123,7 @@ class TableResultImpl implements TableResult {
private TableSchema tableSchema = null;
private ResultKind resultKind = null;
private CloseableIterator<Row> data = null;
- private PrintStyle printStyle = PrintStyle.tableau(Integer.MAX_VALUE, PrintUtils.NULL_COLUMN);
+ private PrintStyle printStyle = PrintStyle.tableau(Integer.MAX_VALUE, PrintUtils.NULL_COLUMN, false);
private Builder() {
}
@@ -198,13 +204,17 @@ class TableResultImpl implements TableResult {
*/
public interface PrintStyle {
/**
- * Create a tableau print style with given max column width and null column,
+ * Create a tableau print style with given max column width, null column,
+ * and a flag to indicate whether the column width is derived from type (true) or content (false),
* which prints the result schema and content as tableau form.
*/
- static PrintStyle tableau(int maxColumnWidth, String nullColumn) {
+ static PrintStyle tableau(
+ int maxColumnWidth,
+ String nullColumn,
+ boolean deriveColumnWidthByType) {
Preconditions.checkArgument(maxColumnWidth > 0, "maxColumnWidth should be greater than 0");
Preconditions.checkNotNull(nullColumn, "nullColumn should not be null");
- return new TableauStyle(maxColumnWidth, nullColumn);
+ return new TableauStyle(maxColumnWidth, nullColumn, deriveColumnWidthByType);
}
/**
@@ -221,15 +231,26 @@ class TableResultImpl implements TableResult {
* print the result schema and content as tableau form.
*/
private static final class TableauStyle implements PrintStyle {
-
+ /**
+ * A flag to indicate whether the column width is derived from type (true) or content (false).
+ */
+ private final boolean deriveColumnWidthByType;
private final int maxColumnWidth;
private final String nullColumn;
- private TableauStyle(int maxColumnWidth, String nullColumn) {
+ private TableauStyle(
+ int maxColumnWidth,
+ String nullColumn,
+ boolean deriveColumnWidthByType) {
+ this.deriveColumnWidthByType = deriveColumnWidthByType;
this.maxColumnWidth = maxColumnWidth;
this.nullColumn = nullColumn;
}
+ public boolean isDeriveColumnWidthByType() {
+ return deriveColumnWidthByType;
+ }
+
int getMaxColumnWidth() {
return maxColumnWidth;
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java
index 7916fa1..07e147d 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java
@@ -21,16 +21,28 @@ package org.apache.flink.table.utils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.types.Row;
import org.apache.flink.util.StringUtils;
import com.ibm.icu.lang.UCharacter;
import com.ibm.icu.lang.UProperty;
+import javax.annotation.Nullable;
+
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.stream.Stream;
/**
* Utilities for print formatting.
@@ -65,13 +77,17 @@ public class PrintUtils {
TableSchema tableSchema,
Iterator<Row> it,
PrintWriter printWriter) {
- printAsTableauForm(tableSchema, it, printWriter, MAX_COLUMN_WIDTH, NULL_COLUMN);
+ printAsTableauForm(tableSchema, it, printWriter, MAX_COLUMN_WIDTH, NULL_COLUMN, false);
}
/**
* Displays the result in a tableau form.
*
+ * <p><b>NOTE:</b> please make sure the data to print is small enough to be stored in java heap memory
+ * if the column width is derived from content ({@code deriveColumnWidthByType} is false).
+ *
* <p>For example:
+ * <pre>
* +-------------+---------+-------------+
* | boolean_col | int_col | varchar_col |
* +-------------+---------+-------------+
@@ -79,47 +95,72 @@ public class PrintUtils {
* | false | 2 | def |
* | (NULL) | (NULL) | (NULL) |
* +-------------+---------+-------------+
- * 3 rows in result
+ * </pre>
*
- * <p>Changelog is not supported until FLINK-16998 is finished.
+ * @param tableSchema The schema of the data to print
+ * @param it The iterator for the data to print
+ * @param printWriter The writer to write to
+ * @param maxColumnWidth The max width of a column
+ * @param nullColumn The string representation of a null value
+ * @param deriveColumnWidthByType A flag to indicate whether the column width
+ * is derived from type (true) or content (false).
*/
public static void printAsTableauForm(
TableSchema tableSchema,
Iterator<Row> it,
PrintWriter printWriter,
int maxColumnWidth,
- String nullColumn) {
- List<String[]> rows = new ArrayList<>();
+ String nullColumn,
+ boolean deriveColumnWidthByType) {
+ final List<TableColumn> columns = tableSchema.getTableColumns();
+ final String[] columnNames = columns.stream().map(TableColumn::getName).toArray(String[]::new);
- // fill field names first
- List<TableColumn> columns = tableSchema.getTableColumns();
- rows.add(columns.stream().map(TableColumn::getName).toArray(String[]::new));
- while (it.hasNext()) {
- rows.add(rowToString(it.next(), nullColumn));
+ final int[] colWidths;
+ if (deriveColumnWidthByType) {
+ colWidths = columnWidthsByType(columns, maxColumnWidth, nullColumn, null);
+ } else {
+ final List<Row> rows = new ArrayList<>();
+ final List<String[]> content = new ArrayList<>();
+ content.add(columnNames);
+ while (it.hasNext()) {
+ Row row = it.next();
+ rows.add(row);
+ content.add(rowToString(row, nullColumn));
+ }
+ colWidths = columnWidthsByContent(columnNames, content, maxColumnWidth);
+ it = rows.iterator();
}
- int[] colWidths = columnWidthsByContent(columns, rows, maxColumnWidth);
- String borderline = genBorderLine(colWidths);
-
- // print field names
+ final String borderline = genBorderLine(colWidths);
+ // print border line
printWriter.println(borderline);
- printSingleRow(colWidths, rows.get(0), printWriter);
+ // print field names
+ PrintUtils.printSingleRow(colWidths, columnNames, printWriter);
+ // print border line
printWriter.println(borderline);
+ printWriter.flush();
+
+ long numRows = 0;
+ while (it.hasNext()) {
+ String[] cols = rowToString(it.next(), nullColumn);
+
+ // print content
+ printSingleRow(colWidths, cols, printWriter);
+ numRows++;
+ }
- // print content
- if (rows.size() > 1) {
- rows.subList(1, rows.size()).forEach(row -> printSingleRow(colWidths, row, printWriter));
+ if (numRows > 0) {
+ // print border line
printWriter.println(borderline);
}
- int numRows = rows.size() - 1;
final String rowTerm;
if (numRows > 1) {
rowTerm = "rows";
} else {
rowTerm = "row";
}
- printWriter.println((rows.size() - 1) + " " + rowTerm + " in set");
+ printWriter.println(numRows + " " + rowTerm + " in set");
printWriter.flush();
}
@@ -141,11 +182,11 @@ public class PrintUtils {
}
private static int[] columnWidthsByContent(
- List<TableColumn> columns,
+ String[] columnNames,
List<String[]> rows,
int maxColumnWidth) {
// fill width with field names first
- int[] colWidths = columns.stream().mapToInt(col -> col.getName().length()).toArray();
+ final int[] colWidths = Stream.of(columnNames).mapToInt(String::length).toArray();
// fill column width with real data
for (String[] row : rows) {
@@ -172,6 +213,104 @@ public class PrintUtils {
return sb.toString();
}
+ /**
+ * Try to derive column width based on column types.
+ * If result set is not small enough to be stored in java heap memory,
+ * we can't determine column widths based on column values.
+ */
+ public static int[] columnWidthsByType(
+ List<TableColumn> columns,
+ int maxColumnWidth,
+ String nullColumn,
+ @Nullable String rowKindColumn) {
+ // fill width with field names first
+ final int[] colWidths = columns.stream()
+ .mapToInt(col -> col.getName().length())
+ .toArray();
+
+ // determine proper column width based on types
+ for (int i = 0; i < columns.size(); ++i) {
+ LogicalType type = columns.get(i).getType().getLogicalType();
+ int len;
+ switch (type.getTypeRoot()) {
+ case TINYINT:
+ len = TinyIntType.PRECISION + 1; // extra for negative value
+ break;
+ case SMALLINT:
+ len = SmallIntType.PRECISION + 1; // extra for negative value
+ break;
+ case INTEGER:
+ len = IntType.PRECISION + 1; // extra for negative value
+ break;
+ case BIGINT:
+ len = BigIntType.PRECISION + 1; // extra for negative value
+ break;
+ case DECIMAL:
+ len = ((DecimalType) type).getPrecision() + 2; // extra for negative value and decimal point
+ break;
+ case BOOLEAN:
+ len = 5; // "true" or "false"
+ break;
+ case DATE:
+ len = 10; // e.g. 9999-12-31
+ break;
+ case TIME_WITHOUT_TIME_ZONE:
+ int precision = ((TimeType) type).getPrecision();
+ len = precision == 0 ? 8 : precision + 9; // 23:59:59[.999999999]
+ break;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ precision = ((TimestampType) type).getPrecision();
+ len = timestampTypeColumnWidth(precision);
+ break;
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ precision = ((LocalZonedTimestampType) type).getPrecision();
+ len = timestampTypeColumnWidth(precision);
+ break;
+ default:
+ len = maxColumnWidth;
+ }
+
+ // adjust column width with potential null values
+ colWidths[i] = Math.max(colWidths[i], Math.max(len, nullColumn.length()));
+ }
+
+ // add an extra column for row kind if necessary
+ if (rowKindColumn != null) {
+ final int[] ret = new int[columns.size() + 1];
+ ret[0] = rowKindColumn.length();
+ System.arraycopy(colWidths, 0, ret, 1, columns.size());
+ return ret;
+ } else {
+ return colWidths;
+ }
+ }
+
+ /**
+ * Here we consider two popular classes for timestamps: LocalDateTime and java.sql.Timestamp.
+ *
+ * <p>According to LocalDateTime's comment, the string output will be one of the following
+ * ISO-8601 formats:
+ * <li>{@code uuuu-MM-dd'T'HH:mm:ss}</li>
+ * <li>{@code uuuu-MM-dd'T'HH:mm:ss.SSS}</li>
+ * <li>{@code uuuu-MM-dd'T'HH:mm:ss.SSSSSS}</li>
+ * <li>{@code uuuu-MM-dd'T'HH:mm:ss.SSSSSSSSS}</li>
+ *
+ * <p>And for java.sql.Timestamp, the number of digits after point will be precision except
+ * when precision is 0. In that case, the format would be 'uuuu-MM-dd HH:mm:ss.0'
+ */
+ private static int timestampTypeColumnWidth(int precision) {
+ int base = 19; // length of uuuu-MM-dd HH:mm:ss
+ if (precision == 0) {
+ return base + 2; // consider java.sql.Timestamp
+ } else if (precision <= 3) {
+ return base + 4;
+ } else if (precision <= 6) {
+ return base + 7;
+ } else {
+ return base + 10;
+ }
+ }
+
public static void printSingleRow(int[] colWidths, String[] cols, PrintWriter printWriter) {
StringBuilder sb = new StringBuilder();
sb.append("|");
@@ -202,7 +341,7 @@ public class PrintUtils {
} else {
passedWidth += 1;
}
- if (passedWidth >= targetWidth) {
+ if (passedWidth > targetWidth) {
break;
}
}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/PrintUtilsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/PrintUtilsTest.java
index 47fd62a..cd22458 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/PrintUtilsTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/PrintUtilsTest.java
@@ -84,9 +84,28 @@ public class PrintUtilsTest {
new PrintWriter(outContent));
assertEquals(
- "+---------+-----+--------+---------+----------------+-----------+\n" +
- "| boolean | int | bigint | varchar | decimal(10, 5) | timestamp |\n" +
- "+---------+-----+--------+---------+----------------+-----------+\n" +
+ "+---------+-----+--------+---------+----------------+-----------+" + System.lineSeparator() +
+ "| boolean | int | bigint | varchar | decimal(10, 5) | timestamp |" + System.lineSeparator() +
+ "+---------+-----+--------+---------+----------------+-----------+" + System.lineSeparator() +
+ "0 row in set" + System.lineSeparator(),
+ outContent.toString());
+ }
+
+ @Test
+ public void testPrintWithEmptyResultAndDeriveColumnWidthByContent() {
+ PrintUtils.printAsTableauForm(
+ getSchema(),
+ Collections.<Row>emptyList().iterator(),
+ new PrintWriter(outContent),
+ PrintUtils.MAX_COLUMN_WIDTH,
+ "",
+ false // derive column width by content
+ );
+
+ assertEquals(
+ "+---------+-----+--------+---------+----------------+-----------+" + System.lineSeparator() +
+ "| boolean | int | bigint | varchar | decimal(10, 5) | timestamp |" + System.lineSeparator() +
+ "+---------+-----+--------+---------+----------------+-----------+" + System.lineSeparator() +
"0 row in set" + System.lineSeparator(),
outContent.toString());
}
@@ -101,23 +120,72 @@ public class PrintUtilsTest {
// note: the expected result may look irregular because every CJK(Chinese/Japanese/Korean) character's
// width < 2 in IDE by default, every CJK character usually's width is 2, you can open this source file
// by vim or just cat the file to check the regular result.
+ // The `varchar` value in the last row is padded with two ' ' before the column,
+ // because the length of `これは日本語をテストするた` plus the length of `...` is 29,
+ // so no more Japanese characters can be added to the line.
assertEquals(
- "+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+\n" +
- "| boolean | int | bigint | varchar | decimal(10, 5) | timestamp |\n" +
- "+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+\n" +
- "| (NULL) | 1 | 2 | abc | 1.23 | 2020-03-01 18:39:14.0 |\n" +
- "| false | (NULL) | 0 | | 1 | 2020-03-01 18:39:14.1 |\n" +
- "| true | 2147483647 | (NULL) | abcdefg | 1234567890 | 2020-03-01 18:39:14.12 |\n" +
- "| false | -2147483648 | 9223372036854775807 | (NULL) | 12345.06789 | 2020-03-01 18:39:14.123 |\n" +
- "| true | 100 | -9223372036854775808 | abcdefg111 | (NULL) | 2020-03-01 18:39:14.123456 |\n" +
- "| (NULL) | -1 | -1 | abcdefghijklmnopqrstuvwxyz | -12345.06789 | (NULL) |\n" +
- "| (NULL) | -1 | -1 | 这是一段中文 | -12345.06789 | 2020-03-04 18:39:14.0 |\n" +
- "| (NULL) | -1 | -1 | これは日本語をテストするた... | -12345.06789 | 2020-03-04 18:39:14.0 |\n" +
- "+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+\n" +
+ "+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+" + System.lineSeparator() +
+ "| boolean | int | bigint | varchar | decimal(10, 5) | timestamp |" + System.lineSeparator() +
+ "+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+" + System.lineSeparator() +
+ "| (NULL) | 1 | 2 | abc | 1.23 | 2020-03-01 18:39:14.0 |" + System.lineSeparator() +
+ "| false | (NULL) | 0 | | 1 | 2020-03-01 18:39:14.1 |" + System.lineSeparator() +
+ "| true | 2147483647 | (NULL) | abcdefg | 1234567890 | 2020-03-01 18:39:14.12 |" + System.lineSeparator() +
+ "| false | -2147483648 | 9223372036854775807 | (NULL) | 12345.06789 | 2020-03-01 18:39:14.123 |" + System.lineSeparator() +
+ "| true | 100 | -9223372036854775808 | abcdefg111 | (NULL) | 2020-03-01 18:39:14.123456 |" + System.lineSeparator() +
+ "| (NULL) | -1 | -1 | abcdefghijklmnopqrstuvwxyza... | -12345.06789 | (NULL) |" + System.lineSeparator() +
+ "| (NULL) | -1 | -1 | 这是一段中文 | -12345.06789 | 2020-03-04 18:39:14.0 |" + System.lineSeparator() +
+ "| (NULL) | -1 | -1 | これは日本語をテストするた... | -12345.06789 | 2020-03-04 18:39:14.0 |" + System.lineSeparator() +
+ "+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+" + System.lineSeparator() +
"8 rows in set" + System.lineSeparator(),
outContent.toString());
}
+ @Test
+ public void testPrintWithMultipleRowsAndDeriveColumnWidthByContent() {
+ PrintUtils.printAsTableauForm(
+ getSchema(),
+ getData().subList(0, 3).iterator(),
+ new PrintWriter(outContent),
+ PrintUtils.MAX_COLUMN_WIDTH,
+ "",
+ false // derive column width by content
+ );
+
+ assertEquals(
+ "+---------+------------+--------+---------+----------------+------------------------+" + System.lineSeparator() +
+ "| boolean | int | bigint | varchar | decimal(10, 5) | timestamp |" + System.lineSeparator() +
+ "+---------+------------+--------+---------+----------------+------------------------+" + System.lineSeparator() +
+ "| | 1 | 2 | abc | 1.23 | 2020-03-01 18:39:14.0 |" + System.lineSeparator() +
+ "| false | | 0 | | 1 | 2020-03-01 18:39:14.1 |" + System.lineSeparator() +
+ "| true | 2147483647 | | abcdefg | 1234567890 | 2020-03-01 18:39:14.12 |" + System.lineSeparator() +
+ "+---------+------------+--------+---------+----------------+------------------------+" + System.lineSeparator() +
+ "3 rows in set" + System.lineSeparator(),
+ outContent.toString());
+ }
+
+ @Test
+ public void testPrintWithMultipleRowsAndDeriveColumnWidthByType() {
+ PrintUtils.printAsTableauForm(
+ getSchema(),
+ getData().subList(0, 3).iterator(),
+ new PrintWriter(outContent),
+ PrintUtils.MAX_COLUMN_WIDTH,
+ "",
+ true // derive column width by type
+ );
+
+ assertEquals(
+ "+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+" + System.lineSeparator() +
+ "| boolean | int | bigint | varchar | decimal(10, 5) | timestamp |" + System.lineSeparator() +
+ "+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+" + System.lineSeparator() +
+ "| | 1 | 2 | abc | 1.23 | 2020-03-01 18:39:14.0 |" + System.lineSeparator() +
+ "| false | | 0 | | 1 | 2020-03-01 18:39:14.1 |" + System.lineSeparator() +
+ "| true | 2147483647 | | abcdefg | 1234567890 | 2020-03-01 18:39:14.12 |" + System.lineSeparator() +
+ "+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+" + System.lineSeparator() +
+ "3 rows in set" + System.lineSeparator(),
+ outContent.toString());
+ }
+
private TableSchema getSchema() {
return TableSchema.builder()
.field("boolean", DataTypes.BOOLEAN())
@@ -175,7 +243,7 @@ public class PrintUtilsTest {
null,
-1,
-1,
- "abcdefghijklmnopqrstuvwxyz",
+ "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz",
BigDecimal.valueOf(-12345.06789),
null)
);
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index db10922..9b830f5 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -44,10 +44,12 @@ import org.apache.flink.table.types.{AbstractDataType, DataType}
import org.apache.flink.table.util.JavaScalaConversionUtil
import org.apache.flink.table.utils.PrintUtils
import org.apache.flink.types.Row
+
import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
import org.apache.calcite.sql.parser.SqlParser
import org.apache.calcite.tools.FrameworkConfig
import org.apache.commons.lang3.StringUtils
+
import _root_.java.lang.{Iterable => JIterable, Long => JLong}
import _root_.java.util.function.{Function => JFunction, Supplier => JSupplier}
import _root_.java.util.{Optional, Collections => JCollections, HashMap => JHashMap, List => JList, Map => JMap}
@@ -634,7 +636,8 @@ abstract class TableEnvImpl(
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.tableSchema(tableSchema)
.data(tableSink.getResultIterator)
- .setPrintStyle(PrintStyle.tableau(PrintUtils.MAX_COLUMN_WIDTH, PrintUtils.NULL_COLUMN))
+ .setPrintStyle(
+ PrintStyle.tableau(PrintUtils.MAX_COLUMN_WIDTH, PrintUtils.NULL_COLUMN, true))
.build
} catch {
case e: Exception =>