You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2023/01/12 06:26:08 UTC

[flink] branch master updated: [FLINK-28655][sql-gateway] Support SHOW JOBS syntax in SqlGateway

This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f1770892c5b [FLINK-28655][sql-gateway] Support SHOW JOBS syntax in SqlGateway
f1770892c5b is described below

commit f1770892c5b8bce408093fc4c2fa52aafd42d7c6
Author: Paul Lin <pa...@gmail.com>
AuthorDate: Thu Jan 12 14:25:52 2023 +0800

    [FLINK-28655][sql-gateway] Support SHOW JOBS syntax in SqlGateway
    
    This closes #21582
---
 .../service/operation/OperationExecutor.java       | 53 +++++++++++++++++++-
 .../table/gateway/service/utils/Constants.java     |  6 ++-
 .../gateway/service/SqlGatewayServiceITCase.java   | 50 ++++++++++++++++++-
 .../src/main/codegen/data/Parser.tdd               |  6 ++-
 .../src/main/codegen/includes/parserImpls.ftl      | 13 +++++
 .../apache/flink/sql/parser/dql/SqlShowJobs.java   | 56 ++++++++++++++++++++++
 .../flink/sql/parser/FlinkSqlParserImplTest.java   |  5 ++
 .../operations/command/ShowJobsOperation.java}     | 16 +++----
 .../operations/SqlToOperationConverter.java        |  8 ++++
 .../table/planner/calcite/FlinkPlannerImpl.scala   |  1 +
 10 files changed, 201 insertions(+), 13 deletions(-)

diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
index b57b245e9c9..bf0847da081 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
@@ -29,6 +29,7 @@ import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.table.api.CatalogNotExistException;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
@@ -64,10 +65,12 @@ import org.apache.flink.table.operations.UseOperation;
 import org.apache.flink.table.operations.command.AddJarOperation;
 import org.apache.flink.table.operations.command.ResetOperation;
 import org.apache.flink.table.operations.command.SetOperation;
+import org.apache.flink.table.operations.command.ShowJobsOperation;
 import org.apache.flink.table.operations.command.StopJobOperation;
 import org.apache.flink.table.operations.ddl.AlterOperation;
 import org.apache.flink.table.operations.ddl.CreateOperation;
 import org.apache.flink.table.operations.ddl.DropOperation;
+import org.apache.flink.table.utils.DateTimeUtils;
 import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
@@ -77,6 +80,7 @@ import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -89,9 +93,12 @@ import java.util.stream.Collectors;
 
 import static org.apache.flink.table.gateway.service.utils.Constants.COMPLETION_CANDIDATES;
 import static org.apache.flink.table.gateway.service.utils.Constants.JOB_ID;
+import static org.apache.flink.table.gateway.service.utils.Constants.JOB_NAME;
 import static org.apache.flink.table.gateway.service.utils.Constants.SAVEPOINT_PATH;
 import static org.apache.flink.table.gateway.service.utils.Constants.SET_KEY;
 import static org.apache.flink.table.gateway.service.utils.Constants.SET_VALUE;
+import static org.apache.flink.table.gateway.service.utils.Constants.START_TIME;
+import static org.apache.flink.table.gateway.service.utils.Constants.STATUS;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /** An executor to execute the {@link Operation}. */
@@ -316,6 +323,8 @@ public class OperationExecutor {
             return new ResultFetcher(handle, result.getResolvedSchema(), result.collectInternal());
         } else if (op instanceof StopJobOperation) {
             return callStopJobOperation(handle, (StopJobOperation) op);
+        } else if (op instanceof ShowJobsOperation) {
+            return callShowJobsOperation(handle, (ShowJobsOperation) op);
         } else {
             return callOperation(tableEnv, handle, op);
         }
@@ -524,6 +533,46 @@ public class OperationExecutor {
                         TableResultInternal.TABLE_RESULT_OK.collectInternal()));
     }
 
