You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/06/23 15:03:26 UTC

[flink] branch release-1.11 updated: [FLINK-18399][table-api-java] Fix TableResult#print can not print the result of unbounded stream query

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 8d9f6df  [FLINK-18399][table-api-java] Fix TableResult#print can not print the result of unbounded stream query
8d9f6df is described below

commit 8d9f6df60d4c01d3410c763ff3508d50f78658ec
Author: godfrey he <go...@163.com>
AuthorDate: Tue Jun 23 23:02:11 2020 +0800

    [FLINK-18399][table-api-java] Fix TableResult#print can not print the result of unbounded stream query
    
    This closes #12752
---
 flink-python/pyflink/table/table_result.py         |   6 +-
 .../table/client/cli/CliTableauResultView.java     | 109 +-----------
 .../table/client/cli/CliTableauResultViewTest.java |   2 +-
 .../org/apache/flink/table/api/TableResult.java    |  19 ++-
 .../table/api/internal/TableEnvironmentImpl.java   |   4 +-
 .../flink/table/api/internal/TableResultImpl.java  |  35 +++-
 .../org/apache/flink/table/utils/PrintUtils.java   | 185 ++++++++++++++++++---
 .../apache/flink/table/utils/PrintUtilsTest.java   | 100 +++++++++--
 .../flink/table/api/internal/TableEnvImpl.scala    |   5 +-
 9 files changed, 305 insertions(+), 160 deletions(-)

diff --git a/flink-python/pyflink/table/table_result.py b/flink-python/pyflink/table/table_result.py
index 2dfb02f..e9f2abe 100644
--- a/flink-python/pyflink/table/table_result.py
+++ b/flink-python/pyflink/table/table_result.py
@@ -134,8 +134,10 @@ class TableResult(object):
         """
         Print the result contents as tableau form to client console.
 
-        NOTE: please make sure the result data to print should be small.
-        Because all data will be collected to local first, and then print them to console.
+        For streaming mode, this method guarantees end-to-end exactly-once record delivery
+        which requires the checkpointing mechanism to be enabled.
+        By default, checkpointing is disabled. To enable checkpointing, set checkpointing properties
+        (see ExecutionCheckpointingOptions) through `TableConfig#getConfiguration()`.
 
         .. versionadded:: 1.11.0
         """
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java
index dc81872..74126c7 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java
@@ -25,15 +25,6 @@ import org.apache.flink.table.client.gateway.Executor;
 import org.apache.flink.table.client.gateway.ResultDescriptor;
 import org.apache.flink.table.client.gateway.SqlExecutionException;
 import org.apache.flink.table.client.gateway.TypedResult;
-import org.apache.flink.table.types.logical.BigIntType;
-import org.apache.flink.table.types.logical.DecimalType;
-import org.apache.flink.table.types.logical.IntType;
-import org.apache.flink.table.types.logical.LocalZonedTimestampType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.SmallIntType;
-import org.apache.flink.table.types.logical.TimeType;
-import org.apache.flink.table.types.logical.TimestampType;
-import org.apache.flink.table.types.logical.TinyIntType;
 import org.apache.flink.table.utils.PrintUtils;
 import org.apache.flink.types.Row;
 
@@ -54,7 +45,6 @@ import java.util.stream.Stream;
  */
 public class CliTableauResultView implements AutoCloseable {
 
-	private static final int NULL_COLUMN_WIDTH = CliStrings.NULL_COLUMN.length();
 	private static final int DEFAULT_COLUMN_WIDTH = 20;
 	private static final String CHANGEFLAG_COLUMN_NAME = "+/-";
 
@@ -183,11 +173,12 @@ public class CliTableauResultView implements AutoCloseable {
 		List<TableColumn> columns = resultDescriptor.getResultSchema().getTableColumns();
 		String[] fieldNames =
 				Stream.concat(
-						Stream.of("+/-"),
+						Stream.of(CHANGEFLAG_COLUMN_NAME),
 						columns.stream().map(TableColumn::getName)
 				).toArray(String[]::new);
 
-		int[] colWidths = columnWidthsByType(columns, true);
+		int[] colWidths = PrintUtils.columnWidthsByType(
+				columns, DEFAULT_COLUMN_WIDTH, CliStrings.NULL_COLUMN, CHANGEFLAG_COLUMN_NAME);
 		String borderline = PrintUtils.genBorderLine(colWidths);
 
 		// print filed names
@@ -227,98 +218,4 @@ public class CliTableauResultView implements AutoCloseable {
 			}
 		}
 	}
