You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2022/08/04 12:35:51 UTC
[flink] 01/02: [FLINK-28629][sql-gateway] Return operation schema until operation is in terminal state
This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4b73437524fe9718a50feed2ce47c9e17c27ed98
Author: Shengkai <10...@qq.com>
AuthorDate: Tue Aug 2 11:56:13 2022 +0800
[FLINK-28629][sql-gateway] Return operation schema until operation is in terminal state
---
.../table/gateway/api/results/OperationInfo.java | 17 ++-
.../service/operation/OperationManager.java | 25 ++--
.../gateway/service/SqlGatewayServiceITCase.java | 127 +++++++++++++--------
3 files changed, 114 insertions(+), 55 deletions(-)
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java
index 3abe6bdb7ce..a85404f8400 100644
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.gateway.api.results;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.gateway.api.operation.OperationStatus;
import org.apache.flink.table.gateway.api.operation.OperationType;
+import org.apache.flink.util.ExceptionUtils;
import javax.annotation.Nullable;
@@ -67,11 +68,25 @@ public class OperationInfo {
return false;
}
OperationInfo that = (OperationInfo) o;
- return status == that.status && type == that.type && exception == that.exception;
+ return status == that.status
+ && type == that.type
+ && Objects.equals(exception, that.exception);
}
@Override
public int hashCode() {
return Objects.hash(status, type, exception);
}
+
+ @Override
+ public String toString() {
+ return "OperationInfo{"
+ + "status="
+ + status
+ + ", type="
+ + type
+ + ", exception="
+ + (exception == null ? "null" : ExceptionUtils.stringifyException(exception))
+ + '}';
+ }
}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
index 1e941317ce7..f493c24f868 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
@@ -157,7 +157,8 @@ public class OperationManager {
*
* @param operationHandle identifies the {@link Operation}.
*/
- public ResolvedSchema getOperationResultSchema(OperationHandle operationHandle) {
+ public ResolvedSchema getOperationResultSchema(OperationHandle operationHandle)
+ throws Exception {
return getOperation(operationHandle).getResultSchema();
}
@@ -210,7 +211,6 @@ public class OperationManager {
private final OperationHandle operationHandle;
private final OperationType operationType;
- private final boolean hasResults;
private final AtomicReference<OperationStatus> status;
private final Callable<ResultFetcher> resultSupplier;
@@ -226,7 +226,6 @@ public class OperationManager {
this.operationHandle = operationHandle;
this.status = new AtomicReference<>(OperationStatus.INITIALIZED);
this.operationType = operationType;
- this.hasResults = true;
this.resultSupplier = resultSupplier;
}
@@ -313,13 +312,21 @@ public class OperationManager {
return fetchResultsInternal(() -> resultFetcher.fetchResults(orientation, maxRows));
}
- public ResolvedSchema getResultSchema() {
+ public ResolvedSchema getResultSchema() throws Exception {
+ synchronized (status) {
+ while (!status.get().isTerminalStatus()) {
+ status.wait();
+ }
+ }
OperationStatus current = status.get();
- if (current != OperationStatus.FINISHED || !hasResults) {
+ if (current == OperationStatus.ERROR) {
+ throw operationError;
+ } else if (current != OperationStatus.FINISHED) {
throw new IllegalStateException(
- "The result schema is available when the Operation is in FINISHED state and the Operation indicates it has data.");
+ String.format(
+ "The result schema is available when the Operation is in FINISHED state but the current status is %s.",
+ status));
}
-
return resultFetcher.getResultSchema();
}
@@ -361,6 +368,10 @@ public class OperationManager {
}
} while (!status.compareAndSet(currentStatus, toStatus));
+ synchronized (status) {
+ status.notifyAll();
+ }
+
LOG.debug(
String.format(
"Convert operation %s from %s to %s.",
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index 283d10b8b90..9cf33996b04 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -47,10 +47,9 @@ import org.apache.flink.table.planner.runtime.batch.sql.TestModule;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.function.RunnableWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
-import org.assertj.core.api.Assertions;
import org.assertj.core.api.Condition;
-import org.hamcrest.Matchers;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -60,10 +59,11 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;
@@ -75,11 +75,8 @@ import static org.apache.flink.types.RowKind.DELETE;
import static org.apache.flink.types.RowKind.INSERT;
import static org.apache.flink.types.RowKind.UPDATE_AFTER;
import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
-import static org.hamcrest.CoreMatchers.hasItem;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
/** ITCase for {@link SqlGatewayServiceImpl}. */
public class SqlGatewayServiceITCase extends AbstractTestBase {
@@ -119,13 +116,7 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
SessionHandle sessionHandle = service.openSession(environment);
Map<String, String> actualConfig = service.getSessionConfig(sessionHandle);
- options.forEach(
- (k, v) ->
- assertThat(
- String.format(
- "Should contains (%s, %s) in the actual config.", k, v),
- actualConfig,
- Matchers.hasEntry(k, v)));
+ assertThat(actualConfig).containsAllEntriesOf(options);
}
@Test
@@ -150,9 +141,9 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
service.getSession(sessionHandle)
.createExecutor(new Configuration())
.getTableEnvironment();
- assertEquals(catalogName, tableEnv.getCurrentCatalog());
- assertEquals(databaseName, tableEnv.getCurrentDatabase());
- assertTrue(new HashSet<>(Arrays.asList(tableEnv.listModules())).contains(moduleName));
+ assertThat(tableEnv.getCurrentCatalog()).isEqualTo(catalogName);
+ assertThat(tableEnv.getCurrentDatabase()).isEqualTo(databaseName);
+ assertThat(tableEnv.listModules()).contains(moduleName);
}
@Test
@@ -170,9 +161,8 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
});
startRunningLatch.await();
- assertEquals(
- ResultSet.NOT_READY_RESULTS,
- service.fetchResults(sessionHandle, operationHandle, 0, Integer.MAX_VALUE));
+ assertThat(service.fetchResults(sessionHandle, operationHandle, 0, Integer.MAX_VALUE))
+ .isEqualTo(ResultSet.NOT_READY_RESULTS);
endRunningLatch.countDown();
}
@@ -192,9 +182,8 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
});
startRunningLatch.await();
- assertEquals(
- new OperationInfo(OperationStatus.RUNNING, OperationType.UNKNOWN),
- service.getOperationInfo(sessionHandle, operationHandle));
+ assertThat(service.getOperationInfo(sessionHandle, operationHandle))
+ .isEqualTo(new OperationInfo(OperationStatus.RUNNING, OperationType.UNKNOWN));
endRunningLatch.countDown();
OperationInfo expectedInfo =
@@ -214,10 +203,10 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
actualData.addAll(checkNotNull(currentResult.getData()));
token = currentResult.getNextToken();
}
- assertEquals(expectedData, actualData);
+ assertThat(actualData).isEqualTo(expectedData);
service.closeOperation(sessionHandle, operationHandle);
- assertEquals(0, sessionManager.getOperationCount(sessionHandle));
+ assertThat(sessionManager.getOperationCount(sessionHandle)).isEqualTo(0);
}
@Test
@@ -236,17 +225,15 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
});
startRunningLatch.await();
- assertEquals(
- new OperationInfo(OperationStatus.RUNNING, OperationType.UNKNOWN),
- service.getOperationInfo(sessionHandle, operationHandle));
+ assertThat(service.getOperationInfo(sessionHandle, operationHandle))
+ .isEqualTo(new OperationInfo(OperationStatus.RUNNING, OperationType.UNKNOWN));
service.cancelOperation(sessionHandle, operationHandle);
- assertEquals(
- new OperationInfo(OperationStatus.CANCELED, OperationType.UNKNOWN),
- service.getOperationInfo(sessionHandle, operationHandle));
+ assertThat(service.getOperationInfo(sessionHandle, operationHandle))
+ .isEqualTo(new OperationInfo(OperationStatus.CANCELED, OperationType.UNKNOWN));
service.closeOperation(sessionHandle, operationHandle);
- assertEquals(0, sessionManager.getOperationCount(sessionHandle));
+ assertThat(sessionManager.getOperationCount(sessionHandle)).isEqualTo(0);
}
@Test
@@ -273,14 +260,14 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
Duration.ofSeconds(10),
"Failed to get expected operation status.");
- Assertions.assertThatThrownBy(
+ assertThatThrownBy(
() ->
service.fetchResults(
sessionHandle, operationHandle, 0, Integer.MAX_VALUE))
.satisfies(anyCauseMatches(SqlExecutionException.class, msg));
service.closeOperation(sessionHandle, operationHandle);
- assertEquals(0, sessionManager.getOperationCount(sessionHandle));
+ assertThat(sessionManager.getOperationCount(sessionHandle)).isEqualTo(0);
}
@Test
@@ -304,11 +291,17 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
token = result.getNextToken();
}
- assertThat(
- settings,
- hasItem(
+ assertThat(settings)
+ .contains(
GenericRowData.of(
- StringData.fromString(key), StringData.fromString(value))));
+ StringData.fromString(key), StringData.fromString(value)));
+ }
+
+ @Test
+ public void testGetOperationSchemaUntilOperationIsReady() throws Exception {
+ runGetOperationSchemaUntilOperationIsReadyOrError(
+ this::getDefaultResultSet,
+ task -> assertThat(task.get()).isEqualTo(getDefaultResultSet().getResultSchema()));
}
// --------------------------------------------------------------------------------------------
@@ -402,7 +395,7 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
"All operations should be closed.");
for (OperationManager.Operation op : operations) {
- assertEquals(OperationStatus.CLOSED, op.getOperationInfo().getStatus());
+ assertThat(op.getOperationInfo().getStatus()).isEqualTo(OperationStatus.CLOSED);
}
}
@@ -426,7 +419,7 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
}
manager.close();
latch.await();
- assertEquals(0, manager.getOperationCount());
+ assertThat(manager.getOperationCount()).isEqualTo(0);
}
@Test
@@ -445,7 +438,7 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
() -> {
startRunning.countDown();
terminateRunning.await();
- return null;
+ return getDefaultResultSet();
}))
.start();
}
@@ -472,7 +465,7 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
// another thread
int origin = v.get();
v.set(origin + 1);
- return null;
+ return getDefaultResultSet();
}));
}
for (OperationHandle handle : handles) {
@@ -485,7 +478,7 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
"Failed to wait operation terminate");
}
- assertEquals(threadNum, v.get());
+ assertThat(v.get()).isEqualTo(threadNum);
}
@Test
@@ -503,7 +496,7 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
OperationType.UNKNOWN,
() -> {
latch.await();
- return null;
+ return getDefaultResultSet();
}));
}
// The queue is full and should reject
@@ -515,7 +508,7 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
OperationType.UNKNOWN,
() -> {
latch.await();
- return null;
+ return getDefaultResultSet();
}))
.satisfies(anyCauseMatches(RejectedExecutionException.class));
latch.countDown();
@@ -534,7 +527,7 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
OperationType.UNKNOWN,
() -> {
success.countDown();
- return null;
+ return getDefaultResultSet();
});
CommonTestUtils.waitUtil(
() -> success.getCount() == 0, Duration.ofSeconds(10), "Should come to end.");
@@ -587,6 +580,18 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
}
}
+ @Test
+ public void testGetOperationSchemaWhenOperationGetError() throws Exception {
+ String msg = "Artificial Exception.";
+ runGetOperationSchemaUntilOperationIsReadyOrError(
+ () -> {
+ throw new SqlGatewayException(msg);
+ },
+ task ->
+ assertThatThrownBy(task::get)
+ .satisfies(anyCauseMatches(SqlGatewayException.class, msg)));
+ }
+
// --------------------------------------------------------------------------------------------
private OperationHandle submitDefaultOperation(
@@ -617,6 +622,34 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
data);
}
+ private void runGetOperationSchemaUntilOperationIsReadyOrError(
+ Callable<ResultSet> executor,
+ ThrowingConsumer<FutureTask<ResolvedSchema>, Exception> validator)
+ throws Exception {
+ SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
+ CountDownLatch operationIsRunning = new CountDownLatch(1);
+ CountDownLatch schemaFetcherIsRunning = new CountDownLatch(1);
+ OperationHandle operationHandle =
+ service.submitOperation(
+ sessionHandle,
+ OperationType.UNKNOWN,
+ () -> {
+ operationIsRunning.await();
+ return executor.call();
+ });
+ FutureTask<ResolvedSchema> task =
+ new FutureTask<>(
+ () -> {
+ schemaFetcherIsRunning.countDown();
+ return service.getOperationResultSchema(sessionHandle, operationHandle);
+ });
+ threadFactory.newThread(task).start();
+
+ schemaFetcherIsRunning.await();
+ operationIsRunning.countDown();
+ validator.accept(task);
+ }
+
private void runCancelOrCloseOperationWhenFetchResults(
SessionHandle sessionHandle,
OperationHandle operationHandle,
@@ -659,6 +692,6 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
assertThatChainOfCauses(t)
.anySatisfy(t1 -> condition.matches(t1.getMessage())));
- assertTrue(new HashSet<>(getDefaultResultSet().getData()).containsAll(actual));
+ assertThat(getDefaultResultSet().getData()).containsAll(actual);
}
}