You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2023/01/12 06:26:08 UTC
[flink] branch master updated: [FLINK-28655][sql-gateway] Support SHOW JOBS syntax in SqlGateway
This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f1770892c5b [FLINK-28655][sql-gateway] Support SHOW JOBS syntax in SqlGateway
f1770892c5b is described below
commit f1770892c5b8bce408093fc4c2fa52aafd42d7c6
Author: Paul Lin <pa...@gmail.com>
AuthorDate: Thu Jan 12 14:25:52 2023 +0800
[FLINK-28655][sql-gateway] Support SHOW JOBS syntax in SqlGateway
This closes #21582
---
.../service/operation/OperationExecutor.java | 53 +++++++++++++++++++-
.../table/gateway/service/utils/Constants.java | 6 ++-
.../gateway/service/SqlGatewayServiceITCase.java | 50 ++++++++++++++++++-
.../src/main/codegen/data/Parser.tdd | 6 ++-
.../src/main/codegen/includes/parserImpls.ftl | 13 +++++
.../apache/flink/sql/parser/dql/SqlShowJobs.java | 56 ++++++++++++++++++++++
.../flink/sql/parser/FlinkSqlParserImplTest.java | 5 ++
.../operations/command/ShowJobsOperation.java} | 16 +++----
.../operations/SqlToOperationConverter.java | 8 ++++
.../table/planner/calcite/FlinkPlannerImpl.scala | 1 +
10 files changed, 201 insertions(+), 13 deletions(-)
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
index b57b245e9c9..bf0847da081 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
@@ -29,6 +29,7 @@ import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.table.api.CatalogNotExistException;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
@@ -64,10 +65,12 @@ import org.apache.flink.table.operations.UseOperation;
import org.apache.flink.table.operations.command.AddJarOperation;
import org.apache.flink.table.operations.command.ResetOperation;
import org.apache.flink.table.operations.command.SetOperation;
+import org.apache.flink.table.operations.command.ShowJobsOperation;
import org.apache.flink.table.operations.command.StopJobOperation;
import org.apache.flink.table.operations.ddl.AlterOperation;
import org.apache.flink.table.operations.ddl.CreateOperation;
import org.apache.flink.table.operations.ddl.DropOperation;
+import org.apache.flink.table.utils.DateTimeUtils;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
@@ -77,6 +80,7 @@ import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -89,9 +93,12 @@ import java.util.stream.Collectors;
import static org.apache.flink.table.gateway.service.utils.Constants.COMPLETION_CANDIDATES;
import static org.apache.flink.table.gateway.service.utils.Constants.JOB_ID;
+import static org.apache.flink.table.gateway.service.utils.Constants.JOB_NAME;
import static org.apache.flink.table.gateway.service.utils.Constants.SAVEPOINT_PATH;
import static org.apache.flink.table.gateway.service.utils.Constants.SET_KEY;
import static org.apache.flink.table.gateway.service.utils.Constants.SET_VALUE;
+import static org.apache.flink.table.gateway.service.utils.Constants.START_TIME;
+import static org.apache.flink.table.gateway.service.utils.Constants.STATUS;
import static org.apache.flink.util.Preconditions.checkArgument;
/** An executor to execute the {@link Operation}. */
@@ -316,6 +323,8 @@ public class OperationExecutor {
return new ResultFetcher(handle, result.getResolvedSchema(), result.collectInternal());
} else if (op instanceof StopJobOperation) {
return callStopJobOperation(handle, (StopJobOperation) op);
+ } else if (op instanceof ShowJobsOperation) {
+ return callShowJobsOperation(handle, (ShowJobsOperation) op);
} else {
return callOperation(tableEnv, handle, op);
}
@@ -524,6 +533,46 @@ public class OperationExecutor {
TableResultInternal.TABLE_RESULT_OK.collectInternal()));
}
+    /**
+     * Executes a {@code SHOW JOBS} statement by listing the jobs reported by the session's
+     * cluster.
+     *
+     * <p>The result contains one row per job with four columns: job id, job name, status and
+     * start time. Waiting for the job listing is bounded by the session's {@link
+     * ClientOptions#CLIENT_TIMEOUT}.
+     *
+     * @param operationHandle handle of the operation the result is attached to
+     * @param showJobsOperation the operation to execute; nothing is read from it
+     * @return a fetcher over the materialized job rows
+     * @throws SqlExecutionException if listing the jobs fails, including on timeout or
+     *     interruption
+     */
+    public ResultFetcher callShowJobsOperation(
+            OperationHandle operationHandle, ShowJobsOperation showJobsOperation)
+            throws SqlExecutionException {
+        Duration clientTimeout =
+                Configuration.fromMap(sessionContext.getConfigMap())
+                        .get(ClientOptions.CLIENT_TIMEOUT);
+        Collection<JobStatusMessage> jobs =
+                runClusterAction(
+                        operationHandle,
+                        clusterClient -> {
+                            try {
+                                return clusterClient
+                                        .listJobs()
+                                        .get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS);
+                            } catch (InterruptedException e) {
+                                // Restore the interrupt flag so that code further up the
+                                // stack can still observe the interruption.
+                                Thread.currentThread().interrupt();
+                                throw new SqlExecutionException(
+                                        "Failed to list jobs in the cluster.", e);
+                            } catch (Exception e) {
+                                throw new SqlExecutionException(
+                                        "Failed to list jobs in the cluster.", e);
+                            }
+                        });
+        List<RowData> resultRows =
+                jobs.stream()
+                        .map(
+                                job ->
+                                        GenericRowData.of(
+                                                StringData.fromString(job.getJobId().toString()),
+                                                StringData.fromString(job.getJobName()),
+                                                StringData.fromString(job.getJobState().toString()),
+                                                // NOTE(review): start time is presumably epoch
+                                                // milliseconds, hence precision 3 - confirm.
+                                                DateTimeUtils.toTimestampData(
+                                                        job.getStartTime(), 3)))
+                        .collect(Collectors.toList());
+        // The column order below must match the field order of the rows built above.
+        return new ResultFetcher(
+                operationHandle,
+                ResolvedSchema.of(
+                        Column.physical(JOB_ID, DataTypes.STRING()),
+                        Column.physical(JOB_NAME, DataTypes.STRING()),
+                        Column.physical(STATUS, DataTypes.STRING()),
+                        Column.physical(START_TIME, DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())),
+                resultRows);
+    }
+
/**
* Retrieves the {@link ClusterClient} from the session and runs the given {@link ClusterAction}
* against it.
@@ -536,7 +585,7 @@ public class OperationExecutor {
*/
private <ClusterID, Result> Result runClusterAction(
OperationHandle handle, ClusterAction<ClusterID, Result> clusterAction)
- throws FlinkException {
+ throws SqlExecutionException {
final Configuration configuration = Configuration.fromMap(sessionContext.getConfigMap());
final ClusterClientFactory<ClusterID> clusterClientFactory =
clusterClientServiceLoader.getClusterClientFactory(configuration);
@@ -549,6 +598,8 @@ public class OperationExecutor {
final ClusterClient<ClusterID> clusterClient =
clusterDescriptor.retrieve(clusterId).getClusterClient()) {
return clusterAction.runAction(clusterClient);
+ } catch (FlinkException e) {
+ throw new SqlExecutionException("Failed to run cluster action.", e);
}
}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/Constants.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/Constants.java
index 53ea12c8f6c..c2ceb435fe2 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/Constants.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/Constants.java
@@ -22,9 +22,11 @@ package org.apache.flink.table.gateway.service.utils;
public class Constants {
public static final String JOB_ID = "job id";
+ public static final String JOB_NAME = "job name";
+ public static final String STATUS = "status";
+ public static final String START_TIME = "start time";
public static final String SET_KEY = "key";
public static final String SET_VALUE = "value";
public static final String COMPLETION_CANDIDATES = "candidates";
-
- public static final String SAVEPOINT_PATH = "savepoint_path";
+ public static final String SAVEPOINT_PATH = "savepoint path";
}
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index d95d7d486c3..8ca23f73071 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
@@ -121,7 +122,7 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
public static final MiniClusterExtension MINI_CLUSTER =
new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
- .setNumberTaskManagers(1)
+ .setNumberTaskManagers(2)
.build());
@RegisterExtension
@@ -456,6 +457,53 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
task -> assertThat(task.get()).isEqualTo(getDefaultResultSet().getResultSchema()));
}
+    /**
+     * Submits an INSERT job through the gateway, waits until all of its tasks are running, and
+     * then checks that {@code SHOW JOBS} returns a row for that job with the configured pipeline
+     * name, the RUNNING status, and a start time between submission and the running check.
+     */
+    @Test
+    public void testShowJobsOperation(@InjectClusterClient RestClusterClient<?> restClusterClient)
+            throws Exception {
+        final String expectedJobName = "test-job";
+        final SessionHandle session = service.openSession(defaultSessionEnvironment);
+        final Configuration conf = new Configuration(MINI_CLUSTER.getClientConfiguration());
+        conf.setString(PipelineOptions.NAME, expectedJobName);
+
+        // Register the datagen source and the blackhole sink used by the INSERT job.
+        service.executeStatement(
+                session, "CREATE TABLE source (a STRING) WITH ('connector'='datagen');", -1, conf);
+        service.executeStatement(
+                session, "CREATE TABLE sink (a STRING) WITH ('connector'='blackhole');", -1, conf);
+
+        // Lower bound for the job start time: taken right before the job is submitted.
+        final long submittedAfter = System.currentTimeMillis();
+        final OperationHandle insertHandle =
+                service.executeStatement(
+                        session, "INSERT INTO sink SELECT * FROM source;", -1, conf);
+        final String jobId = fetchAllResults(session, insertHandle).get(0).getString(0).toString();
+
+        TestUtils.waitUntilAllTasksAreRunning(restClusterClient, JobID.fromHexString(jobId));
+        // Upper bound for the job start time: taken once all tasks are running.
+        final long runningBefore = System.currentTimeMillis();
+
+        final OperationHandle showJobsHandle =
+                service.executeStatement(session, "SHOW JOBS", -1, conf);
+
+        // Other jobs may be listed as well, so pick the row of the job submitted above.
+        RowData matchingRow = null;
+        for (RowData row : fetchAllResults(session, showJobsHandle)) {
+            if (jobId.equals(row.getString(0).toString())) {
+                matchingRow = row;
+                break;
+            }
+        }
+        if (matchingRow == null) {
+            throw new IllegalStateException("Test job " + jobId + " not found.");
+        }
+
+        assertThat(matchingRow.getString(1).toString()).isEqualTo(expectedJobName);
+        assertThat(matchingRow.getString(2).toString()).isEqualTo("RUNNING");
+        assertThat(matchingRow.getTimestamp(3, 3).getMillisecond())
+                .isBetween(submittedAfter, runningBefore);
+    }
+
// --------------------------------------------------------------------------------------------
// Catalog API tests
// --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index a117fdc7da0..3509a374776 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -107,6 +107,7 @@
"org.apache.flink.sql.parser.dql.SqlUnloadModule"
"org.apache.flink.sql.parser.expr.SqlUnresolvedTryCastFunction"
"org.apache.flink.sql.parser.ddl.SqlStopJob"
+ "org.apache.flink.sql.parser.dql.SqlShowJobs"
"org.apache.flink.sql.parser.type.ExtendedSqlCollectionTypeNameSpec"
"org.apache.flink.sql.parser.type.ExtendedSqlRowTypeNameSpec"
"org.apache.flink.sql.parser.type.SqlMapTypeNameSpec"
@@ -147,6 +148,7 @@
"JAR"
"JARS"
"JOB"
+ "JOBS"
"LOAD"
"METADATA"
"MODIFY"
@@ -294,8 +296,9 @@
"ISOYEAR"
"JAR"
"JARS"
- "JOB"
"JAVA"
+ "JOB"
+ "JOBS"
"JSON"
"K"
"KEY"
@@ -569,6 +572,7 @@
"SqlReset()"
"SqlAnalyzeTable()"
"SqlStopJob()"
+ "SqlShowJobs()"
]
# List of methods for parsing custom literals.
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index e24b39e544d..13b61a6937b 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -2365,6 +2365,19 @@ SqlNode SqlAnalyzeTable():
}
}
+/**
+* Parses a SHOW JOBS statement (the token SHOW followed by JOBS) into a SqlShowJobs node.
+*/
+SqlShowJobs SqlShowJobs() :
+{
+}
+{
+ <SHOW> <JOBS>
+ {
+ return new SqlShowJobs(getPos());
+ }
+}
+
/**
* Parses a STOP JOB statement:
* STOP JOB <JOB_ID> [<WITH SAVEPOINT>] [<WITH DRAIN>];
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowJobs.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowJobs.java
new file mode 100644
index 00000000000..5294564fb3a
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowJobs.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.dql;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import java.util.Collections;
+import java.util.List;
+
+/** SqlCall for the {@code SHOW JOBS} statement, which lists Flink jobs in a cluster; no operands. */
+public class SqlShowJobs extends SqlCall {
+
+ public static final SqlSpecialOperator OPERATOR =
+ new SqlSpecialOperator("SHOW JOBS", SqlKind.OTHER);
+
+ public SqlShowJobs(SqlParserPos pos) {
+ super(pos);
+ }
+
+ @Override
+ public SqlOperator getOperator() {
+ return OPERATOR;
+ }
+
+ @Override
+ public List<SqlNode> getOperandList() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+ writer.keyword("SHOW JOBS");
+ }
+}
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 5a3498e6ab7..7b1878a6794 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -2087,6 +2087,11 @@ class FlinkSqlParserImplTest extends SqlParserTest {
"CREATE TABLE AS SELECT syntax does not support to create partitioned table yet."));
}
+ @Test
+ void testShowJobs() {
+ sql("show jobs").ok("SHOW JOBS"); // lower-case input; the expected string is the upper-case form
+ }
+
@Test
void testStopJob() {
sql("STOP JOB 'myjob'").ok("STOP JOB 'myjob'");
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/Constants.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/ShowJobsOperation.java
similarity index 66%
copy from flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/Constants.java
copy to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/ShowJobsOperation.java
index 53ea12c8f6c..24da96ca30d 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/Constants.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/ShowJobsOperation.java
@@ -16,15 +16,15 @@
* limitations under the License.
*/
-package org.apache.flink.table.gateway.service.utils;
+package org.apache.flink.table.operations.command;
-/** Constants used in the SqlGatewayService. */
-public class Constants {
+import org.apache.flink.table.operations.ShowOperation;
- public static final String JOB_ID = "job id";
- public static final String SET_KEY = "key";
- public static final String SET_VALUE = "value";
- public static final String COMPLETION_CANDIDATES = "candidates";
+/** Operation to describe a SHOW JOBS statement. */
+public class ShowJobsOperation implements ShowOperation {
- public static final String SAVEPOINT_PATH = "savepoint_path";
+ @Override
+ public String asSummaryString() {
+ return "SHOW JOBS";
+ }
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 7bab1359df4..0603160cc6a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -83,6 +83,7 @@ import org.apache.flink.sql.parser.dql.SqlShowCurrentDatabase;
import org.apache.flink.sql.parser.dql.SqlShowDatabases;
import org.apache.flink.sql.parser.dql.SqlShowFunctions;
import org.apache.flink.sql.parser.dql.SqlShowJars;
+import org.apache.flink.sql.parser.dql.SqlShowJobs;
import org.apache.flink.sql.parser.dql.SqlShowModules;
import org.apache.flink.sql.parser.dql.SqlShowPartitions;
import org.apache.flink.sql.parser.dql.SqlShowTables;
@@ -159,6 +160,7 @@ import org.apache.flink.table.operations.command.RemoveJarOperation;
import org.apache.flink.table.operations.command.ResetOperation;
import org.apache.flink.table.operations.command.SetOperation;
import org.apache.flink.table.operations.command.ShowJarsOperation;
+import org.apache.flink.table.operations.command.ShowJobsOperation;
import org.apache.flink.table.operations.command.StopJobOperation;
import org.apache.flink.table.operations.ddl.AddPartitionsOperation;
import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
@@ -358,6 +360,8 @@ public class SqlToOperationConverter {
return Optional.of(converter.convertRemoveJar((SqlRemoveJar) validated));
} else if (validated instanceof SqlShowJars) {
return Optional.of(converter.convertShowJars((SqlShowJars) validated));
+ } else if (validated instanceof SqlShowJobs) {
+ return Optional.of(converter.convertShowJobs((SqlShowJobs) validated));
} else if (validated instanceof RichSqlInsert) {
return Optional.of(converter.convertSqlInsert((RichSqlInsert) validated));
} else if (validated instanceof SqlBeginStatementSet) {
@@ -1472,6 +1476,10 @@ public class SqlToOperationConverter {
return new ValueLiteralExpression(value, dataType.notNull());
}
+    /** Converts a parsed {@code SHOW JOBS} node into a {@link ShowJobsOperation}. */
+    private Operation convertShowJobs(SqlShowJobs sqlShowJobs) {
+        // The operation is built without reading anything from the parse node.
+        return new ShowJobsOperation();
+    }
+
private Operation convertStopJob(SqlStopJob sqlStopJob) {
return new StopJobOperation(
sqlStopJob.getId(), sqlStopJob.isWithSavepoint(), sqlStopJob.isWithDrain());
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
index 84735f5f337..9a1e5996f69 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
@@ -140,6 +140,7 @@ class FlinkPlannerImpl(
|| sqlNode.isInstanceOf[SqlShowViews]
|| sqlNode.isInstanceOf[SqlShowColumns]
|| sqlNode.isInstanceOf[SqlShowPartitions]
+ || sqlNode.isInstanceOf[SqlShowJobs]
|| sqlNode.isInstanceOf[SqlRichDescribeTable]
|| sqlNode.isInstanceOf[SqlUnloadModule]
|| sqlNode.isInstanceOf[SqlUseModules]