-
-	/**
-	 * Try to infer column width based on column types. In streaming case, we will have an
-	 * endless result set, thus couldn't determine column widths based on column values.
-	 */
-	private int[] columnWidthsByType(List<TableColumn> columns, boolean includeChangeflag) {
-		// fill width with field names first
-		int[] colWidths = columns.stream()
-				.mapToInt(col -> col.getName().length())
-				.toArray();
-
-		// determine proper column width based on types
-		for (int i = 0; i < columns.size(); ++i) {
-			LogicalType type = columns.get(i).getType().getLogicalType();
-			int len;
-			switch (type.getTypeRoot()) {
-				case TINYINT:
-					len = TinyIntType.PRECISION + 1; // extra for negative value
-					break;
-				case SMALLINT:
-					len = SmallIntType.PRECISION + 1; // extra for negative value
-					break;
-				case INTEGER:
-					len = IntType.PRECISION + 1; // extra for negative value
-					break;
-				case BIGINT:
-					len = BigIntType.PRECISION + 1; // extra for negative value
-					break;
-				case DECIMAL:
-					len = ((DecimalType) type).getPrecision() + 2; // extra for negative value and decimal point
-					break;
-				case BOOLEAN:
-					len = 5; // "true" or "false"
-					break;
-				case DATE:
-					len = 10; // e.g. 9999-12-31
-					break;
-				case TIME_WITHOUT_TIME_ZONE:
-					int precision = ((TimeType) type).getPrecision();
-					len = precision == 0 ? 8 : precision + 9; // 23:59:59[.999999999]
-					break;
-				case TIMESTAMP_WITHOUT_TIME_ZONE:
-					precision = ((TimestampType) type).getPrecision();
-					len = timestampTypeColumnWidth(precision);
-					break;
-				case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-					precision = ((LocalZonedTimestampType) type).getPrecision();
-					len = timestampTypeColumnWidth(precision);
-					break;
-				default:
-					len = DEFAULT_COLUMN_WIDTH;
-			}
-
-			// adjust column width with potential null values
-			colWidths[i] = Math.max(colWidths[i], Math.max(len, NULL_COLUMN_WIDTH));
-		}
-
-		// add an extra column for change flag if necessary
-		if (includeChangeflag) {
-			int[] ret = new int[columns.size() + 1];
-			ret[0] = CHANGEFLAG_COLUMN_NAME.length();
-			System.arraycopy(colWidths, 0, ret, 1, columns.size());
-			return ret;
-		} else {
-			return colWidths;
-		}
-	}
-
-	/**
-	 * Here we consider two popular class for timestamp: LocalDateTime and java.sql.Timestamp.
-	 *
-	 * <p>According to LocalDateTime's comment, the string output will be one of the following
-	 * ISO-8601 formats:
-	 *  <li>{@code uuuu-MM-dd'T'HH:mm:ss}</li>
-	 *  <li>{@code uuuu-MM-dd'T'HH:mm:ss.SSS}</li>
-	 *  <li>{@code uuuu-MM-dd'T'HH:mm:ss.SSSSSS}</li>
-	 *  <li>{@code uuuu-MM-dd'T'HH:mm:ss.SSSSSSSSS}</li>
-	 *
-	 * <p>And for java.sql.Timestamp, the number of digits after point will be precision except
-	 * when precision is 0. In that case, the format would be 'uuuu-MM-dd HH:mm:ss.0'
-	 */
-	int timestampTypeColumnWidth(int precision) {
-		int base = 19; // length of uuuu-MM-dd HH:mm:ss
-		if (precision == 0) {
-			return base + 2; // consider java.sql.Timestamp
-		} else if (precision <= 3) {
-			return base + 4;
-		} else if (precision <= 6) {
-			return base + 7;
-		} else {
-			return base + 10;
-		}
-	}
-
 }
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliTableauResultViewTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliTableauResultViewTest.java
index 0a56458..1e7d7ef 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliTableauResultViewTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliTableauResultViewTest.java
@@ -310,7 +310,7 @@ public class CliTableauResultViewTest {
 				"|   + |    true |  2147483647 |               (NULL) |              abcdefg |     1234567890 |     2020-03-01 18:39:14.12 |" + System.lineSeparator() +
 				"|   - |   false | -2147483648 |  9223372036854775807 |               (NULL) |    12345.06789 |    2020-03-01 18:39:14.123 |" + System.lineSeparator() +
 				"|   + |    true |         100 | -9223372036854775808 |           abcdefg111 |         (NULL) | 2020-03-01 18:39:14.123456 |" + System.lineSeparator() +
-				"|   - |  (NULL) |          -1 |                   -1 |  abcdefghijklmnop... |   -12345.06789 |                     (NULL) |" + System.lineSeparator() +
+				"|   - |  (NULL) |          -1 |                   -1 | abcdefghijklmnopq... |   -12345.06789 |                     (NULL) |" + System.lineSeparator() +
 				"|   + |  (NULL) |          -1 |                   -1 |         这是一段中文 |   -12345.06789 |      2020-03-04 18:39:14.0 |" + System.lineSeparator() +
 				"|   - |  (NULL) |          -1 |                   -1 |  これは日本語をテ... |   -12345.06789 |      2020-03-04 18:39:14.0 |" + System.lineSeparator() +
 				"+-----+---------+-------------+----------------------+----------------------+----------------+----------------------------+" + System.lineSeparator() +
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableResult.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableResult.java
index f6c1a79..17b03cb 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableResult.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableResult.java
@@ -131,14 +131,29 @@ public interface TableResult {
 	 *      it... // collect same data
 	 *  }
 	 * }</pre>
