You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2022/08/04 12:35:51 UTC

[flink] 01/02: [FLINK-28629][sql-gateway] Return operation schema until operation is in terminal state

This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4b73437524fe9718a50feed2ce47c9e17c27ed98
Author: Shengkai <10...@qq.com>
AuthorDate: Tue Aug 2 11:56:13 2022 +0800

    [FLINK-28629][sql-gateway] Return operation schema until operation is in terminal state
---
 .../table/gateway/api/results/OperationInfo.java   |  17 ++-
 .../service/operation/OperationManager.java        |  25 ++--
 .../gateway/service/SqlGatewayServiceITCase.java   | 127 +++++++++++++--------
 3 files changed, 114 insertions(+), 55 deletions(-)

diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java
index 3abe6bdb7ce..a85404f8400 100644
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.gateway.api.results;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.gateway.api.operation.OperationStatus;
 import org.apache.flink.table.gateway.api.operation.OperationType;
+import org.apache.flink.util.ExceptionUtils;
 
 import javax.annotation.Nullable;
 
@@ -67,11 +68,25 @@ public class OperationInfo {
             return false;
         }
         OperationInfo that = (OperationInfo) o;
-        return status == that.status && type == that.type && exception == that.exception;
+        return status == that.status
+                && type == that.type
+                && Objects.equals(exception, that.exception);
     }
 
     @Override
     public int hashCode() {
         return Objects.hash(status, type, exception);
     }
+
+    @Override
+    public String toString() {
+        return "OperationInfo{"
+                + "status="
+                + status
+                + ", type="
+                + type
+                + ", exception="
+                + (exception == null ? "null" : ExceptionUtils.stringifyException(exception))
+                + '}';
+    }
 }
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
index 1e941317ce7..f493c24f868 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
@@ -157,7 +157,8 @@ public class OperationManager {
      *
      * @param operationHandle identifies the {@link Operation}.
      */
-    public ResolvedSchema getOperationResultSchema(OperationHandle operationHandle) {
+    public ResolvedSchema getOperationResultSchema(OperationHandle operationHandle)
+            throws Exception {
         return getOperation(operationHandle).getResultSchema();
     }
 
@@ -210,7 +211,6 @@ public class OperationManager {
         private final OperationHandle operationHandle;
 
         private final OperationType operationType;
-        private final boolean hasResults;
         private final AtomicReference<OperationStatus> status;
 
         private final Callable<ResultFetcher> resultSupplier;
@@ -226,7 +226,6 @@ public class OperationManager {
             this.operationHandle = operationHandle;
             this.status = new AtomicReference<>(OperationStatus.INITIALIZED);
             this.operationType = operationType;
-            this.hasResults = true;
             this.resultSupplier = resultSupplier;
         }
 
@@ -313,13 +312,21 @@ public class OperationManager {
             return fetchResultsInternal(() -> resultFetcher.fetchResults(orientation, maxRows));
         }
 
-        public ResolvedSchema getResultSchema() {
+        public ResolvedSchema getResultSchema() throws Exception {
+            synchronized (status) {
+                while (!status.get().isTerminalStatus()) {
+                    status.wait();
+                }
+            }
             OperationStatus current = status.get();
-            if (current != OperationStatus.FINISHED || !hasResults) {
+            if (current == OperationStatus.ERROR) {
+                throw operationError;
+            } else if (current != OperationStatus.FINISHED) {
                 throw new IllegalStateException(
-                        "The result schema is available when the Operation is in FINISHED state and the Operation indicates it has data.");
+                        String.format(
+                                "The result schema is available when the Operation is in FINISHED state but the current status is %s.",
+                                status));
             }
-
             return resultFetcher.getResultSchema();
         }
 
@@ -361,6 +368,10 @@ public class OperationManager {
                 }
             } while (!status.compareAndSet(currentStatus, toStatus));
 
+            synchronized (status) {
+                status.notifyAll();
+            }
+
             LOG.debug(
                     String.format(
                             "Convert operation %s from %s to %s.",
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index 283d10b8b90..9cf33996b04 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -47,10 +47,9 @@ import org.apache.flink.table.planner.runtime.batch.sql.TestModule;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.apache.flink.util.function.RunnableWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
 
-import org.assertj.core.api.Assertions;
 import org.assertj.core.api.Condition;
-import org.hamcrest.Matchers;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -60,10 +59,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicReference;
@@ -75,11 +75,8 @@ import static org.apache.flink.types.RowKind.DELETE;
 import static org.apache.flink.types.RowKind.INSERT;
 import static org.apache.flink.types.RowKind.UPDATE_AFTER;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
-import static org.hamcrest.CoreMatchers.hasItem;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** ITCase for {@link SqlGatewayServiceImpl}. */
 public class SqlGatewayServiceITCase extends AbstractTestBase {
@@ -119,13 +116,7 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
         SessionHandle sessionHandle = service.openSession(environment);
         Map<String, String> actualConfig = service.getSessionConfig(sessionHandle);
 
-        options.forEach(
-                (k, v) ->
-                        assertThat(
-                                String.format(
-                                        "Should contains (%s, %s) in the actual config.", k, v),
-                                actualConfig,
-                                Matchers.hasEntry(k, v)));
+        assertThat(actualConfig).containsAllEntriesOf(options);
     }
 
     @Test
@@ -150,9 +141,9 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                 service.getSession(sessionHandle)
                         .createExecutor(new Configuration())
                         .getTableEnvironment();
-        assertEquals(catalogName, tableEnv.getCurrentCatalog());
-        assertEquals(databaseName, tableEnv.getCurrentDatabase());
-        assertTrue(new HashSet<>(Arrays.asList(tableEnv.listModules())).contains(moduleName));
+        assertThat(tableEnv.getCurrentCatalog()).isEqualTo(catalogName);
+        assertThat(tableEnv.getCurrentDatabase()).isEqualTo(databaseName);
+        assertThat(tableEnv.listModules()).contains(moduleName);
     }
 
     @Test
@@ -170,9 +161,8 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                         });
 
         startRunningLatch.await();
-        assertEquals(
-                ResultSet.NOT_READY_RESULTS,
-                service.fetchResults(sessionHandle, operationHandle, 0, Integer.MAX_VALUE));
+        assertThat(service.fetchResults(sessionHandle, operationHandle, 0, Integer.MAX_VALUE))
+                .isEqualTo(ResultSet.NOT_READY_RESULTS);
         endRunningLatch.countDown();
     }
 
@@ -192,9 +182,8 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                         });
 
         startRunningLatch.await();
