You are viewing a plain-text version of this content. The link to the canonical version (shown as "here" on the original page) is not preserved in this copy.
Posted to commits@flink.apache.org by sh...@apache.org on 2022/09/22 03:25:23 UTC
[flink] 02/02: [FLINK-29229][hive] Fix HiveServer2 endpoint doesn't support execute statements in sync mode
This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6493e7c1c6a2eb050e5415c1748ae5c267625ea2
Author: Shengkai <10...@qq.com>
AuthorDate: Thu Sep 8 19:52:23 2022 +0800
[FLINK-29229][hive] Fix HiveServer2 endpoint doesn't support execute statements in sync mode
This closes #20790
---
.../table/endpoint/hive/HiveServer2Endpoint.java | 79 +++++++-
.../hive/util/ThriftObjectConversions.java | 17 +-
.../endpoint/hive/HiveServer2EndpointITCase.java | 199 ++++++++++++++++++++-
.../gateway/service/SqlGatewayServiceImpl.java | 2 +-
4 files changed, 277 insertions(+), 20 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
index e3037d3426b..1bc38b09dfb 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
import org.apache.flink.table.catalog.ResolvedSchema;
@@ -156,6 +157,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
private static final TStatus OK_STATUS = new TStatus(TStatusCode.SUCCESS_STATUS);
private static final String UNSUPPORTED_ERROR_MESSAGE =
"The HiveServer2 Endpoint currently doesn't support to %s.";
+ private static final Long CHECK_INTERVAL_MS = 100L;
// --------------------------------------------------------------------------------------------
// Server attributes
@@ -378,13 +380,9 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq tExecuteStatementReq)
throws TException {
TExecuteStatementResp resp = new TExecuteStatementResp();
+ SessionHandle sessionHandle = toSessionHandle(tExecuteStatementReq.getSessionHandle());
+ OperationHandle operationHandle = null;
try {
- if (!tExecuteStatementReq.isRunAsync()) {
- throw new UnsupportedOperationException(
- "Currently SqlGateway HiveServer2 Endpoint only supports ExecuteStatement in async mode.");
- }
-
- SessionHandle sessionHandle = toSessionHandle(tExecuteStatementReq.getSessionHandle());
String statement =
tExecuteStatementReq.isSetStatement()
? tExecuteStatementReq.getStatement()
@@ -395,20 +393,27 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
: Collections.emptyMap();
long timeout = tExecuteStatementReq.getQueryTimeout();
- OperationHandle operationHandle =
+ operationHandle =
service.executeStatement(
sessionHandle,
statement,
timeout,
Configuration.fromMap(executionConfig));
- resp.setStatus(OK_STATUS);
+ if (!tExecuteStatementReq.isRunAsync()) {
+ waitUntilOperationIsTerminated(sessionHandle, operationHandle);
+ }
+
+ resp.setStatus(OK_STATUS);
resp.setOperationHandle(
toTOperationHandle(
sessionHandle, operationHandle, TOperationType.EXECUTE_STATEMENT));
} catch (Throwable t) {
LOG.error("Failed to ExecuteStatement.", t);
resp.setStatus(toTStatus(t));
+ if (operationHandle != null) {
+ closeOperationSilently(sessionHandle, operationHandle);
+ }
}
return resp;
}
@@ -718,7 +723,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
service.fetchResults(
sessionHandle,
operationHandle,
- toFetchOrientation(tFetchResultsReq.getFetchType()),
+ toFetchOrientation(tFetchResultsReq.getOrientation()),
maxRows);
resp.setStatus(OK_STATUS);
resp.setHasMoreRows(resultSet.getResultType() != EOS);
@@ -861,6 +866,62 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin
}
}
+    /**
+     * Blocks until the operation reaches a terminal status, polling the gateway service every
+     * {@code CHECK_INTERVAL_MS} milliseconds. This mirrors {@code
+     * org.apache.hive.jdbc.HiveStatement#waitForOperationToComplete}.
+     *
+     * <p>A better solution is to introduce an interface similar to {@link TableResult#await()}.
+     *
+     * @param sessionHandle the session that owns the operation
+     * @param operationHandle the operation to wait for
+     * @throws SqlGatewayException if the operation ends as CANCELED, TIMEOUT or ERROR, reports an
+     *     unrecognized status, or the waiting thread is interrupted
+     * @throws Exception any failure raised by the service while querying the operation status
+     */
+    private void waitUntilOperationIsTerminated(
+            SessionHandle sessionHandle, OperationHandle operationHandle) throws Exception {
+        while (true) {
+            OperationInfo info = service.getOperationInfo(sessionHandle, operationHandle);
+            switch (info.getStatus()) {
+                case INITIALIZED:
+                case PENDING:
+                case RUNNING:
+                    // Not terminated yet: back off before polling again.
+                    try {
+                        Thread.sleep(CHECK_INTERVAL_MS);
+                    } catch (InterruptedException e) {
+                        // Restore the interrupt flag so code further up the stack can still
+                        // observe the interruption after this exception is handled.
+                        Thread.currentThread().interrupt();
+                        throw new SqlGatewayException(
+                                String.format(
+                                        "Interrupted while waiting for the operation %s "
+                                                + "of the session %s.",
+                                        operationHandle, sessionHandle),
+                                e);
+                    }
+                    break;
+                case CANCELED:
+                case TIMEOUT:
+                    throw new SqlGatewayException(
+                            String.format(
+                                    "The operation %s's status is %s for the session %s.",
+                                    operationHandle, info.getStatus(), sessionHandle));
+                case ERROR:
+                    throw new SqlGatewayException(
+                            String.format(
+                                    "The operation %s's status is %s for the session %s.",
+                                    operationHandle, info.getStatus(), sessionHandle),
+                            info.getException()
+                                    .orElseThrow(
+                                            () ->
+                                                    new SqlGatewayException(
+                                                            "Impossible! ERROR status should contains the error.")));
+                case FINISHED:
+                    return;
+                default:
+                    throw new SqlGatewayException(
+                            String.format("Unknown status: %s.", info.getStatus()));
+            }
+        }
+    }
+
+    /**
+     * Closes the operation on a best-effort basis. Any failure while closing is logged and
+     * swallowed, so this method never throws; it is meant for cleanup paths.
+     *
+     * @param sessionHandle the session that owns the operation
+     * @param operationHandle the operation to close
+     */
+    private void closeOperationSilently(
+            SessionHandle sessionHandle, OperationHandle operationHandle) {
+        try {
+            service.closeOperation(sessionHandle, operationHandle);
+        } catch (Throwable t) {
+            // Intentionally not rethrown: cleanup must not replace the caller's own error.
+            LOG.warn(
+                    "Failed to close the operation {} for the session {}.",
+                    operationHandle,
+                    sessionHandle,
+                    t);
+        }
+    }
+
private String stringifyException(Throwable t) {
if (isVerbose) {
return ExceptionUtils.stringifyException(t);
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
index 2df486bad48..35a40ea187f 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
@@ -191,14 +191,15 @@ public class ThriftObjectConversions {
// Statement related conversions
// --------------------------------------------------------------------------------------------
- public static FetchOrientation toFetchOrientation(int fetchOrientation) {
- if (fetchOrientation == TFetchOrientation.FETCH_NEXT.getValue()) {
- return FetchOrientation.FETCH_NEXT;
- } else if (fetchOrientation == TFetchOrientation.FETCH_PRIOR.getValue()) {
- return FetchOrientation.FETCH_PRIOR;
- } else {
- throw new UnsupportedOperationException(
- String.format("Unsupported fetch orientation: %s.", fetchOrientation));
+ /**
+ * Converts a Thrift {@link TFetchOrientation} into the gateway's {@link FetchOrientation}.
+ *
+ * <p>Only {@code FETCH_NEXT} and {@code FETCH_PRIOR} are supported. A {@code null} argument is
+ * not handled and fails with a NullPointerException from the enum switch.
+ *
+ * @param fetchOrientation the orientation requested by the client
+ * @return the matching gateway fetch orientation
+ * @throws UnsupportedOperationException if any other orientation is requested
+ */
+ public static FetchOrientation toFetchOrientation(TFetchOrientation fetchOrientation) {
+ switch (fetchOrientation) {
+ case FETCH_PRIOR:
+ return FetchOrientation.FETCH_PRIOR;
+ case FETCH_NEXT:
+ return FetchOrientation.FETCH_NEXT;
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Unsupported fetch orientation: %s.", fetchOrientation));
}
}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
index 5a7bd41ba0d..0f782e1e8cc 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
@@ -19,9 +19,13 @@
package org.apache.flink.table.endpoint.hive;
import org.apache.flink.FlinkVersion;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.SqlDialect;
-import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
@@ -34,19 +38,24 @@ import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.gateway.api.session.SessionEnvironment;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.service.SqlGatewayServiceImpl;
import org.apache.flink.table.gateway.service.session.SessionManager;
import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc0;
+import org.apache.flink.test.junit5.InjectClusterClient;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.FunctionWithException;
+import org.apache.flink.util.function.FutureTaskWithException;
import org.apache.flink.util.function.ThrowingConsumer;
import org.apache.hadoop.hive.common.auth.HiveAuthUtils;
import org.apache.hadoop.hive.serde2.thrift.Type;
import org.apache.hive.jdbc.HiveConnection;
import org.apache.hive.jdbc.HiveStatement;
+import org.apache.hive.service.cli.RowSet;
+import org.apache.hive.service.cli.RowSetFactory;
import org.apache.hive.service.rpc.thrift.TCLIService;
import org.apache.hive.service.rpc.thrift.TCancelOperationReq;
import org.apache.hive.service.rpc.thrift.TCancelOperationResp;
@@ -54,10 +63,17 @@ import org.apache.hive.service.rpc.thrift.TCloseOperationReq;
import org.apache.hive.service.rpc.thrift.TCloseOperationResp;
import org.apache.hive.service.rpc.thrift.TCloseSessionReq;
import org.apache.hive.service.rpc.thrift.TCloseSessionResp;
+import org.apache.hive.service.rpc.thrift.TExecuteStatementReq;
+import org.apache.hive.service.rpc.thrift.TExecuteStatementResp;
+import org.apache.hive.service.rpc.thrift.TFetchOrientation;
+import org.apache.hive.service.rpc.thrift.TFetchResultsReq;
+import org.apache.hive.service.rpc.thrift.TGetOperationStatusReq;
import org.apache.hive.service.rpc.thrift.TOpenSessionReq;
import org.apache.hive.service.rpc.thrift.TOpenSessionResp;
import org.apache.hive.service.rpc.thrift.TOperationHandle;
+import org.apache.hive.service.rpc.thrift.TOperationState;
import org.apache.hive.service.rpc.thrift.TOperationType;
+import org.apache.hive.service.rpc.thrift.TSessionHandle;
import org.apache.hive.service.rpc.thrift.TStatusCode;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TTransport;
@@ -75,20 +91,27 @@ import java.sql.Types;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static org.apache.flink.api.common.RuntimeExecutionMode.BATCH;
import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
+import static org.apache.flink.configuration.PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID;
import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
import static org.apache.flink.table.api.config.TableConfigOptions.MAX_LENGTH_GENERATED_CODE;
import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
+import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_SQL_DIALECT;
+import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toSessionHandle;
import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTOperationHandle;
+import static org.apache.hive.service.rpc.thrift.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -146,7 +169,7 @@ public class HiveServer2EndpointITCase extends TestLogger {
assertThat(actualConfig.entrySet())
.contains(
new AbstractMap.SimpleEntry<>(
- TableConfigOptions.TABLE_SQL_DIALECT.key(), SqlDialect.HIVE.name()),
+ TABLE_SQL_DIALECT.key(), SqlDialect.HIVE.name()),
new AbstractMap.SimpleEntry<>(TABLE_DML_SYNC.key(), "true"),
new AbstractMap.SimpleEntry<>(RUNTIME_MODE.key(), BATCH.name()),
new AbstractMap.SimpleEntry<>(MAX_LENGTH_GENERATED_CODE.key(), "-1"),
@@ -572,6 +595,112 @@ public class HiveServer2EndpointITCase extends TestLogger {
}
}
+    /**
+     * Verifies that a statement submitted without setting {@code runAsync} has already finished
+     * when ExecuteStatement returns, and that its whole result can then be fetched.
+     */
+    @Test
+    public void testExecuteStatementInSyncMode() throws Exception {
+        TCLIService.Client client = createClient();
+        TSessionHandle session = client.OpenSession(new TOpenSessionReq()).getSessionHandle();
+
+        TExecuteStatementResp executeResp =
+                client.ExecuteStatement(new TExecuteStatementReq(session, "SHOW CATALOGS"));
+        TOperationHandle operation = executeResp.getOperationHandle();
+
+        TOperationState state =
+                client.GetOperationStatus(new TGetOperationStatusReq(operation))
+                        .getOperationState();
+        assertThat(state).isEqualTo(TOperationState.FINISHED_STATE);
+
+        TFetchResultsReq fetchReq =
+                new TFetchResultsReq(operation, TFetchOrientation.FETCH_NEXT, Integer.MAX_VALUE);
+        RowSet rows =
+                RowSetFactory.create(
+                        client.FetchResults(fetchReq).getResults(), HIVE_CLI_SERVICE_PROTOCOL_V10);
+
+        List<List<Object>> actual = new ArrayList<>();
+        for (Iterator<Object[]> it = rows.iterator(); it.hasNext(); ) {
+            actual.add(new ArrayList<>(Arrays.asList(it.next())));
+        }
+        assertThat(actual).isEqualTo(Collections.singletonList(Collections.singletonList("hive")));
+    }
+
+    /**
+     * Verifies that a synchronously executed statement that fails to compile reports the error in
+     * the response status and leaves no operation registered in its session.
+     */
+    @Test
+    public void testExecuteStatementInSyncModeWithCompileException() throws Exception {
+        TCLIService.Client client = createClient();
+        TSessionHandle tSessionHandle =
+                client.OpenSession(new TOpenSessionReq()).getSessionHandle();
+
+        TExecuteStatementResp resp =
+                client.ExecuteStatement(
+                        new TExecuteStatementReq(
+                                tSessionHandle, "SELECT * FROM `non_exist_table`"));
+        assertThat(resp.getStatus().getInfoMessages())
+                .matches(
+                        messages ->
+                                messages.stream()
+                                        .anyMatch(
+                                                message ->
+                                                        message.contains(
+                                                                "Table not found 'non_exist_table'")));
+
+        SqlGatewayServiceImpl gatewayService =
+                (SqlGatewayServiceImpl) SQL_GATEWAY_SERVICE_EXTENSION.getService();
+        assertThat(
+                        gatewayService
+                                .getSession(toSessionHandle(tSessionHandle))
+                                .getOperationManager()
+                                .getOperationCount())
+                .isEqualTo(0);
+    }
+
+ /**
+ * Closes the session while a synchronous INSERT is in flight and expects the pending
+ * ExecuteStatement call to return, within 10 seconds, a status describing the failure.
+ */
+ @Test
+ public void testExecuteStatementInSyncModeWithRuntimeException1() throws Exception {
+ runExecuteStatementInSyncModeWithRuntimeException(
+ (tSessionHandle, future) -> {
+ createClient().CloseSession(new TCloseSessionReq(tSessionHandle));
+
+ TExecuteStatementResp resp = future.get(10, TimeUnit.SECONDS);
+ assertThat(resp.getStatus().getInfoMessages())
+ .matches(
+ causes ->
+ causes.stream()
+ .anyMatch(
+ cause ->
+ // The session may be closed either before the job is
+ // submitted ("Failed to execute statement.") or after
+ // it ("Failed to getOperationInfo").
+ cause.contains(
+ "Failed to execute statement.")
+ || cause.contains(
+ "Failed to getOperationInfo")));
+ });
+ }
+
+ /**
+ * Cancels the INSERT job through the REST client while ExecuteStatement is waiting on it
+ * synchronously, and expects the response status to mention the failed job id.
+ */
+ @Test
+ public void testExecuteStatementInSyncModeWithRuntimeException2(
+ @InjectClusterClient RestClusterClient<?> restClusterClient) throws Exception {
+ runExecuteStatementInSyncModeWithRuntimeException(
+ (tSessionHandle, future) -> {
+ // NOTE(review): this waits for *any* non-terminal job on the cluster, not
+ // specifically the job submitted by this session.
+ waitUntilJobIsRunning(restClusterClient);
+ // The session is opened with PIPELINE_FIXED_JOB_ID, so the job id can be read
+ // back from the session configuration.
+ JobID jobID =
+ JobID.fromHexString(
+ SQL_GATEWAY_SERVICE_EXTENSION
+ .getService()
+ .getSessionConfig(toSessionHandle(tSessionHandle))
+ .get(PIPELINE_FIXED_JOB_ID.key()));
+
+ restClusterClient.cancel(jobID).get();
+
+ TExecuteStatementResp resp = future.get(10, TimeUnit.SECONDS);
+ assertThat(resp.getStatus().getInfoMessages())
+ .matches(
+ causes ->
+ causes.stream()
+ .anyMatch(
+ cause ->
+ cause.contains(
+ String.format(
+ "Job failed (JobID: %s)",
+ jobID))));
+ });
+ }
+
// --------------------------------------------------------------------------------------------
private static void initializeEnvironment() throws Exception {
@@ -786,4 +915,70 @@ public class HiveServer2EndpointITCase extends TestLogger {
}
return actual;
}
+
+ /**
+ * Shared driver for the sync-mode runtime-failure tests.
+ *
+ * <p>It performs four steps:
+ *
+ * <ol>
+ * <li>Opens a STREAMING session with the default SQL dialect and a freshly generated fixed job
+ * id.
+ * <li>Creates a datagen source and a blackhole sink.
+ * <li>Submits {@code INSERT INTO sink SELECT * FROM source} without {@code runAsync} on a
+ * separate thread.
+ * <li>Hands the session handle and the pending response to {@code checker}.
+ * </ol>
+ *
+ * <p>NOTE(review): three caveats:
+ *
+ * <ul>
+ * <li>The results of the CREATE TABLE calls are not checked.
+ * <li>The submitter thread is never joined.
+ * <li>The latch is released before the request is sent. It therefore only guarantees that the
+ * submitter thread has started, not that the statement has reached the server.
+ * </ul>
+ *
+ * @param checker receives the session handle and the future of the sync ExecuteStatement call
+ */
+ private void runExecuteStatementInSyncModeWithRuntimeException(
+ BiConsumerWithException<
+ TSessionHandle,
+ FutureTaskWithException<TExecuteStatementResp>,
+ Exception>
+ checker)
+ throws Exception {
+ TCLIService.Client client = createClient();
+ TOpenSessionReq openSessionReq = new TOpenSessionReq();
+ openSessionReq.putToConfiguration(
+ RUNTIME_MODE.key(), RuntimeExecutionMode.STREAMING.name());
+ openSessionReq.putToConfiguration(TABLE_SQL_DIALECT.key(), SqlDialect.DEFAULT.name());
+ // A fixed job id lets the checker find (and e.g. cancel) the submitted job.
+ openSessionReq.putToConfiguration(PIPELINE_FIXED_JOB_ID.key(), JobID.generate().toString());
+ TSessionHandle tSessionHandle = client.OpenSession(openSessionReq).getSessionHandle();
+
+ List<String> initSql =
+ Arrays.asList(
+ "CREATE TEMPORARY TABLE source(\n"
+ + " a INT\n"
+ + ") WITH (\n"
+ + " 'connector' = 'datagen'"
+ + ")",
+ "CREATE TEMPORARY TABLE sink(\n"
+ + " a INT\n"
+ + ") WITH (\n"
+ + " 'connector' = 'blackhole'"
+ + ")");
+
+ for (String sql : initSql) {
+ TExecuteStatementReq statementReq = new TExecuteStatementReq(tSessionHandle, sql);
+ client.ExecuteStatement(statementReq);
+ }
+
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ FutureTaskWithException<TExecuteStatementResp> future =
+ new FutureTaskWithException<>(
+ () -> {
+ countDownLatch.countDown();
+ // Use a dedicated client on this thread: the Thrift client is not
+ // thread-safe.
+ return createClient()
+ .ExecuteStatement(
+ new TExecuteStatementReq(
+ tSessionHandle,
+ "INSERT INTO sink SELECT * FROM source"));
+ });
+ Thread submitter = new Thread(future);
+ submitter.start();
+ countDownLatch.await();
+
+ checker.accept(tSessionHandle, future);
+ }
+
+    /**
+     * Polls the cluster every 50 ms until {@link #getRunningJobs} reports at least one job.
+     * There is no upper bound on the wait.
+     */
+    private void waitUntilJobIsRunning(ClusterClient<?> client) throws Exception {
+        boolean hasActiveJob = !getRunningJobs(client).isEmpty();
+        while (!hasActiveJob) {
+            Thread.sleep(50);
+            hasActiveJob = !getRunningJobs(client).isEmpty();
+        }
+    }
+
+    /** Returns the ids of all jobs on the cluster that are not in a globally terminal state. */
+    private List<JobID> getRunningJobs(ClusterClient<?> client) throws Exception {
+        Collection<JobStatusMessage> statuses = client.listJobs().get();
+        List<JobID> running = new ArrayList<>();
+        for (JobStatusMessage status : statuses) {
+            if (!status.getJobState().isGloballyTerminalState()) {
+                running.add(status.getJobId());
+            }
+        }
+        return running;
+    }
}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
index 4bdbafe5ab7..932a0a0c852 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
@@ -312,7 +312,7 @@ public class SqlGatewayServiceImpl implements SqlGatewayService {
}
+ /** Looks up the session via the session manager; made public so that tests can inspect it. */
@VisibleForTesting
- Session getSession(SessionHandle sessionHandle) {
+ public Session getSession(SessionHandle sessionHandle) {
return sessionManager.getSession(sessionHandle);
}
}