+	 *
+	 * <p>For streaming mode, this method guarantees end-to-end exactly-once record delivery
+	 * which requires the checkpointing mechanism to be enabled.
+	 * By default, checkpointing is disabled. To enable checkpointing, set checkpointing properties
+	 * (see ExecutionCheckpointingOptions) through {@link TableConfig#getConfiguration()}.
+	 *
+	 * <p>To fetch the result locally, you can call either {@link #collect()} or {@link #print()}.
+	 * However, they cannot both be called on the same {@link TableResult} instance,
+	 * because the result can only be accessed once.
 	 */
 	CloseableIterator<Row> collect();
 
 	/**
 	 * Print the result contents as tableau form to client console.
 	 *
-	 * <p><strong>NOTE:</strong> please make sure the result data to print should be small.
-	 * Because all data will be collected to local first, and then print them to console.
+	 * <p>For streaming mode, this method guarantees end-to-end exactly-once record delivery
+	 * which requires the checkpointing mechanism to be enabled.
+	 * By default, checkpointing is disabled. To enable checkpointing, set checkpointing properties
+	 * (see ExecutionCheckpointingOptions) through {@link TableConfig#getConfiguration()}.
+	 *
+	 * <p>To fetch the result locally, you can call either {@link #collect()} or {@link #print()}.
+	 * However, they cannot both be called on the same {@link TableResult} instance,
+	 * because the result can only be accessed once.
 	 */
 	void print();
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 95deb43..08409b9 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -735,7 +735,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 					.tableSchema(tableSchema)
 					.data(tableSink.getResultIterator())
 					.setPrintStyle(TableResultImpl.PrintStyle.tableau(
-							PrintUtils.MAX_COLUMN_WIDTH, PrintUtils.NULL_COLUMN))
+							PrintUtils.MAX_COLUMN_WIDTH, PrintUtils.NULL_COLUMN, true))
 					.build();
 		} catch (Exception e) {
 			throw new TableException("Failed to execute sql", e);
@@ -1121,7 +1121,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 						headers,
 						types).build())
 				.data(Arrays.stream(rows).map(Row::of).collect(Collectors.toList()))