-        assertEquals(
-                new OperationInfo(OperationStatus.RUNNING, OperationType.UNKNOWN),
-                service.getOperationInfo(sessionHandle, operationHandle));
+        assertThat(service.getOperationInfo(sessionHandle, operationHandle))
+                .isEqualTo(new OperationInfo(OperationStatus.RUNNING, OperationType.UNKNOWN));
 
         endRunningLatch.countDown();
         OperationInfo expectedInfo =
@@ -214,10 +203,10 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
             actualData.addAll(checkNotNull(currentResult.getData()));
             token = currentResult.getNextToken();
         }
-        assertEquals(expectedData, actualData);
+        assertThat(actualData).isEqualTo(expectedData);
 
         service.closeOperation(sessionHandle, operationHandle);
-        assertEquals(0, sessionManager.getOperationCount(sessionHandle));
+        assertThat(sessionManager.getOperationCount(sessionHandle)).isEqualTo(0);
     }
 
     @Test
@@ -236,17 +225,15 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                         });
 
         startRunningLatch.await();
-        assertEquals(
-                new OperationInfo(OperationStatus.RUNNING, OperationType.UNKNOWN),
-                service.getOperationInfo(sessionHandle, operationHandle));
+        assertThat(service.getOperationInfo(sessionHandle, operationHandle))
+                .isEqualTo(new OperationInfo(OperationStatus.RUNNING, OperationType.UNKNOWN));
 
         service.cancelOperation(sessionHandle, operationHandle);
 
-        assertEquals(
-                new OperationInfo(OperationStatus.CANCELED, OperationType.UNKNOWN),
-                service.getOperationInfo(sessionHandle, operationHandle));
+        assertThat(service.getOperationInfo(sessionHandle, operationHandle))
+                .isEqualTo(new OperationInfo(OperationStatus.CANCELED, OperationType.UNKNOWN));
         service.closeOperation(sessionHandle, operationHandle);
-        assertEquals(0, sessionManager.getOperationCount(sessionHandle));
+        assertThat(sessionManager.getOperationCount(sessionHandle)).isEqualTo(0);
     }
 
     @Test
@@ -273,14 +260,14 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                 Duration.ofSeconds(10),
                 "Failed to get expected operation status.");
 
-        Assertions.assertThatThrownBy(
+        assertThatThrownBy(
                         () ->
                                 service.fetchResults(
                                         sessionHandle, operationHandle, 0, Integer.MAX_VALUE))
                 .satisfies(anyCauseMatches(SqlExecutionException.class, msg));
 
         service.closeOperation(sessionHandle, operationHandle);
-        assertEquals(0, sessionManager.getOperationCount(sessionHandle));
+        assertThat(sessionManager.getOperationCount(sessionHandle)).isEqualTo(0);
     }
 
     @Test
@@ -304,11 +291,17 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
             token = result.getNextToken();
         }
 
