You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ji...@apache.org on 2023/12/07 07:44:50 UTC
(flink) branch master updated: [FLINK-33726][sql-client] print time cost for streaming queries
This is an automated email from the ASF dual-hosted git repository.
jingge pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4eb5b588e4d [FLINK-33726][sql-client] print time cost for streaming queries
4eb5b588e4d is described below
commit 4eb5b588e4d1230c1b038d632b7ed2a3676db32c
Author: jingge <ge...@gmail.com>
AuthorDate: Mon Dec 4 16:15:45 2023 +0100
[FLINK-33726][sql-client] print time cost for streaming queries
---
.../table/client/cli/CliTableauResultView.java | 73 +++++++---
.../table/client/cli/CliTableauResultViewTest.java | 147 +++++++++++++++++++++
.../flink-sql-client/src/test/resources/sql/set.q | 4 +
3 files changed, 203 insertions(+), 21 deletions(-)
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java
index edf9c0c8f99..04305e7a781 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java
@@ -95,13 +95,7 @@ public class CliTableauResultView implements AutoCloseable {
resultFuture.get();
cleanUpQuery = false; // job finished successfully
} catch (CancellationException e) {
- terminal.writer()
- .println(
- "Query terminated, received a total of "
- + receivedRowCount.get()
- + " "
- + getRowTerm(receivedRowCount));
- terminal.flush();
+ printTerminatedFooter(receivedRowCount);
} catch (ExecutionException e) {
if (e.getCause() instanceof SqlExecutionException) {
throw (SqlExecutionException) e.getCause();
@@ -114,6 +108,27 @@ public class CliTableauResultView implements AutoCloseable {
}
}
+ private void printTerminatedFooter(AtomicInteger receivedRowCount) {
+ if (!resultDescriptor.isPrintQueryTimeCost()) {
+ terminal.writer()
+ .println(
+ "Query terminated, received a total of "
+ + receivedRowCount.get()
+ + " "
+ + getRowTerm(receivedRowCount));
+ } else {
+ terminal.writer()
+ .println(
+ "Query terminated, received a total of "
+ + receivedRowCount.get()
+ + " "
+ + getRowTerm(receivedRowCount)
+ + calculateTimeCostInPrintFormat(
+ queryBeginTime, System.currentTimeMillis()));
+ }
+ terminal.flush();
+ }
+
@Override
public void close() {
this.displayResultExecutorService.shutdown();
@@ -188,7 +203,7 @@ public class CliTableauResultView implements AutoCloseable {
calculateTimeCostInPrintFormat(queryBeginTime, System.currentTimeMillis());
terminal.writer().println(numRows + " " + rowTerm + " in set" + timeCost);
}
- terminal.writer().flush();
+ terminal.flush();
}
private void printStreamingResults(AtomicInteger receivedRowCount) {
@@ -200,11 +215,7 @@ public class CliTableauResultView implements AutoCloseable {
false,
true);
- // print filed names
- style.printBorderLine(terminal.writer());
- style.printColumnNamesTableauRow(terminal.writer());
- style.printBorderLine(terminal.writer());
- terminal.flush();
+ printStreamingTableHeader(style);
while (true) {
final TypedResult<List<RowData>> result = collectResult.retrieveChanges();
@@ -223,14 +234,7 @@ public class CliTableauResultView implements AutoCloseable {
if (receivedRowCount.get() > 0) {
style.printBorderLine(terminal.writer());
}
- String rowTerm = getRowTerm(receivedRowCount);
- terminal.writer()
- .println(
- "Received a total of "
- + receivedRowCount.get()
- + " "
- + rowTerm);
- terminal.flush();
+ printStreamingFooter(receivedRowCount);
return;
case PAYLOAD:
List<RowData> changes = result.getPayload();
@@ -248,6 +252,33 @@ public class CliTableauResultView implements AutoCloseable {
}
}
+ private void printStreamingTableHeader(TableauStyle style) {
+ // print filed names
+ style.printBorderLine(terminal.writer());
+ style.printColumnNamesTableauRow(terminal.writer());
+ style.printBorderLine(terminal.writer());
+ terminal.flush();
+ }
+
+ private void printStreamingFooter(AtomicInteger receivedRowCount) {
+ String rowTerm = getRowTerm(receivedRowCount);
+ if (!resultDescriptor.isPrintQueryTimeCost()) {
+ terminal.writer()
+ .println("Received a total of " + receivedRowCount.get() + " " + rowTerm);
+ } else {
+ String timeCost =
+ calculateTimeCostInPrintFormat(queryBeginTime, System.currentTimeMillis());
+ terminal.writer()
+ .println(
+ "Received a total of "
+ + receivedRowCount.get()
+ + " "
+ + rowTerm
+ + timeCost);
+ }
+ terminal.flush();
+ }
+
private List<RowData> waitBatchResults() {
List<RowData> resultRows = new ArrayList<>();
do {
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliTableauResultViewTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliTableauResultViewTest.java
index e4922ae4cdf..103439c5853 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliTableauResultViewTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliTableauResultViewTest.java
@@ -468,6 +468,62 @@ class CliTableauResultViewTest {
assertThat(collectResult.closed).isFalse();
}
+ @Test
+ void testStreamingResultWithDisplayingTimeCost() {
+ final Configuration testConfig = new Configuration();
+ testConfig.set(EXECUTION_RESULT_MODE, ResultMode.TABLEAU);
+ testConfig.set(RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
+
+ ResultDescriptor resultDescriptor =
+ new ResultDescriptor(CliClientTestUtils.createTestClient(schema), testConfig);
+ TestChangelogResult collectResult = createNewTestChangelogResult();
+ CliTableauResultView view =
+ new CliTableauResultView(
+ terminal, resultDescriptor, collectResult, System.currentTimeMillis());
+ view.displayResults();
+
+ view.close();
+ // note: the expected result may look irregular because every CJK(Chinese/Japanese/Korean)
+ // character's
+ // width < 2 in IDE by default, every CJK character usually's width is 2, you can open this
+ // source file
+ // by vim or just cat the file to check the regular result.
+ assertThat(terminalOutput.toString())
+ .contains(
+ "+----+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+--------------------------------+"
+ + System.lineSeparator()
+ + "| op | boolean | int | bigint | varchar | decimal(10, 5) | timestamp | binary |"
+ + System.lineSeparator()
+ + "+----+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+--------------------------------+"
+ + System.lineSeparator()
+ + "| +I | <NULL> | 1 | 2 | abc | 1.23000 | 2020-03-01 18:39:14.000000 | x'32333485365d737e' |"
+ + System.lineSeparator()
+ + "| -U | FALSE | <NULL> | 0 | | 1.00000 | 2020-03-01 18:39:14.100000 | x'649e207983' |"
+ + System.lineSeparator()
+ + "| +U | TRUE | 2147483647 | <NULL> | abcdefg | 12345.00000 | 2020-03-01 18:39:14.120000 | x'92e90102' |"
+ + System.lineSeparator()
+ + "| -D | FALSE | -2147483648 | 9223372036854775807 | <NULL> | 12345.06789 | 2020-03-01 18:39:14.123000 | x'32333485365d737e' |"
+ + System.lineSeparator()
+ + "| +I | TRUE | 100 | -9223372036854775808 | abcdefg111 | <NULL> | 2020-03-01 18:39:14.123456 | x'6e17fffe' |"
+ + System.lineSeparator()
+ + "| -D | <NULL> | -1 | -1 | abcdefghijklmnopqrstuvwxyz1... | -12345.06789 | <NULL> | <NULL> |"
+ + System.lineSeparator()
+ + "| +I | <NULL> | -1 | -1 | 这是一段中文 | -12345.06789 | 2020-03-04 18:39:14.000000 | x'fdfeff00010203' |"
+ + System.lineSeparator()
+ + "| -D | <NULL> | -1 | -1 | これは日本語をテストするた... | -12345.06789 | 2020-03-04 18:39:14.000000 | x'fdfeff00010203' |"
+ + System.lineSeparator()
+ + "+----+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+--------------------------------+"
+ + System.lineSeparator()
+ + "Received a total of 8 rows");
+
+ String[] outputLines = terminalOutput.toString().split("\\r?\\n");
+ assertThat(outputLines[outputLines.length - 1])
+ .matches("Received a total of 8 rows \\(\\d+\\.\\d{2} seconds\\)");
+
+ // Job is finished. Don't need to close the job manually.
+ assertThat(collectResult.closed).isFalse();
+ }
+
@Test
void testEmptyStreamingResult() {
final Configuration testConfig = new Configuration();
@@ -501,6 +557,41 @@ class CliTableauResultViewTest {
assertThat(collectResult.closed).isFalse();
}
+ @Test
+ void testEmptyStreamingResultWithDisplayingTimeCost() {
+ final Configuration testConfig = new Configuration();
+ testConfig.set(EXECUTION_RESULT_MODE, ResultMode.TABLEAU);
+ testConfig.set(RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
+
+ ResultDescriptor resultDescriptor =
+ new ResultDescriptor(CliClientTestUtils.createTestClient(schema), testConfig);
+ TestChangelogResult collectResult = new TestChangelogResult(TypedResult::endOfStream);
+
+ CliTableauResultView view =
+ new CliTableauResultView(
+ terminal, resultDescriptor, collectResult, System.currentTimeMillis());
+
+ view.displayResults();
+ view.close();
+
+ assertThat(terminalOutput.toString())
+ .contains(
+ "+----+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+--------------------------------+"
+ + System.lineSeparator()
+ + "| op | boolean | int | bigint | varchar | decimal(10, 5) | timestamp | binary |"
+ + System.lineSeparator()
+ + "+----+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+--------------------------------+"
+ + System.lineSeparator()
+ + "Received a total of 0 row");
+
+ String[] outputLines = terminalOutput.toString().split("\\r?\\n");
+ assertThat(outputLines[outputLines.length - 1])
+ .matches("Received a total of 0 row \\(\\d+\\.\\d{2} seconds\\)");
+
+ // Job is finished. Don't need to close the job manually.
+ assertThat(collectResult.closed).isFalse();
+ }
+
@Test
void testCancelStreamingResult() throws Exception {
final Configuration testConfig = new Configuration();
@@ -554,6 +645,62 @@ class CliTableauResultViewTest {
assertThat(collectResult.closed).isTrue();
}
+ @Test
+ void testCancelStreamingResultWithDisplayingTimeCost() throws Exception {
+ final Configuration testConfig = new Configuration();
+ testConfig.set(EXECUTION_RESULT_MODE, ResultMode.TABLEAU);
+ testConfig.set(RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
+
+ TestChangelogResult collectResult =
+ new TestChangelogResult(
+ () ->
+ TypedResult.payload(
+ streamingData.subList(0, streamingData.size() / 2)),
+ TypedResult::empty);
+ ResultDescriptor resultDescriptor =
+ new ResultDescriptor(CliClientTestUtils.createTestClient(schema), testConfig);
+ CliTableauResultView view =
+ new CliTableauResultView(
+ terminal, resultDescriptor, collectResult, System.currentTimeMillis());
+
+ // submit result display in another thread
+ Future<?> furture = EXECUTOR_RESOURCE.getExecutor().submit(view::displayResults);
+
+ // wait until we processed first result
+ CommonTestUtils.waitUntilCondition(() -> collectResult.fetchCount.get() > 1, 50L);
+
+ // send signal to cancel
+ terminal.raise(Terminal.Signal.INT);
+ furture.get(10, TimeUnit.SECONDS);
+ view.close();
+
+ assertThat(terminalOutput.toString())
+ .contains(
+ "+----+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+--------------------------------+"
+ + System.lineSeparator()
+ + "| op | boolean | int | bigint | varchar | decimal(10, 5) | timestamp | binary |"
+ + System.lineSeparator()
+ + "+----+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+--------------------------------+"
+ + System.lineSeparator()
+ + "| +I | <NULL> | 1 | 2 | abc | 1.23000 | 2020-03-01 18:39:14.000000 | x'32333485365d737e' |"
+ + System.lineSeparator()
+ + "| -U | FALSE | <NULL> | 0 | | 1.00000 | 2020-03-01 18:39:14.100000 | x'649e207983' |"
+ + System.lineSeparator()
+ + "| +U | TRUE | 2147483647 | <NULL> | abcdefg | 12345.00000 | 2020-03-01 18:39:14.120000 | x'92e90102' |"
+ + System.lineSeparator()
+ + "| -D | FALSE | -2147483648 | 9223372036854775807 | <NULL> | 12345.06789 | 2020-03-01 18:39:14.123000 | x'32333485365d737e' |"
+ + System.lineSeparator()
+ + "Query terminated, received a total of 4 rows");
+
+ String[] outputLines = terminalOutput.toString().split("\\r?\\n");
+ assertThat(outputLines[outputLines.length - 1])
+ .matches(
+ "Query terminated, received a total of 4 rows \\(\\d+\\.\\d{2} seconds\\)");
+
+ // close job manually
+ assertThat(collectResult.closed).isTrue();
+ }
+
@Test
void testFailedStreamingResult() {
final Configuration testConfig = new Configuration();
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/set.q b/flink-table/flink-sql-client/src/test/resources/sql/set.q
index d55dfd8b1ab..25651f1adbd 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/set.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/set.q
@@ -232,6 +232,10 @@ SET 'sql-client.execution.result-mode' = 'tableau';
[INFO] Execute statement succeed.
!info
+SET 'sql-client.display.print-time-cost' = 'false';
+[INFO] Execute statement succeed.
+!info
+
create function func1 as 'LowerUDF' LANGUAGE JAVA;
[INFO] Execute statement succeed.
!info