+    public ResultFetcher callShowJobsOperation(
+            OperationHandle operationHandle, ShowJobsOperation showJobsOperation)
+            throws SqlExecutionException {
+        Duration clientTimeout =
+                Configuration.fromMap(sessionContext.getConfigMap())
+                        .get(ClientOptions.CLIENT_TIMEOUT);
+        Collection<JobStatusMessage> jobs =
+                runClusterAction(
+                        operationHandle,
+                        clusterClient -> {
+                            try {
+                                return clusterClient
+                                        .listJobs()
+                                        .get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS);
+                            } catch (Exception e) {
+                                throw new SqlExecutionException(
+                                        "Failed to list jobs in the cluster.", e);
+                            }
+                        });
+        List<RowData> resultRows =
+                jobs.stream()
+                        .map(
+                                job ->
+                                        GenericRowData.of(
+                                                StringData.fromString(job.getJobId().toString()),
+                                                StringData.fromString(job.getJobName()),
+                                                StringData.fromString(job.getJobState().toString()),
+                                                DateTimeUtils.toTimestampData(
+                                                        job.getStartTime(), 3)))
+                        .collect(Collectors.toList());
+        return new ResultFetcher(
+                operationHandle,
+                ResolvedSchema.of(
+                        Column.physical(JOB_ID, DataTypes.STRING()),
+                        Column.physical(JOB_NAME, DataTypes.STRING()),
+                        Column.physical(STATUS, DataTypes.STRING()),
+                        Column.physical(START_TIME, DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())),
+                resultRows);
+    }
+
     /**
      * Retrieves the {@link ClusterClient} from the session and runs the given {@link ClusterAction}
      * against it.
@@ -536,7 +585,7 @@ public class OperationExecutor {
      */
     private <ClusterID, Result> Result runClusterAction(
             OperationHandle handle, ClusterAction<ClusterID, Result> clusterAction)
-            throws FlinkException {
+            throws SqlExecutionException {
         final Configuration configuration = Configuration.fromMap(sessionContext.getConfigMap());
         final ClusterClientFactory<ClusterID> clusterClientFactory =
                 clusterClientServiceLoader.getClusterClientFactory(configuration);
@@ -549,6 +598,8 @@ public class OperationExecutor {
                 final ClusterClient<ClusterID> clusterClient =
                         clusterDescriptor.retrieve(clusterId).getClusterClient()) {
             return clusterAction.runAction(clusterClient);
+        } catch (FlinkException e) {
+            throw new SqlExecutionException("Failed to run cluster action.", e);
         }
     }
 
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/Constants.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/Constants.java
index 53ea12c8f6c..c2ceb435fe2 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/Constants.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/Constants.java
@@ -22,9 +22,11 @@ package org.apache.flink.table.gateway.service.utils;
 public class Constants {
 
     public static final String JOB_ID = "job id";
+    public static final String JOB_NAME = "job name";
+    public static final String STATUS = "status";
+    public static final String START_TIME = "start time";
     public static final String SET_KEY = "key";
     public static final String SET_VALUE = "value";
     public static final String COMPLETION_CANDIDATES = "candidates";
-
-    public static final String SAVEPOINT_PATH = "savepoint_path";
+    public static final String SAVEPOINT_PATH = "savepoint path";
 }
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index d95d7d486c3..8ca23f73071 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.core.testutils.FlinkAssertions;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
@@ -121,7 +122,7 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
     public static final MiniClusterExtension MINI_CLUSTER =
             new MiniClusterExtension(
                     new MiniClusterResourceConfiguration.Builder()
-                            .setNumberTaskManagers(1)
+                            .setNumberTaskManagers(2)
                             .build());
 
     @RegisterExtension
@@ -456,6 +457,53 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                 task -> assertThat(task.get()).isEqualTo(getDefaultResultSet().getResultSchema()));
     }
 