-        assertThat(
-                settings,
-                hasItem(
+        assertThat(settings)
+                .contains(
                         GenericRowData.of(
-                                StringData.fromString(key), StringData.fromString(value))));
+                                StringData.fromString(key), StringData.fromString(value)));
+    }
+
+    @Test
+    public void testGetOperationSchemaUntilOperationIsReady() throws Exception {
+        runGetOperationSchemaUntilOperationIsReadyOrError(
+                this::getDefaultResultSet,
+                task -> assertThat(task.get()).isEqualTo(getDefaultResultSet().getResultSchema()));
     }
 
     // --------------------------------------------------------------------------------------------
@@ -402,7 +395,7 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                 "All operations should be closed.");
 
         for (OperationManager.Operation op : operations) {
-            assertEquals(OperationStatus.CLOSED, op.getOperationInfo().getStatus());
+            assertThat(op.getOperationInfo().getStatus()).isEqualTo(OperationStatus.CLOSED);
         }
     }
 
@@ -426,7 +419,7 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
         }
         manager.close();
         latch.await();
-        assertEquals(0, manager.getOperationCount());
+        assertThat(manager.getOperationCount()).isEqualTo(0);
     }
 
     @Test
@@ -445,7 +438,7 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                                             () -> {
                                                 startRunning.countDown();
                                                 terminateRunning.await();
-                                                return null;
+                                                return getDefaultResultSet();
                                             }))
                     .start();
         }
@@ -472,7 +465,7 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                                 // another thread
                                 int origin = v.get();
                                 v.set(origin + 1);
-                                return null;
+                                return getDefaultResultSet();
                             }));
         }
         for (OperationHandle handle : handles) {
@@ -485,7 +478,7 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                     "Failed to wait operation terminate");
         }
 
-        assertEquals(threadNum, v.get());
+        assertThat(v.get()).isEqualTo(threadNum);
     }
 
     @Test
@@ -503,7 +496,7 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                             OperationType.UNKNOWN,
                             () -> {
                                 latch.await();
-                                return null;
+                                return getDefaultResultSet();
                             }));
         }
         // The queue is full and should reject
@@ -515,7 +508,7 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                                         OperationType.UNKNOWN,
                                         () -> {
                                             latch.await();
-                                            return null;
+                                            return getDefaultResultSet();
                                         }))
                 .satisfies(anyCauseMatches(RejectedExecutionException.class));
         latch.countDown();
@@ -534,7 +527,7 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                 OperationType.UNKNOWN,
                 () -> {
                     success.countDown();
-                    return null;
+                    return getDefaultResultSet();
                 });
         CommonTestUtils.waitUtil(
                 () -> success.getCount() == 0, Duration.ofSeconds(10), "Should come to end.");
@@ -587,6 +580,18 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
         }
     }
 
+    @Test
+    public void testGetOperationSchemaWhenOperationGetError() throws Exception {
+        String msg = "Artificial Exception.";
+        runGetOperationSchemaUntilOperationIsReadyOrError(
+                () -> {
+                    throw new SqlGatewayException(msg);
+                },
+                task ->
+                        assertThatThrownBy(task::get)
+                                .satisfies(anyCauseMatches(SqlGatewayException.class, msg)));
+    }
+
     // --------------------------------------------------------------------------------------------
 
     private OperationHandle submitDefaultOperation(
@@ -617,6 +622,34 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                 data);
     }
 
+    private void runGetOperationSchemaUntilOperationIsReadyOrError(
+            Callable<ResultSet> executor,
+            ThrowingConsumer<FutureTask<ResolvedSchema>, Exception> validator)
+            throws Exception {
+        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
+        CountDownLatch operationIsRunning = new CountDownLatch(1);
+        CountDownLatch schemaFetcherIsRunning = new CountDownLatch(1);
+        OperationHandle operationHandle =
+                service.submitOperation(
+                        sessionHandle,
+                        OperationType.UNKNOWN,
+                        () -> {
+                            operationIsRunning.await();
+                            return executor.call();
+                        });
+        FutureTask<ResolvedSchema> task =
+                new FutureTask<>(
+                        () -> {
+                            schemaFetcherIsRunning.countDown();
+                            return service.getOperationResultSchema(sessionHandle, operationHandle);
+                        });
+        threadFactory.newThread(task).start();
+
+        schemaFetcherIsRunning.await();
+        operationIsRunning.countDown();
+        validator.accept(task);
+    }
+
     private void runCancelOrCloseOperationWhenFetchResults(
             SessionHandle sessionHandle,
             OperationHandle operationHandle,
@@ -659,6 +692,6 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                                 assertThatChainOfCauses(t)
                                         .anySatisfy(t1 -> condition.matches(t1.getMessage())));
 
-        assertTrue(new HashSet<>(getDefaultResultSet().getData()).containsAll(actual));
+        assertThat(getDefaultResultSet().getData()).containsAll(actual);
     }
 }