You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ji...@apache.org on 2023/12/07 07:44:50 UTC

(flink) branch master updated: [FLINK-33726][sql-client] print time cost for streaming queries

This is an automated email from the ASF dual-hosted git repository.

jingge pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4eb5b588e4d [FLINK-33726][sql-client] print time cost for streaming queries
4eb5b588e4d is described below

commit 4eb5b588e4d1230c1b038d632b7ed2a3676db32c
Author: jingge <ge...@gmail.com>
AuthorDate: Mon Dec 4 16:15:45 2023 +0100

    [FLINK-33726][sql-client] print time cost for streaming queries
---
 .../table/client/cli/CliTableauResultView.java     |  73 +++++++---
 .../table/client/cli/CliTableauResultViewTest.java | 147 +++++++++++++++++++++
 .../flink-sql-client/src/test/resources/sql/set.q  |   4 +
 3 files changed, 203 insertions(+), 21 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java
index edf9c0c8f99..04305e7a781 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java
@@ -95,13 +95,7 @@ public class CliTableauResultView implements AutoCloseable {
             resultFuture.get();
             cleanUpQuery = false; // job finished successfully
         } catch (CancellationException e) {
-            terminal.writer()
-                    .println(
-                            "Query terminated, received a total of "
-                                    + receivedRowCount.get()
-                                    + " "
-                                    + getRowTerm(receivedRowCount));
-            terminal.flush();
+            printTerminatedFooter(receivedRowCount);
         } catch (ExecutionException e) {
             if (e.getCause() instanceof SqlExecutionException) {
                 throw (SqlExecutionException) e.getCause();
@@ -114,6 +108,27 @@ public class CliTableauResultView implements AutoCloseable {
         }
     }
 
+    private void printTerminatedFooter(AtomicInteger receivedRowCount) {
+        if (!resultDescriptor.isPrintQueryTimeCost()) {
+            terminal.writer()
+                    .println(
+                            "Query terminated, received a total of "
+                                    + receivedRowCount.get()
+                                    + " "
+                                    + getRowTerm(receivedRowCount));
+        } else {
+            terminal.writer()
+                    .println(
+                            "Query terminated, received a total of "
+                                    + receivedRowCount.get()
+                                    + " "
+                                    + getRowTerm(receivedRowCount)
+                                    + calculateTimeCostInPrintFormat(
+                                            queryBeginTime, System.currentTimeMillis()));
+        }
+        terminal.flush();
+    }
+
     @Override
     public void close() {
         this.displayResultExecutorService.shutdown();
@@ -188,7 +203,7 @@ public class CliTableauResultView implements AutoCloseable {
                     calculateTimeCostInPrintFormat(queryBeginTime, System.currentTimeMillis());
             terminal.writer().println(numRows + " " + rowTerm + " in set" + timeCost);
         }
-        terminal.writer().flush();
+        terminal.flush();
     }
 
     private void printStreamingResults(AtomicInteger receivedRowCount) {
@@ -200,11 +215,7 @@ public class CliTableauResultView implements AutoCloseable {
                         false,
                         true);
 
-        // print filed names
-        style.printBorderLine(terminal.writer());
-        style.printColumnNamesTableauRow(terminal.writer());
-        style.printBorderLine(terminal.writer());
-        terminal.flush();
+        printStreamingTableHeader(style);
 
         while (true) {
             final TypedResult<List<RowData>> result = collectResult.retrieveChanges();
@@ -223,14 +234,7 @@ public class CliTableauResultView implements AutoCloseable {
                     if (receivedRowCount.get() > 0) {
                         style.printBorderLine(terminal.writer());
                     }
-                    String rowTerm = getRowTerm(receivedRowCount);
-                    terminal.writer()
-                            .println(
-                                    "Received a total of "
-                                            + receivedRowCount.get()
-                                            + " "
-                                            + rowTerm);
-                    terminal.flush();
+                    printStreamingFooter(receivedRowCount);
                     return;
                 case PAYLOAD:
                     List<RowData> changes = result.getPayload();
@@ -248,6 +252,33 @@ public class CliTableauResultView implements AutoCloseable {
         }
     }
 
+    private void printStreamingTableHeader(TableauStyle style) {
+        // print field names
+        style.printBorderLine(terminal.writer());
+        style.printColumnNamesTableauRow(terminal.writer());
+        style.printBorderLine(terminal.writer());
+        terminal.flush();
+    }
+
+    private void printStreamingFooter(AtomicInteger receivedRowCount) {
+        String rowTerm = getRowTerm(receivedRowCount);
+        if (!resultDescriptor.isPrintQueryTimeCost()) {
+            terminal.writer()
+                    .println("Received a total of " + receivedRowCount.get() + " " + rowTerm);
+        } else {
+            String timeCost =
+                    calculateTimeCostInPrintFormat(queryBeginTime, System.currentTimeMillis());
+            terminal.writer()
+                    .println(
+                            "Received a total of "
+                                    + receivedRowCount.get()
+                                    + " "
+                                    + rowTerm
+                                    + timeCost);
+        }
+        terminal.flush();
+    }
+
     private List<RowData> waitBatchResults() {
         List<RowData> resultRows = new ArrayList<>();
         do {
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliTableauResultViewTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliTableauResultViewTest.java
index e4922ae4cdf..103439c5853 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliTableauResultViewTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliTableauResultViewTest.java
@@ -468,6 +468,62 @@ class CliTableauResultViewTest {
         assertThat(collectResult.closed).isFalse();
     }
 
+    @Test
+    void testStreamingResultWithDisplayingTimeCost() {
+        final Configuration testConfig = new Configuration();
+        testConfig.set(EXECUTION_RESULT_MODE, ResultMode.TABLEAU);
+        testConfig.set(RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
+
+        ResultDescriptor resultDescriptor =
+                new ResultDescriptor(CliClientTestUtils.createTestClient(schema), testConfig);
+        TestChangelogResult collectResult = createNewTestChangelogResult();
+        CliTableauResultView view =
+                new CliTableauResultView(
+                        terminal, resultDescriptor, collectResult, System.currentTimeMillis());
+        view.displayResults();
+
+        view.close();
+        // Note: the expected result may look misaligned in an IDE, because by default an IDE
+        // renders each CJK (Chinese/Japanese/Korean) character less than 2 columns wide,
+        // whereas each CJK character is usually 2 columns wide.
+        // To see the properly aligned result, open this source file in vim or simply cat
+        // the file.
+        assertThat(terminalOutput.toString())
+                .contains(
+                        "+----+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+--------------------------------+"
+                                + System.lineSeparator()
+                                + "| op | boolean |         int |               bigint |                        varchar | decimal(10, 5) |                  timestamp |                         binary |"
+                                + System.lineSeparator()
+                                + "+----+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+--------------------------------+"
+                                + System.lineSeparator()
+                                + "| +I |  <NULL> |           1 |                    2 |                            abc |        1.23000 | 2020-03-01 18:39:14.000000 |            x'32333485365d737e' |"
+                                + System.lineSeparator()
+                                + "| -U |   FALSE |      <NULL> |                    0 |                                |        1.00000 | 2020-03-01 18:39:14.100000 |                  x'649e207983' |"
+                                + System.lineSeparator()
+                                + "| +U |    TRUE |  2147483647 |               <NULL> |                        abcdefg |    12345.00000 | 2020-03-01 18:39:14.120000 |                    x'92e90102' |"
+                                + System.lineSeparator()
+                                + "| -D |   FALSE | -2147483648 |  9223372036854775807 |                         <NULL> |    12345.06789 | 2020-03-01 18:39:14.123000 |            x'32333485365d737e' |"
+                                + System.lineSeparator()
+                                + "| +I |    TRUE |         100 | -9223372036854775808 |                     abcdefg111 |         <NULL> | 2020-03-01 18:39:14.123456 |                    x'6e17fffe' |"
+                                + System.lineSeparator()
+                                + "| -D |  <NULL> |          -1 |                   -1 | abcdefghijklmnopqrstuvwxyz1... |   -12345.06789 |                     <NULL> |                         <NULL> |"
+                                + System.lineSeparator()
+                                + "| +I |  <NULL> |          -1 |                   -1 |                   这是一段中文 |   -12345.06789 | 2020-03-04 18:39:14.000000 |              x'fdfeff00010203' |"
+                                + System.lineSeparator()
+                                + "| -D |  <NULL> |          -1 |                   -1 |  これは日本語をテストするた... |   -12345.06789 | 2020-03-04 18:39:14.000000 |              x'fdfeff00010203' |"
+                                + System.lineSeparator()
+                                + "+----+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+--------------------------------+"
+                                + System.lineSeparator()
+                                + "Received a total of 8 rows");
+
+        String[] outputLines = terminalOutput.toString().split("\\r?\\n");
+        assertThat(outputLines[outputLines.length - 1])
+                .matches("Received a total of 8 rows \\(\\d+\\.\\d{2} seconds\\)");
+
+        // Job is finished. Don't need to close the job manually.
+        assertThat(collectResult.closed).isFalse();
+    }
+
     @Test
     void testEmptyStreamingResult() {
         final Configuration testConfig = new Configuration();
@@ -501,6 +557,41 @@ class CliTableauResultViewTest {
         assertThat(collectResult.closed).isFalse();
     }
 
+    @Test
+    void testEmptyStreamingResultWithDisplayingTimeCost() {
+        final Configuration testConfig = new Configuration();
+        testConfig.set(EXECUTION_RESULT_MODE, ResultMode.TABLEAU);
+        testConfig.set(RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
+
+        ResultDescriptor resultDescriptor =
+                new ResultDescriptor(CliClientTestUtils.createTestClient(schema), testConfig);
+        TestChangelogResult collectResult = new TestChangelogResult(TypedResult::endOfStream);
+
+        CliTableauResultView view =
+                new CliTableauResultView(
+                        terminal, resultDescriptor, collectResult, System.currentTimeMillis());
+
+        view.displayResults();
+        view.close();
+
+        assertThat(terminalOutput.toString())
+                .contains(
+                        "+----+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+--------------------------------+"
+                                + System.lineSeparator()
+                                + "| op | boolean |         int |               bigint |                        varchar | decimal(10, 5) |                  timestamp |                         binary |"
+                                + System.lineSeparator()
+                                + "+----+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+--------------------------------+"
+                                + System.lineSeparator()
+                                + "Received a total of 0 row");
+
+        String[] outputLines = terminalOutput.toString().split("\\r?\\n");
+        assertThat(outputLines[outputLines.length - 1])
+                .matches("Received a total of 0 row \\(\\d+\\.\\d{2} seconds\\)");
+
+        // Job is finished. Don't need to close the job manually.
+        assertThat(collectResult.closed).isFalse();
+    }
+
     @Test
     void testCancelStreamingResult() throws Exception {
         final Configuration testConfig = new Configuration();
@@ -554,6 +645,62 @@ class CliTableauResultViewTest {
         assertThat(collectResult.closed).isTrue();
     }
 
+    @Test
+    void testCancelStreamingResultWithDisplayingTimeCost() throws Exception {
+        final Configuration testConfig = new Configuration();
+        testConfig.set(EXECUTION_RESULT_MODE, ResultMode.TABLEAU);
+        testConfig.set(RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
+
+        TestChangelogResult collectResult =
+                new TestChangelogResult(
+                        () ->
+                                TypedResult.payload(
+                                        streamingData.subList(0, streamingData.size() / 2)),
+                        TypedResult::empty);
+        ResultDescriptor resultDescriptor =
+                new ResultDescriptor(CliClientTestUtils.createTestClient(schema), testConfig);
+        CliTableauResultView view =
+                new CliTableauResultView(
+                        terminal, resultDescriptor, collectResult, System.currentTimeMillis());
+
+        // submit result display in another thread
+        Future<?> furture = EXECUTOR_RESOURCE.getExecutor().submit(view::displayResults);
+
+        // wait until we processed first result
+        CommonTestUtils.waitUntilCondition(() -> collectResult.fetchCount.get() > 1, 50L);
+
+        // send signal to cancel
+        terminal.raise(Terminal.Signal.INT);
+        furture.get(10, TimeUnit.SECONDS);
+        view.close();
+
+        assertThat(terminalOutput.toString())
+                .contains(
+                        "+----+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+--------------------------------+"
+                                + System.lineSeparator()
+                                + "| op | boolean |         int |               bigint |                        varchar | decimal(10, 5) |                  timestamp |                         binary |"
+                                + System.lineSeparator()
+                                + "+----+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+--------------------------------+"
+                                + System.lineSeparator()
+                                + "| +I |  <NULL> |           1 |                    2 |                            abc |        1.23000 | 2020-03-01 18:39:14.000000 |            x'32333485365d737e' |"
+                                + System.lineSeparator()
+                                + "| -U |   FALSE |      <NULL> |                    0 |                                |        1.00000 | 2020-03-01 18:39:14.100000 |                  x'649e207983' |"
+                                + System.lineSeparator()
+                                + "| +U |    TRUE |  2147483647 |               <NULL> |                        abcdefg |    12345.00000 | 2020-03-01 18:39:14.120000 |                    x'92e90102' |"
+                                + System.lineSeparator()
+                                + "| -D |   FALSE | -2147483648 |  9223372036854775807 |                         <NULL> |    12345.06789 | 2020-03-01 18:39:14.123000 |            x'32333485365d737e' |"
+                                + System.lineSeparator()
+                                + "Query terminated, received a total of 4 rows");
+
+        String[] outputLines = terminalOutput.toString().split("\\r?\\n");
+        assertThat(outputLines[outputLines.length - 1])
+                .matches(
+                        "Query terminated, received a total of 4 rows \\(\\d+\\.\\d{2} seconds\\)");
+
+        // close job manually
+        assertThat(collectResult.closed).isTrue();
+    }
+
     @Test
     void testFailedStreamingResult() {
         final Configuration testConfig = new Configuration();
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/set.q b/flink-table/flink-sql-client/src/test/resources/sql/set.q
index d55dfd8b1ab..25651f1adbd 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/set.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/set.q
@@ -232,6 +232,10 @@ SET 'sql-client.execution.result-mode' = 'tableau';
 [INFO] Execute statement succeed.
 !info
 
+SET 'sql-client.display.print-time-cost' = 'false';
+[INFO] Execute statement succeed.
+!info
+
 create function func1 as 'LowerUDF' LANGUAGE JAVA;
 [INFO] Execute statement succeed.
 !info