+    @Test
+    public void testShowJobsOperation(@InjectClusterClient RestClusterClient<?> restClusterClient)
+            throws Exception {
+        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
+        Configuration configuration = new Configuration(MINI_CLUSTER.getClientConfiguration());
+
+        String pipelineName = "test-job";
+        configuration.setString(PipelineOptions.NAME, pipelineName);
+
+        // running jobs
+        String sourceDdl = "CREATE TABLE source (a STRING) WITH ('connector'='datagen');";
+        String sinkDdl = "CREATE TABLE sink (a STRING) WITH ('connector'='blackhole');";
+        String insertSql = "INSERT INTO sink SELECT * FROM source;";
+
+        service.executeStatement(sessionHandle, sourceDdl, -1, configuration);
+        service.executeStatement(sessionHandle, sinkDdl, -1, configuration);
+
+        long timeOpStart = System.currentTimeMillis();
+        OperationHandle insertsOperationHandle =
+                service.executeStatement(sessionHandle, insertSql, -1, configuration);
+        String jobId =
+                fetchAllResults(sessionHandle, insertsOperationHandle)
+                        .get(0)
+                        .getString(0)
+                        .toString();
+
+        TestUtils.waitUntilAllTasksAreRunning(restClusterClient, JobID.fromHexString(jobId));
+        long timeOpSucceed = System.currentTimeMillis();
+
+        OperationHandle showJobsOperationHandle1 =
+                service.executeStatement(sessionHandle, "SHOW JOBS", -1, configuration);
+
+        List<RowData> result = fetchAllResults(sessionHandle, showJobsOperationHandle1);
+        RowData jobRow =
+                result.stream()
+                        .filter(row -> jobId.equals(row.getString(0).toString()))
+                        .findFirst()
+                        .orElseThrow(
+                                () ->
+                                        new IllegalStateException(
+                                                "Test job " + jobId + " not found."));
+        assertThat(jobRow.getString(1).toString()).isEqualTo(pipelineName);
+        assertThat(jobRow.getString(2).toString()).isEqualTo("RUNNING");
+        assertThat(jobRow.getTimestamp(3, 3).getMillisecond())
+                .isBetween(timeOpStart, timeOpSucceed);
+    }
+
     // --------------------------------------------------------------------------------------------
     // Catalog API tests
     // --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index a117fdc7da0..3509a374776 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -107,6 +107,7 @@
     "org.apache.flink.sql.parser.dql.SqlUnloadModule"
     "org.apache.flink.sql.parser.expr.SqlUnresolvedTryCastFunction"
     "org.apache.flink.sql.parser.ddl.SqlStopJob"
+    "org.apache.flink.sql.parser.dql.SqlShowJobs"
     "org.apache.flink.sql.parser.type.ExtendedSqlCollectionTypeNameSpec"
     "org.apache.flink.sql.parser.type.ExtendedSqlRowTypeNameSpec"
     "org.apache.flink.sql.parser.type.SqlMapTypeNameSpec"
@@ -147,6 +148,7 @@
     "JAR"
     "JARS"
     "JOB"
+    "JOBS"
     "LOAD"
     "METADATA"
     "MODIFY"
@@ -294,8 +296,9 @@
     "ISOYEAR"
     "JAR"
     "JARS"
-    "JOB"
     "JAVA"
+    "JOB"
+    "JOBS"
     "JSON"
     "K"
     "KEY"
@@ -569,6 +572,7 @@
     "SqlReset()"
     "SqlAnalyzeTable()"
     "SqlStopJob()"
+    "SqlShowJobs()"
   ]
 
   # List of methods for parsing custom literals.
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index e24b39e544d..13b61a6937b 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -2365,6 +2365,19 @@ SqlNode SqlAnalyzeTable():
     }
 }
 
+/**
+* Parse a "SHOW JOBS" statement.
+*/
+SqlShowJobs SqlShowJobs() :
+{
+}
+{
+    <SHOW> <JOBS>
+    {
+        return new SqlShowJobs(getPos());
+    }
+}
+
 /**
 * Parses a STOP JOB statement:
 * STOP JOB <JOB_ID> [<WITH SAVEPOINT>] [<WITH DRAIN>];
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowJobs.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowJobs.java
new file mode 100644
index 00000000000..5294564fb3a
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowJobs.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.dql;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import java.util.Collections;
+import java.util.List;
+
+/** The command to list running flink jobs in a cluster. */
+public class SqlShowJobs extends SqlCall {
+
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("SHOW JOBS", SqlKind.OTHER);
+
+    public SqlShowJobs(SqlParserPos pos) {
+        super(pos);
+    }
+
+    @Override
+    public SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    @Override
+    public List<SqlNode> getOperandList() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        writer.keyword("SHOW JOBS");
+    }
+}
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 5a3498e6ab7..7b1878a6794 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -2087,6 +2087,11 @@ class FlinkSqlParserImplTest extends SqlParserTest {
                                         "CREATE TABLE AS SELECT syntax does not support to create partitioned table yet."));
     }
 