-				.setPrintStyle(TableResultImpl.PrintStyle.tableau(Integer.MAX_VALUE, ""))
+				.setPrintStyle(TableResultImpl.PrintStyle.tableau(Integer.MAX_VALUE, "", false))
 				.build();
 	}
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java
index 07cb9a0..9e2c035 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java
@@ -94,8 +94,14 @@ class TableResultImpl implements TableResult {
 		if (printStyle instanceof TableauStyle) {
 			int maxColumnWidth = ((TableauStyle) printStyle).getMaxColumnWidth();
 			String nullColumn = ((TableauStyle) printStyle).getNullColumn();
+			boolean deriveColumnWidthByType =  ((TableauStyle) printStyle).isDeriveColumnWidthByType();
 			PrintUtils.printAsTableauForm(
-					getTableSchema(), it, new PrintWriter(System.out), maxColumnWidth, nullColumn);
+					getTableSchema(),
+					it,
+					new PrintWriter(System.out),
+					maxColumnWidth,
+					nullColumn,
+					deriveColumnWidthByType);
 		} else if (printStyle instanceof RawContentStyle) {
 			while (it.hasNext()) {
 				System.out.println(String.join(",", PrintUtils.rowToString(it.next())));
@@ -117,7 +123,7 @@ class TableResultImpl implements TableResult {
 		private TableSchema tableSchema = null;
 		private ResultKind resultKind = null;
 		private CloseableIterator<Row> data = null;
-		private PrintStyle printStyle = PrintStyle.tableau(Integer.MAX_VALUE, PrintUtils.NULL_COLUMN);
+		private PrintStyle printStyle = PrintStyle.tableau(Integer.MAX_VALUE, PrintUtils.NULL_COLUMN, false);
 
 		private Builder() {
 		}
@@ -198,13 +204,17 @@ class TableResultImpl implements TableResult {
 	 */
 	public interface PrintStyle {
 		/**
-		 * Create a tableau print style with given max column width and null column,
+		 * Create a tableau print style with given max column width, null column,
+		 * and a flag to indicate whether the column width is derived from type (true) or content (false),
 		 * which prints the result schema and content as tableau form.
 		 */
-		static PrintStyle tableau(int maxColumnWidth, String nullColumn) {
+		static PrintStyle tableau(
+				int maxColumnWidth,
+				String nullColumn,
+				boolean deriveColumnWidthByType) {
 			Preconditions.checkArgument(maxColumnWidth > 0, "maxColumnWidth should be greater than 0");
 			Preconditions.checkNotNull(nullColumn, "nullColumn should not be null");
-			return new TableauStyle(maxColumnWidth, nullColumn);
+			return new TableauStyle(maxColumnWidth, nullColumn, deriveColumnWidthByType);
 		}
 
 		/**
@@ -221,15 +231,26 @@ class TableResultImpl implements TableResult {
 	 * print the result schema and content as tableau form.
 	 */
 	private static final class TableauStyle implements PrintStyle {
-
+		/**
+		 * A flag to indicate whether the column width is derived from type (true) or content (false).
+		 */
+		private final boolean deriveColumnWidthByType;
 		private final int maxColumnWidth;
 		private final String nullColumn;
 
-		private TableauStyle(int maxColumnWidth, String nullColumn) {
+		private TableauStyle(
+				int maxColumnWidth,
+				String nullColumn,
+				boolean deriveColumnWidthByType) {
+			this.deriveColumnWidthByType = deriveColumnWidthByType;
 			this.maxColumnWidth = maxColumnWidth;
 			this.nullColumn = nullColumn;
 		}
 
+		public boolean isDeriveColumnWidthByType() {
+			return deriveColumnWidthByType;
+		}
+
 		int getMaxColumnWidth() {
 			return maxColumnWidth;
 		}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java
index 7916fa1..07e147d 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java
@@ -21,16 +21,28 @@ package org.apache.flink.table.utils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableColumn;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.StringUtils;
 
 import com.ibm.icu.lang.UCharacter;
 import com.ibm.icu.lang.UProperty;
 
+import javax.annotation.Nullable;
+
 import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.stream.Stream;
 
 /**
  * Utilities for print formatting.
@@ -65,13 +77,17 @@ public class PrintUtils {
 			TableSchema tableSchema,
 			Iterator<Row> it,
 			PrintWriter printWriter) {
-		printAsTableauForm(tableSchema, it, printWriter, MAX_COLUMN_WIDTH, NULL_COLUMN);
+		printAsTableauForm(tableSchema, it, printWriter, MAX_COLUMN_WIDTH, NULL_COLUMN, false);
 	}
 
 	/**
 	 * Displays the result in a tableau form.
 	 *
+	 * <p><b>NOTE:</b> please make sure the data to print is small enough to be stored in java heap memory
+	 * if the column width is derived from content (`deriveColumnWidthByType` is false).
+	 *
 	 * <p>For example:
+	 * <pre>
 	 * +-------------+---------+-------------+
 	 * | boolean_col | int_col | varchar_col |
 	 * +-------------+---------+-------------+
@@ -79,47 +95,72 @@ public class PrintUtils {
 	 * |       false |       2 |         def |
 	 * |      (NULL) |  (NULL) |      (NULL) |
 	 * +-------------+---------+-------------+
-	 * 3 rows in result
+	 * </pre>
 	 *
-	 * <p>Changelog is not supported until FLINK-16998 is finished.
+	 * @param tableSchema The schema of the data to print
+	 * @param it The iterator for the data to print
+	 * @param printWriter The writer to write to
+	 * @param maxColumnWidth The max width of a column
+	 * @param nullColumn The string representation of a null value
+	 * @param deriveColumnWidthByType A flag to indicate whether the column width
+	 *        is derived from type (true) or content (false).
 	 */
 	public static void printAsTableauForm(
 			TableSchema tableSchema,
 			Iterator<Row> it,
 			PrintWriter printWriter,
 			int maxColumnWidth,
-			String nullColumn) {
-		List<String[]> rows = new ArrayList<>();
+			String nullColumn,
+			boolean deriveColumnWidthByType) {
+		final List<TableColumn> columns = tableSchema.getTableColumns();
+		final String[] columnNames = columns.stream().map(TableColumn::getName).toArray(String[]::new);
 
-		// fill field names first
-		List<TableColumn> columns = tableSchema.getTableColumns();
-		rows.add(columns.stream().map(TableColumn::getName).toArray(String[]::new));
-		while (it.hasNext()) {
-			rows.add(rowToString(it.next(), nullColumn));
+		final int[] colWidths;
+		if (deriveColumnWidthByType) {
+			colWidths = columnWidthsByType(columns, maxColumnWidth, nullColumn, null);
+		} else {
+			final List<Row> rows = new ArrayList<>();
+			final List<String[]> content = new ArrayList<>();
+			content.add(columnNames);
+			while (it.hasNext()) {
+				Row row = it.next();
+				rows.add(row);
+				content.add(rowToString(row, nullColumn));
+			}
+			colWidths = columnWidthsByContent(columnNames, content, maxColumnWidth);
+			it = rows.iterator();
 		}
 
-		int[] colWidths = columnWidthsByContent(columns, rows, maxColumnWidth);
-		String borderline = genBorderLine(colWidths);
-
-		// print field names
+		final String borderline = genBorderLine(colWidths);
+		// print border line
 		printWriter.println(borderline);
-		printSingleRow(colWidths, rows.get(0), printWriter);
+		// print field names
+		PrintUtils.printSingleRow(colWidths, columnNames, printWriter);
+		// print border line
 		printWriter.println(borderline);
+		printWriter.flush();
+
+		long numRows = 0;
+		while (it.hasNext()) {
+			String[] cols = rowToString(it.next(), nullColumn);
+
+			// print content
+			printSingleRow(colWidths, cols, printWriter);
+			numRows++;
+		}
 
-		// print content
-		if (rows.size() > 1) {
-			rows.subList(1, rows.size()).forEach(row -> printSingleRow(colWidths, row, printWriter));
+		if (numRows > 0) {
+			// print border line
 			printWriter.println(borderline);
 		}
 
-		int numRows = rows.size() - 1;
 		final String rowTerm;
 		if (numRows > 1) {
 			rowTerm = "rows";
 		} else {
 			rowTerm = "row";
 		}
-		printWriter.println((rows.size() - 1) + " " + rowTerm + " in set");
+		printWriter.println(numRows + " " + rowTerm + " in set");
 		printWriter.flush();
 	}
 
@@ -141,11 +182,11 @@ public class PrintUtils {
 	}
 
 	private static int[] columnWidthsByContent(
-			List<TableColumn> columns,
+			String[] columnNames,
 			List<String[]> rows,
 			int maxColumnWidth) {
 		// fill width with field names first
-		int[] colWidths = columns.stream().mapToInt(col -> col.getName().length()).toArray();
+		final int[] colWidths = Stream.of(columnNames).mapToInt(String::length).toArray();
 
 		// fill column width with real data
 		for (String[] row : rows) {
@@ -172,6 +213,104 @@ public class PrintUtils {
 		return sb.toString();
 	}
 
+	/**
+	 * Try to derive column width based on column types.
+	 * If result set is not small enough to be stored in java heap memory,
+	 * we can't determine column widths based on column values.
+	 */
+	public static int[] columnWidthsByType(
+			List<TableColumn> columns,
+			int maxColumnWidth,
+			String nullColumn,
+			@Nullable String rowKindColumn) {
+		// fill width with field names first
+		final int[] colWidths = columns.stream()
+				.mapToInt(col -> col.getName().length())
+				.toArray();
+
+		// determine proper column width based on types
+		for (int i = 0; i < columns.size(); ++i) {
+			LogicalType type = columns.get(i).getType().getLogicalType();
+			int len;
+			switch (type.getTypeRoot()) {
+				case TINYINT:
+					len = TinyIntType.PRECISION + 1; // extra for negative value
+					break;
+				case SMALLINT:
+					len = SmallIntType.PRECISION + 1; // extra for negative value
+					break;
+				case INTEGER:
+					len = IntType.PRECISION + 1; // extra for negative value
+					break;
+				case BIGINT:
+					len = BigIntType.PRECISION + 1; // extra for negative value
+					break;
+				case DECIMAL:
+					len = ((DecimalType) type).getPrecision() + 2; // extra for negative value and decimal point
+					break;
+				case BOOLEAN:
+					len = 5; // "true" or "false"
+					break;
+				case DATE:
+					len = 10; // e.g. 9999-12-31
+					break;
+				case TIME_WITHOUT_TIME_ZONE:
+					int precision = ((TimeType) type).getPrecision();
+					len = precision == 0 ? 8 : precision + 9; // 23:59:59[.999999999]
+					break;
+				case TIMESTAMP_WITHOUT_TIME_ZONE:
+					precision = ((TimestampType) type).getPrecision();
+					len = timestampTypeColumnWidth(precision);
+					break;
+				case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+					precision = ((LocalZonedTimestampType) type).getPrecision();
+					len = timestampTypeColumnWidth(precision);
+					break;
+				default:
+					len = maxColumnWidth;
+			}
+
+			// adjust column width with potential null values
+			colWidths[i] = Math.max(colWidths[i], Math.max(len, nullColumn.length()));
+		}
+
+		// add an extra column for row kind if necessary
+		if (rowKindColumn != null) {
+			final int[] ret = new int[columns.size() + 1];
+			ret[0] = rowKindColumn.length();
+			System.arraycopy(colWidths, 0, ret, 1, columns.size());
+			return ret;
+		} else {
+			return colWidths;
+		}
+	}
+
+	/**
+	 * Here we consider two popular classes for timestamps: LocalDateTime and java.sql.Timestamp.
+	 *
+	 * <p>According to LocalDateTime's comment, the string output will be one of the following
+	 * ISO-8601 formats:
+	 *  <li>{@code uuuu-MM-dd'T'HH:mm:ss}</li>
+	 *  <li>{@code uuuu-MM-dd'T'HH:mm:ss.SSS}</li>
+	 *  <li>{@code uuuu-MM-dd'T'HH:mm:ss.SSSSSS}</li>
+	 *  <li>{@code uuuu-MM-dd'T'HH:mm:ss.SSSSSSSSS}</li>
+	 *
+	 * <p>And for java.sql.Timestamp, the number of digits after point will be precision except
+	 * when precision is 0. In that case, the format would be 'uuuu-MM-dd HH:mm:ss.0'
+	 */
+	private static int timestampTypeColumnWidth(int precision) {
+		int base = 19; // length of uuuu-MM-dd HH:mm:ss
+		if (precision == 0) {
+			return base + 2; // consider java.sql.Timestamp
+		} else if (precision <= 3) {
+			return base + 4;
+		} else if (precision <= 6) {
+			return base + 7;
+		} else {
+			return base + 10;
+		}
+	}
+
 	public static void printSingleRow(int[] colWidths, String[] cols, PrintWriter printWriter) {
 		StringBuilder sb = new StringBuilder();
 		sb.append("|");
@@ -202,7 +341,7 @@ public class PrintUtils {
 			} else {
 				passedWidth += 1;
 			}
-			if (passedWidth >= targetWidth) {
+			if (passedWidth > targetWidth) {
 				break;
 			}
 		}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/PrintUtilsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/PrintUtilsTest.java
index 47fd62a..cd22458 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/PrintUtilsTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/PrintUtilsTest.java
@@ -84,9 +84,28 @@ public class PrintUtilsTest {
 				new PrintWriter(outContent));
 
 		assertEquals(
-				"+---------+-----+--------+---------+----------------+-----------+\n" +
-				"| boolean | int | bigint | varchar | decimal(10, 5) | timestamp |\n" +
-				"+---------+-----+--------+---------+----------------+-----------+\n" +
+				"+---------+-----+--------+---------+----------------+-----------+" + System.lineSeparator() +
+				"| boolean | int | bigint | varchar | decimal(10, 5) | timestamp |" + System.lineSeparator() +
+				"+---------+-----+--------+---------+----------------+-----------+" + System.lineSeparator() +
+				"0 row in set" + System.lineSeparator(),
+				outContent.toString());
+	}
+
+	@Test
+	public void testPrintWithEmptyResultAndDeriveColumnWidthByContent() {
+		PrintUtils.printAsTableauForm(
+				getSchema(),
+				Collections.<Row>emptyList().iterator(),
+				new PrintWriter(outContent),
+				PrintUtils.MAX_COLUMN_WIDTH,
+				"",
+				false // derive column width by content
+		);
+
+		assertEquals(
+				"+---------+-----+--------+---------+----------------+-----------+" + System.lineSeparator() +
+				"| boolean | int | bigint | varchar | decimal(10, 5) | timestamp |" + System.lineSeparator() +
+				"+---------+-----+--------+---------+----------------+-----------+" + System.lineSeparator() +
 				"0 row in set" + System.lineSeparator(),
 				outContent.toString());
 	}
@@ -101,23 +120,72 @@ public class PrintUtilsTest {
 		// note: the expected result may look irregular because every CJK(Chinese/Japanese/Korean) character's
 		// width < 2 in IDE by default, every CJK character usually's width is 2, you can open this source file
 		// by vim or just cat the file to check the regular result.
+		// In the last row, the `varchar` value is preceded by two spaces:
+		// the display width of `これは日本語をテストするた` plus the width of `...` is 29,
+		// so no further Japanese character (display width 2) fits into the column.
 		assertEquals(
-				"+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+\n" +
-				"| boolean |         int |               bigint |                        varchar | decimal(10, 5) |                  timestamp |\n" +
-				"+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+\n" +
-				"|  (NULL) |           1 |                    2 |                            abc |           1.23 |      2020-03-01 18:39:14.0 |\n" +
-				"|   false |      (NULL) |                    0 |                                |              1 |      2020-03-01 18:39:14.1 |\n" +
-				"|    true |  2147483647 |               (NULL) |                        abcdefg |     1234567890 |     2020-03-01 18:39:14.12 |\n" +
-				"|   false | -2147483648 |  9223372036854775807 |                         (NULL) |    12345.06789 |    2020-03-01 18:39:14.123 |\n" +
-				"|    true |         100 | -9223372036854775808 |                     abcdefg111 |         (NULL) | 2020-03-01 18:39:14.123456 |\n" +
-				"|  (NULL) |          -1 |                   -1 |     abcdefghijklmnopqrstuvwxyz |   -12345.06789 |                     (NULL) |\n" +
-				"|  (NULL) |          -1 |                   -1 |                   这是一段中文 |   -12345.06789 |      2020-03-04 18:39:14.0 |\n" +
-				"|  (NULL) |          -1 |                   -1 |  これは日本語をテストするた... |   -12345.06789 |      2020-03-04 18:39:14.0 |\n" +
-				"+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+\n" +
+				"+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+" + System.lineSeparator() +
+				"| boolean |         int |               bigint |                        varchar | decimal(10, 5) |                  timestamp |" + System.lineSeparator() +
+				"+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+" + System.lineSeparator() +
+				"|  (NULL) |           1 |                    2 |                            abc |           1.23 |      2020-03-01 18:39:14.0 |" + System.lineSeparator() +
+				"|   false |      (NULL) |                    0 |                                |              1 |      2020-03-01 18:39:14.1 |" + System.lineSeparator() +
+				"|    true |  2147483647 |               (NULL) |                        abcdefg |     1234567890 |     2020-03-01 18:39:14.12 |" + System.lineSeparator() +
+				"|   false | -2147483648 |  9223372036854775807 |                         (NULL) |    12345.06789 |    2020-03-01 18:39:14.123 |" + System.lineSeparator() +
+				"|    true |         100 | -9223372036854775808 |                     abcdefg111 |         (NULL) | 2020-03-01 18:39:14.123456 |" + System.lineSeparator() +
+				"|  (NULL) |          -1 |                   -1 | abcdefghijklmnopqrstuvwxyza... |   -12345.06789 |                     (NULL) |" + System.lineSeparator() +
+				"|  (NULL) |          -1 |                   -1 |                   这是一段中文 |   -12345.06789 |      2020-03-04 18:39:14.0 |" + System.lineSeparator() +
+				"|  (NULL) |          -1 |                   -1 |  これは日本語をテストするた... |   -12345.06789 |      2020-03-04 18:39:14.0 |" + System.lineSeparator() +
+				"+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+" + System.lineSeparator() +
 				"8 rows in set" + System.lineSeparator(),
 				outContent.toString());
 	}
 
+	@Test
+	public void testPrintWithMultipleRowsAndDeriveColumnWidthByContent() {
+		PrintUtils.printAsTableauForm(
+				getSchema(),
+				getData().subList(0, 3).iterator(),
+				new PrintWriter(outContent),
+				PrintUtils.MAX_COLUMN_WIDTH,
+				"",
+				false // derive column width by content
+		);
+
+		assertEquals(
+				"+---------+------------+--------+---------+----------------+------------------------+" + System.lineSeparator() +
+				"| boolean |        int | bigint | varchar | decimal(10, 5) |              timestamp |" + System.lineSeparator() +
+				"+---------+------------+--------+---------+----------------+------------------------+" + System.lineSeparator() +
+				"|         |          1 |      2 |     abc |           1.23 |  2020-03-01 18:39:14.0 |" + System.lineSeparator() +
+				"|   false |            |      0 |         |              1 |  2020-03-01 18:39:14.1 |" + System.lineSeparator() +
+				"|    true | 2147483647 |        | abcdefg |     1234567890 | 2020-03-01 18:39:14.12 |" + System.lineSeparator() +
+				"+---------+------------+--------+---------+----------------+------------------------+" + System.lineSeparator() +
+				"3 rows in set" + System.lineSeparator(),
+				outContent.toString());
+	}
+
+	@Test
+	public void testPrintWithMultipleRowsAndDeriveColumnWidthByType() {
+		PrintUtils.printAsTableauForm(
+				getSchema(),
+				getData().subList(0, 3).iterator(),
+				new PrintWriter(outContent),
+				PrintUtils.MAX_COLUMN_WIDTH,
+				"",
+				true // derive column width by type
+		);
+
+		assertEquals(
+				"+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+" + System.lineSeparator() +
+				"| boolean |         int |               bigint |                        varchar | decimal(10, 5) |                  timestamp |" + System.lineSeparator() +
+				"+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+" + System.lineSeparator() +
+				"|         |           1 |                    2 |                            abc |           1.23 |      2020-03-01 18:39:14.0 |" + System.lineSeparator() +
+				"|   false |             |                    0 |                                |              1 |      2020-03-01 18:39:14.1 |" + System.lineSeparator() +
+				"|    true |  2147483647 |                      |                        abcdefg |     1234567890 |     2020-03-01 18:39:14.12 |" + System.lineSeparator() +
+				"+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+" + System.lineSeparator() +
+				"3 rows in set" + System.lineSeparator(),
+				outContent.toString());
+	}
+
 	private TableSchema getSchema() {
 		return TableSchema.builder()
 				.field("boolean", DataTypes.BOOLEAN())
@@ -175,7 +243,7 @@ public class PrintUtilsTest {
 				null,
 				-1,
 				-1,
-				"abcdefghijklmnopqrstuvwxyz",
+				"abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz",
 				BigDecimal.valueOf(-12345.06789),
 				null)
 		);
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index db10922..9b830f5 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -44,10 +44,12 @@ import org.apache.flink.table.types.{AbstractDataType, DataType}
 import org.apache.flink.table.util.JavaScalaConversionUtil
 import org.apache.flink.table.utils.PrintUtils
 import org.apache.flink.types.Row
+
 import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
 import org.apache.calcite.sql.parser.SqlParser
 import org.apache.calcite.tools.FrameworkConfig
 import org.apache.commons.lang3.StringUtils
+
 import _root_.java.lang.{Iterable => JIterable, Long => JLong}
 import _root_.java.util.function.{Function => JFunction, Supplier => JSupplier}
 import _root_.java.util.{Optional, Collections => JCollections, HashMap => JHashMap, List => JList, Map => JMap}
@@ -634,7 +636,8 @@ abstract class TableEnvImpl(
         .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
         .tableSchema(tableSchema)
         .data(tableSink.getResultIterator)
-        .setPrintStyle(PrintStyle.tableau(PrintUtils.MAX_COLUMN_WIDTH, PrintUtils.NULL_COLUMN))
+        .setPrintStyle(
+          PrintStyle.tableau(PrintUtils.MAX_COLUMN_WIDTH, PrintUtils.NULL_COLUMN, true))
         .build
     } catch {
       case e: Exception =>