+    @Test
+    void testShowJobs() {
+        sql("show jobs").ok("SHOW JOBS");
+    }
+
     @Test
     void testStopJob() {
         sql("STOP JOB 'myjob'").ok("STOP JOB 'myjob'");
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/Constants.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/ShowJobsOperation.java
similarity index 66%
copy from flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/Constants.java
copy to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/ShowJobsOperation.java
index 53ea12c8f6c..24da96ca30d 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/Constants.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/ShowJobsOperation.java
@@ -16,15 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.gateway.service.utils;
+package org.apache.flink.table.operations.command;
 
-/** Constants used in the SqlGatewayService. */
-public class Constants {
+import org.apache.flink.table.operations.ShowOperation;
 
-    public static final String JOB_ID = "job id";
-    public static final String SET_KEY = "key";
-    public static final String SET_VALUE = "value";
-    public static final String COMPLETION_CANDIDATES = "candidates";
+/** Operation to describe a SHOW JOBS statement. */
+public class ShowJobsOperation implements ShowOperation {
 
-    public static final String SAVEPOINT_PATH = "savepoint_path";
+    @Override
+    public String asSummaryString() {
+        return "SHOW JOBS";
+    }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 7bab1359df4..0603160cc6a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -83,6 +83,7 @@ import org.apache.flink.sql.parser.dql.SqlShowCurrentDatabase;
 import org.apache.flink.sql.parser.dql.SqlShowDatabases;
 import org.apache.flink.sql.parser.dql.SqlShowFunctions;
 import org.apache.flink.sql.parser.dql.SqlShowJars;
+import org.apache.flink.sql.parser.dql.SqlShowJobs;
 import org.apache.flink.sql.parser.dql.SqlShowModules;
 import org.apache.flink.sql.parser.dql.SqlShowPartitions;
 import org.apache.flink.sql.parser.dql.SqlShowTables;
@@ -159,6 +160,7 @@ import org.apache.flink.table.operations.command.RemoveJarOperation;
 import org.apache.flink.table.operations.command.ResetOperation;
 import org.apache.flink.table.operations.command.SetOperation;
 import org.apache.flink.table.operations.command.ShowJarsOperation;
+import org.apache.flink.table.operations.command.ShowJobsOperation;
 import org.apache.flink.table.operations.command.StopJobOperation;
 import org.apache.flink.table.operations.ddl.AddPartitionsOperation;
 import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
@@ -358,6 +360,8 @@ public class SqlToOperationConverter {
             return Optional.of(converter.convertRemoveJar((SqlRemoveJar) validated));
         } else if (validated instanceof SqlShowJars) {
             return Optional.of(converter.convertShowJars((SqlShowJars) validated));
+        } else if (validated instanceof SqlShowJobs) {
+            return Optional.of(converter.convertShowJobs((SqlShowJobs) validated));
         } else if (validated instanceof RichSqlInsert) {
             return Optional.of(converter.convertSqlInsert((RichSqlInsert) validated));
         } else if (validated instanceof SqlBeginStatementSet) {
@@ -1472,6 +1476,10 @@ public class SqlToOperationConverter {
         return new ValueLiteralExpression(value, dataType.notNull());
     }
 
+    private Operation convertShowJobs(SqlShowJobs sqlStopJob) {
+        return new ShowJobsOperation();
+    }
+
     private Operation convertStopJob(SqlStopJob sqlStopJob) {
         return new StopJobOperation(
                 sqlStopJob.getId(), sqlStopJob.isWithSavepoint(), sqlStopJob.isWithDrain());
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
index 84735f5f337..9a1e5996f69 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
@@ -140,6 +140,7 @@ class FlinkPlannerImpl(
         || sqlNode.isInstanceOf[SqlShowViews]
         || sqlNode.isInstanceOf[SqlShowColumns]
         || sqlNode.isInstanceOf[SqlShowPartitions]
+        || sqlNode.isInstanceOf[SqlShowJobs]
         || sqlNode.isInstanceOf[SqlRichDescribeTable]
         || sqlNode.isInstanceOf[SqlUnloadModule]
         || sqlNode.isInstanceOf[SqlUseModules]