Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/04 07:16:43 UTC

[GitHub] [flink] fsk119 opened a new pull request, #20149: [FLINK-28053][sql-gateway] Introduce operation lock to execute operat…

fsk119 opened a new pull request, #20149:
URL: https://github.com/apache/flink/pull/20149

   …ion in sequence
   
   
   ## What is the purpose of the change
   
   *Introduce an operation lock so that only one operation per session can execute at a time. The main reason for this feature is that the `CatalogManager` and `ResourceManager` are not thread-safe.*
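
A minimal sketch of the idea, not the actual `OperationManager` code: a `Semaphore` with a single permit lets at most one task run at a time even on a multi-threaded pool. The class name `OperationLockSketch`, the method `peakConcurrency`, and the pool size are hypothetical, and unlike the PR (which acquires the lock in the submitting thread) this sketch acquires it inside the worker for brevity.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration only; names are not taken from Flink. */
class OperationLockSketch {

    /** Runs {@code tasks} jobs on a 4-thread pool and returns the peak number running at once. */
    static int peakConcurrency(int tasks) {
        Semaphore operationLock = new Semaphore(1); // one permit: one operation at a time
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.submit(() -> {
                    try {
                        operationLock.acquire();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return; // no permit was taken, so nothing to release
                    }
                    try {
                        int now = running.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max);
                        Thread.sleep(10); // stand-in for work on non-thread-safe state
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                        operationLock.release();
                    }
                });
            }
            pool.shutdown();
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency: " + peakConcurrency(8));
    }
}
```

A semaphore rather than a `ReentrantLock` fits this pattern because a permit may be released by a different thread than the one that acquired it.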
   
   
   ## Brief change log
   
     - *Make `Operation` an inner class, which makes it much more convenient to access `OperationManager` resources.*
     - *Introduce the lock; an `Operation` must acquire it before doing its work.*
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
     - *Added integration tests for executing the operation in sequence.*
     - *Added IT tests verifying that a failed operation releases the operation lock (`testCancelAndCloseOperationInParallel` and `testFailedToSubmitOperationInParallel`).*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / **no**)
     - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] luoyuxia commented on a diff in pull request #20149: [FLINK-28053][sql-gateway] Introduce operation lock to execute operat…

Posted by GitBox <gi...@apache.org>.
luoyuxia commented on code in PR #20149:
URL: https://github.com/apache/flink/pull/20149#discussion_r920896827


##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java:
##########
@@ -365,12 +367,124 @@ public void testSubmitOperationAndCloseOperationManagerInParallel() throws Excep
         assertEquals(0, manager.getOperationCount());
     }
 
+    @Test
+    public void testSubmitOperationAndCloseOperationManagerInParallel2() throws Exception {
+        int count = 3;
+        CountDownLatch startRunning = new CountDownLatch(1);
+        CountDownLatch terminateRunning = new CountDownLatch(1);
+        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
+        for (int i = 0; i < count; i++) {
+            threadFactory
+                    .newThread(
+                            () ->
+                                    service.submitOperation(
+                                            sessionHandle,
+                                            OperationType.UNKNOWN,
+                                            () -> {
+                                                startRunning.countDown();
+                                                terminateRunning.await();
+                                                return null;
+                                            }))
+                    .start();
+        }
+        startRunning.await();
+        // close session should not be blocked
+        service.getSession(sessionHandle).getOperationManager().close();
+        terminateRunning.countDown();

Review Comment:
   Nit: it seems this line can be removed.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -36,24 +38,39 @@
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Supplier;
 
+import static org.apache.flink.table.gateway.api.results.ResultSet.NOT_READY_RESULTS;
+
 /** Manager for the {@link Operation}. */
 public class OperationManager {
 
     private static final Logger LOG = LoggerFactory.getLogger(OperationManager.class);
 
-    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+    /** The lock that controls the visit of the {@link OperationManager}'s state. */
+    private final ReadWriteLock stateLock = new ReentrantReadWriteLock();
+
     private final Map<OperationHandle, Operation> submittedOperations;
     private final ExecutorService service;
+    /**
+     * Operation lock is used to control the execution among the {@link Operation}. The reason why

Review Comment:
   minor:
   ```suggestion
        * Operation lock is used to control the execution among the {@link Operation}s. The reason why
   ```



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -36,24 +38,39 @@
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Supplier;
 
+import static org.apache.flink.table.gateway.api.results.ResultSet.NOT_READY_RESULTS;
+
 /** Manager for the {@link Operation}. */
 public class OperationManager {
 
     private static final Logger LOG = LoggerFactory.getLogger(OperationManager.class);
 
-    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+    /** The lock that controls the visit of the {@link OperationManager}'s state. */
+    private final ReadWriteLock stateLock = new ReentrantReadWriteLock();
+
     private final Map<OperationHandle, Operation> submittedOperations;
     private final ExecutorService service;
+    /**
+     * Operation lock is used to control the execution among the {@link Operation}. The reason why
+     * using the lock to control the execution in sequence is the managers, e.g. CatalogManager are

Review Comment:
   minor: 
   ```suggestion
        * using the lock to control the execution in sequence is the managers, e.g. CatalogManager is
   ```





[GitHub] [flink] fsk119 merged pull request #20149: [FLINK-28053][sql-gateway] Introduce operation lock to execute operat…

Posted by GitBox <gi...@apache.org>.
fsk119 merged PR #20149:
URL: https://github.com/apache/flink/pull/20149




[GitHub] [flink] fsk119 commented on a diff in pull request #20149: [FLINK-28053][sql-gateway] Introduce operation lock to execute operat…

Posted by GitBox <gi...@apache.org>.
fsk119 commented on code in PR #20149:
URL: https://github.com/apache/flink/pull/20149#discussion_r921088655


##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java:
##########
@@ -365,12 +367,124 @@ public void testSubmitOperationAndCloseOperationManagerInParallel() throws Excep
         assertEquals(0, manager.getOperationCount());
     }
 
+    @Test
+    public void testSubmitOperationAndCloseOperationManagerInParallel2() throws Exception {
+        int count = 3;
+        CountDownLatch startRunning = new CountDownLatch(1);
+        CountDownLatch terminateRunning = new CountDownLatch(1);
+        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
+        for (int i = 0; i < count; i++) {
+            threadFactory
+                    .newThread(
+                            () ->
+                                    service.submitOperation(
+                                            sessionHandle,
+                                            OperationType.UNKNOWN,
+                                            () -> {
+                                                startRunning.countDown();
+                                                terminateRunning.await();
+                                                return null;
+                                            }))
+                    .start();
+        }
+        startRunning.await();
+        // close session should not be blocked
+        service.getSession(sessionHandle).getOperationManager().close();
+        terminateRunning.countDown();

Review Comment:
   It just cleans up all used resources.





[GitHub] [flink] fsk119 commented on pull request #20149: [FLINK-28053][sql-gateway] Introduce operation lock to execute operat…

Posted by GitBox <gi...@apache.org>.
fsk119 commented on PR #20149:
URL: https://github.com/apache/flink/pull/20149#issuecomment-1174511108

   @flinkbot run azure




[GitHub] [flink] fsk119 commented on a diff in pull request #20149: [FLINK-28053][sql-gateway] Introduce operation lock to execute operat…

Posted by GitBox <gi...@apache.org>.
fsk119 commented on code in PR #20149:
URL: https://github.com/apache/flink/pull/20149#discussion_r921087978


##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -138,21 +153,188 @@ public ResultSet fetchResults(OperationHandle operationHandle, long token, int m
 
     /** Closes the {@link OperationManager} and all operations. */
     public void close() {
-        lock.writeLock().lock();
+        stateLock.writeLock().lock();
         try {
             isRunning = false;
             for (Operation operation : submittedOperations.values()) {
                 operation.close();
             }
             submittedOperations.clear();
         } finally {
-            lock.writeLock().unlock();
+            stateLock.writeLock().unlock();
         }
         LOG.debug("Closes the Operation Manager.");
     }
 
     // -------------------------------------------------------------------------------------------
 
+    /** Operation to manage the execution, results and so on. */
+    @VisibleForTesting
+    public class Operation {
+
+        private final OperationHandle operationHandle;
+
+        private final OperationType operationType;
+        private final boolean hasResults;
+        private final AtomicReference<OperationStatus> status;
+
+        private final Callable<ResultFetcher> resultSupplier;
+
+        private volatile Future<?> invocation;
+        private volatile ResultFetcher resultFetcher;
+        private volatile SqlExecutionException operationError;
+
+        public Operation(
+                OperationHandle operationHandle,
+                OperationType operationType,
+                Callable<ResultFetcher> resultSupplier) {
+            this.operationHandle = operationHandle;
+            this.status = new AtomicReference<>(OperationStatus.INITIALIZED);
+            this.operationType = operationType;
+            this.hasResults = true;
+            this.resultSupplier = resultSupplier;
+        }
+
+        void runBefore() {
+            updateState(OperationStatus.RUNNING);
+        }
+
+        void runAfter() {
+            updateState(OperationStatus.FINISHED);
+        }
+
+        public void run() {
+            try {
+                operationLock.acquire();
+                LOG.debug(
+                        String.format(
+                                "Operation %s acquires the operation lock.", operationHandle));
+                updateState(OperationStatus.PENDING);
+                Runnable work =
+                        () -> {
+                            try {
+                                runBefore();
+                                resultFetcher = resultSupplier.call();
+                                runAfter();
+                            } catch (Throwable t) {
+                                processThrowable(t);
+                            }
+                        };
+                // Please be careful: the returned future by the ExecutorService will not wrap the
+                // done method.
+                FutureTask<Void> copiedTask =
+                        new FutureTask<Void>(work, null) {
+                            @Override
+                            protected void done() {
+                                LOG.debug(
+                                        String.format(
+                                                "Release the operation lock: %s when task completes.",
+                                                operationHandle));
+                                operationLock.release();
+                            }
+                        };
+                service.submit(copiedTask);
+                invocation = copiedTask;
+                // If it is canceled or closed, terminate the invocation.
+                OperationStatus current = status.get();
+                if (current == OperationStatus.CLOSED || current == OperationStatus.CANCELED) {
+                    LOG.debug(
+                            String.format(
+                                    "The current status is %s after updating the operation %s status to %s. Close the resources.",
+                                    current, operationHandle, OperationStatus.PENDING));
+                    closeResources();
+                }
+            } catch (Throwable t) {
+                processThrowable(t);
+                throw new SqlGatewayException(
+                        "Failed to submit the operation to the thread pool.", t);
+            } finally {
+                if (invocation == null) {
+                    // failed to submit to the thread pool and release the lock.
+                    LOG.debug(
+                            String.format(
+                                    "Operation %s releases the operation lock when failed to submit the operation to the pool.",
+                                    operationHandle));
+                    operationLock.release();

Review Comment:
   Reaching this point means the Operation failed to submit itself to the thread pool, so the copiedTask will never execute.
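
The two release paths discussed here can be sketched in isolation. This is a simplified illustration under stated assumptions, not the PR's code: the class `LockReleaseSketch` and its methods are hypothetical, it uses `acquireUninterruptibly()` and `execute()` to keep the control flow short, and it tracks a local `submitted` flag instead of the `invocation` field. The point it shows is that `FutureTask.done()` only fires for a task the pool actually accepted, so a rejected submission has to release the permit itself.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration only; names are not taken from Flink. */
class LockReleaseSketch {

    /** Submits {@code work} while holding {@code lock}; returns true if the pool accepted it. */
    static boolean submitUnderLock(ExecutorService pool, Semaphore lock, Runnable work) {
        lock.acquireUninterruptibly();
        boolean submitted = false;
        try {
            FutureTask<Void> task =
                    new FutureTask<Void>(work, null) {
                        @Override
                        protected void done() {
                            // Called once the task completes, fails, or is cancelled.
                            lock.release();
                        }
                    };
            pool.execute(task);
            submitted = true;
        } catch (RejectedExecutionException e) {
            // The pool refused the task, so done() will never be called for it.
        } finally {
            if (!submitted) {
                lock.release(); // release here, otherwise the permit would leak
            }
        }
        return submitted;
    }

    /** Exercises both paths and reports whether the single permit is available afterwards. */
    static boolean demo() {
        Semaphore lock = new Semaphore(1);
        ExecutorService pool = Executors.newSingleThreadExecutor();
        boolean first = submitUnderLock(pool, lock, () -> {});
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        // The pool is shut down now, so this submission is rejected.
        boolean second = submitUnderLock(pool, lock, () -> {});
        return first && !second && lock.availablePermits() == 1;
    }

    public static void main(String[] args) {
        System.out.println("lock released on both paths: " + demo());
    }
}
```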



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -138,21 +153,188 @@ public ResultSet fetchResults(OperationHandle operationHandle, long token, int m
 
     /** Closes the {@link OperationManager} and all operations. */
     public void close() {
-        lock.writeLock().lock();
+        stateLock.writeLock().lock();
         try {
             isRunning = false;
             for (Operation operation : submittedOperations.values()) {
                 operation.close();
             }
             submittedOperations.clear();
         } finally {
-            lock.writeLock().unlock();
+            stateLock.writeLock().unlock();
         }
         LOG.debug("Closes the Operation Manager.");
     }
 
     // -------------------------------------------------------------------------------------------
 
+    /** Operation to manage the execution, results and so on. */
+    @VisibleForTesting
+    public class Operation {
+
+        private final OperationHandle operationHandle;
+
+        private final OperationType operationType;
+        private final boolean hasResults;
+        private final AtomicReference<OperationStatus> status;
+
+        private final Callable<ResultFetcher> resultSupplier;
+
+        private volatile Future<?> invocation;
+        private volatile ResultFetcher resultFetcher;
+        private volatile SqlExecutionException operationError;
+
+        public Operation(
+                OperationHandle operationHandle,
+                OperationType operationType,
+                Callable<ResultFetcher> resultSupplier) {
+            this.operationHandle = operationHandle;
+            this.status = new AtomicReference<>(OperationStatus.INITIALIZED);
+            this.operationType = operationType;
+            this.hasResults = true;
+            this.resultSupplier = resultSupplier;
+        }
+
+        void runBefore() {
+            updateState(OperationStatus.RUNNING);
+        }
+
+        void runAfter() {
+            updateState(OperationStatus.FINISHED);
+        }
+
+        public void run() {
+            try {
+                operationLock.acquire();
+                LOG.debug(
+                        String.format(
+                                "Operation %s acquires the operation lock.", operationHandle));
+                updateState(OperationStatus.PENDING);
+                Runnable work =
+                        () -> {
+                            try {
+                                runBefore();
+                                resultFetcher = resultSupplier.call();

Review Comment:
   When the operation is closed or canceled by the users, the fetcher will be closed.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -138,21 +153,188 @@ public ResultSet fetchResults(OperationHandle operationHandle, long token, int m
 
     /** Closes the {@link OperationManager} and all operations. */
     public void close() {
-        lock.writeLock().lock();
+        stateLock.writeLock().lock();
         try {
             isRunning = false;
             for (Operation operation : submittedOperations.values()) {
                 operation.close();
             }
             submittedOperations.clear();
         } finally {
-            lock.writeLock().unlock();
+            stateLock.writeLock().unlock();
         }
         LOG.debug("Closes the Operation Manager.");
     }
 
     // -------------------------------------------------------------------------------------------
 
+    /** Operation to manage the execution, results and so on. */
+    @VisibleForTesting
+    public class Operation {
+
+        private final OperationHandle operationHandle;
+
+        private final OperationType operationType;
+        private final boolean hasResults;
+        private final AtomicReference<OperationStatus> status;
+
+        private final Callable<ResultFetcher> resultSupplier;
+
+        private volatile Future<?> invocation;
+        private volatile ResultFetcher resultFetcher;
+        private volatile SqlExecutionException operationError;
+
+        public Operation(
+                OperationHandle operationHandle,
+                OperationType operationType,
+                Callable<ResultFetcher> resultSupplier) {
+            this.operationHandle = operationHandle;
+            this.status = new AtomicReference<>(OperationStatus.INITIALIZED);
+            this.operationType = operationType;
+            this.hasResults = true;
+            this.resultSupplier = resultSupplier;
+        }
+
+        void runBefore() {
+            updateState(OperationStatus.RUNNING);
+        }
+
+        void runAfter() {
+            updateState(OperationStatus.FINISHED);
+        }
+
+        public void run() {
+            try {
+                operationLock.acquire();
+                LOG.debug(
+                        String.format(
+                                "Operation %s acquires the operation lock.", operationHandle));
+                updateState(OperationStatus.PENDING);
+                Runnable work =
+                        () -> {
+                            try {
+                                runBefore();
+                                resultFetcher = resultSupplier.call();
+                                runAfter();
+                            } catch (Throwable t) {
+                                processThrowable(t);
+                            }
+                        };
+                // Please be careful: the returned future by the ExecutorService will not wrap the
+                // done method.
+                FutureTask<Void> copiedTask =
+                        new FutureTask<Void>(work, null) {
+                            @Override
+                            protected void done() {
+                                LOG.debug(
+                                        String.format(
+                                                "Release the operation lock: %s when task completes.",
+                                                operationHandle));
+                                operationLock.release();
+                            }
+                        };
+                service.submit(copiedTask);
+                invocation = copiedTask;
+                // If it is canceled or closed, terminate the invocation.
+                OperationStatus current = status.get();
+                if (current == OperationStatus.CLOSED || current == OperationStatus.CANCELED) {

Review Comment:
   This happens when the user invokes cancel/closeOperation.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -82,12 +99,10 @@ public OperationHandle submitOperation(
                                     CloseableIterator.adapterForIterator(rows.iterator()),
                                     rows.size());
                         });
-
-        writeLock(
-                () -> {
-                    submittedOperations.put(handle, operation);
-                    operation.run(service);
-                });
+        // It means to acquire two locks if put the operation.run() and submittedOperations.put(...)
+        // in the writeLock block, which may introduce the deadlock here.
+        writeLock(() -> submittedOperations.put(handle, operation));
+        operation.run();
         return handle;
     }
 

Review Comment:
   Only one of the threads will succeed; the others will get an exception, e.g. Operation has been canceled.
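
As a rough illustration of how "only one thread wins" can be enforced, here is a sketch that assumes the status transition is done with `AtomicReference.compareAndSet`. The thread does not show how `updateState` is actually implemented, so the class `StatusTransitionSketch`, its `Status` enum, and the `winners` method are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical illustration only; not the real Operation status handling. */
class StatusTransitionSketch {

    enum Status { RUNNING, CANCELED }

    /** Lets {@code threads} threads race to cancel; returns how many transitions succeeded. */
    static int winners(int threads) {
        AtomicReference<Status> status = new AtomicReference<>(Status.RUNNING);
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    start.await(); // line all threads up before racing
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                // Only the first compare-and-set sees RUNNING; the rest see CANCELED.
                if (status.compareAndSet(Status.RUNNING, Status.CANCELED)) {
                    winners.incrementAndGet();
                }
                // A real implementation would report an error to the losing callers here.
            });
            workers[i].start();
        }
        start.countDown();
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println("successful transitions: " + winners(8));
    }
}
```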



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -138,21 +153,188 @@ public ResultSet fetchResults(OperationHandle operationHandle, long token, int m
 
     /** Closes the {@link OperationManager} and all operations. */
     public void close() {
-        lock.writeLock().lock();
+        stateLock.writeLock().lock();
         try {
             isRunning = false;
             for (Operation operation : submittedOperations.values()) {
                 operation.close();
             }
             submittedOperations.clear();
         } finally {
-            lock.writeLock().unlock();
+            stateLock.writeLock().unlock();
         }
         LOG.debug("Closes the Operation Manager.");
     }
 
     // -------------------------------------------------------------------------------------------
 
+    /** Operation to manage the execution, results and so on. */
+    @VisibleForTesting

Review Comment:
   Because I don't want others to create a new Operation outside this class, except in tests.





[GitHub] [flink] flinkbot commented on pull request #20149: [FLINK-28053][sql-gateway] Introduce operation lock to execute operat…

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20149:
URL: https://github.com/apache/flink/pull/20149#issuecomment-1173444140

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "e9c81ceaa7eb6e0541086fb16af1813376148964",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "e9c81ceaa7eb6e0541086fb16af1813376148964",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * e9c81ceaa7eb6e0541086fb16af1813376148964 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] fsk119 commented on pull request #20149: [FLINK-28053][sql-gateway] Introduce operation lock to execute operat…

Posted by GitBox <gi...@apache.org>.
fsk119 commented on PR #20149:
URL: https://github.com/apache/flink/pull/20149#issuecomment-1174967984

   @flinkbot run azure
   
   




[GitHub] [flink] luoyuxia commented on a diff in pull request #20149: [FLINK-28053][sql-gateway] Introduce operation lock to execute operat…

Posted by GitBox <gi...@apache.org>.
luoyuxia commented on code in PR #20149:
URL: https://github.com/apache/flink/pull/20149#discussion_r915815057


##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -138,21 +153,188 @@ public ResultSet fetchResults(OperationHandle operationHandle, long token, int m
 
     /** Closes the {@link OperationManager} and all operations. */
     public void close() {
-        lock.writeLock().lock();
+        stateLock.writeLock().lock();
         try {
             isRunning = false;
             for (Operation operation : submittedOperations.values()) {
                 operation.close();
             }
             submittedOperations.clear();
         } finally {
-            lock.writeLock().unlock();
+            stateLock.writeLock().unlock();
         }
         LOG.debug("Closes the Operation Manager.");
     }
 
     // -------------------------------------------------------------------------------------------
 
+    /** Operation to manage the execution, results and so on. */
+    @VisibleForTesting

Review Comment:
   Redundant, as the class is public.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -82,12 +99,10 @@ public OperationHandle submitOperation(
                                     CloseableIterator.adapterForIterator(rows.iterator()),
                                     rows.size());
                         });
-
-        writeLock(
-                () -> {
-                    submittedOperations.put(handle, operation);
-                    operation.run(service);
-                });
+        // It means to acquire two locks if put the operation.run() and submittedOperations.put(...)

Review Comment:
   Why would two locks introduce a deadlock?



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -138,21 +153,188 @@ public ResultSet fetchResults(OperationHandle operationHandle, long token, int m
 
     /** Closes the {@link OperationManager} and all operations. */
     public void close() {
-        lock.writeLock().lock();
+        stateLock.writeLock().lock();
         try {
             isRunning = false;
             for (Operation operation : submittedOperations.values()) {
                 operation.close();
             }
             submittedOperations.clear();
         } finally {
-            lock.writeLock().unlock();
+            stateLock.writeLock().unlock();
         }
         LOG.debug("Closes the Operation Manager.");
     }
 
     // -------------------------------------------------------------------------------------------
 
+    /** Operation to manage the execution, results and so on. */
+    @VisibleForTesting
+    public class Operation {
+
+        private final OperationHandle operationHandle;
+
+        private final OperationType operationType;
+        private final boolean hasResults;
+        private final AtomicReference<OperationStatus> status;
+
+        private final Callable<ResultFetcher> resultSupplier;
+
+        private volatile Future<?> invocation;
+        private volatile ResultFetcher resultFetcher;
+        private volatile SqlExecutionException operationError;
+
+        public Operation(
+                OperationHandle operationHandle,
+                OperationType operationType,
+                Callable<ResultFetcher> resultSupplier) {
+            this.operationHandle = operationHandle;
+            this.status = new AtomicReference<>(OperationStatus.INITIALIZED);
+            this.operationType = operationType;
+            this.hasResults = true;
+            this.resultSupplier = resultSupplier;
+        }
+
+        void runBefore() {
+            updateState(OperationStatus.RUNNING);
+        }
+
+        void runAfter() {
+            updateState(OperationStatus.FINISHED);
+        }
+
+        public void run() {
+            try {
+                operationLock.acquire();
+                LOG.debug(
+                        String.format(
+                                "Operation %s acquires the operation lock.", operationHandle));
+                updateState(OperationStatus.PENDING);
+                Runnable work =
+                        () -> {
+                            try {
+                                runBefore();
+                                resultFetcher = resultSupplier.call();
+                                runAfter();
+                            } catch (Throwable t) {
+                                processThrowable(t);
+                            }
+                        };
+                // Please be careful: the returned future by the ExecutorService will not wrap the
+                // done method.
+                FutureTask<Void> copiedTask =
+                        new FutureTask<Void>(work, null) {
+                            @Override
+                            protected void done() {
+                                LOG.debug(
+                                        String.format(
+                                                "Release the operation lock: %s when task completes.",
+                                                operationHandle));
+                                operationLock.release();
+                            }
+                        };
+                service.submit(copiedTask);
+                invocation = copiedTask;
+                // If it is canceled or closed, terminate the invocation.
+                OperationStatus current = status.get();
+                if (current == OperationStatus.CLOSED || current == OperationStatus.CANCELED) {

Review Comment:
   In which cases will the OperationStatus be `CLOSED` or `CANCELED` at this point? It seems we only check it once.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -82,12 +99,10 @@ public OperationHandle submitOperation(
                                     CloseableIterator.adapterForIterator(rows.iterator()),
                                     rows.size());
                         });
-
-        writeLock(
-                () -> {
-                    submittedOperations.put(handle, operation);
-                    operation.run(service);
-                });
+        // It means to acquire two locks if put the operation.run() and submittedOperations.put(...)
+        // in the writeLock block, which may introduce the deadlock here.
+        writeLock(() -> submittedOperations.put(handle, operation));
+        operation.run();
         return handle;
     }
 

Review Comment:
   What happens if `Operation.cancel()` races with this call?
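
   One part of the race can be checked in isolation: `FutureTask.done()` is invoked when the task completes normally and also when it is cancelled, even if it never started running. The sketch below assumes `operationLock` is a `java.util.concurrent.Semaphore` and uses a hypothetical class name; it does not model the status transitions of `Operation`.

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;

// Hypothetical sketch, not the PR's code.
class CancelReleasesPermitSketch {

    /** Cancels a task before it runs and reports how many permits are available afterwards. */
    static int permitsAfterCancelBeforeRun() {
        Semaphore operationLock = new Semaphore(1);
        operationLock.acquireUninterruptibly();

        FutureTask<Void> task =
                new FutureTask<Void>(() -> {}, null) {
                    @Override
                    protected void done() {
                        // Called once when the task becomes done, including via cancel().
                        operationLock.release();
                    }
                };

        // The task is still new, so cancel() completes it and triggers done().
        task.cancel(true);
        // A later run() by the executor is a no-op and does not call done() again.
        task.run();
        return operationLock.availablePermits();
    }
}
```

   So a cancel that arrives after the task is created does give the permit back; the open case is a cancel that arrives before `invocation` is assigned.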



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -138,21 +153,188 @@ public ResultSet fetchResults(OperationHandle operationHandle, long token, int m
 
     /** Closes the {@link OperationManager} and all operations. */
     public void close() {
-        lock.writeLock().lock();
+        stateLock.writeLock().lock();
         try {
             isRunning = false;
             for (Operation operation : submittedOperations.values()) {
                 operation.close();
             }
             submittedOperations.clear();
         } finally {
-            lock.writeLock().unlock();
+            stateLock.writeLock().unlock();
         }
         LOG.debug("Closes the Operation Manager.");
     }
 
     // -------------------------------------------------------------------------------------------
 
+    /** Operation to manage the execution, results and so on. */
+    @VisibleForTesting
+    public class Operation {
+
+        private final OperationHandle operationHandle;
+
+        private final OperationType operationType;
+        private final boolean hasResults;
+        private final AtomicReference<OperationStatus> status;
+
+        private final Callable<ResultFetcher> resultSupplier;
+
+        private volatile Future<?> invocation;
+        private volatile ResultFetcher resultFetcher;
+        private volatile SqlExecutionException operationError;
+
+        public Operation(
+                OperationHandle operationHandle,
+                OperationType operationType,
+                Callable<ResultFetcher> resultSupplier) {
+            this.operationHandle = operationHandle;
+            this.status = new AtomicReference<>(OperationStatus.INITIALIZED);
+            this.operationType = operationType;
+            this.hasResults = true;
+            this.resultSupplier = resultSupplier;
+        }
+
+        void runBefore() {
+            updateState(OperationStatus.RUNNING);
+        }
+
+        void runAfter() {
+            updateState(OperationStatus.FINISHED);
+        }
+
+        public void run() {
+            try {
+                operationLock.acquire();
+                LOG.debug(
+                        String.format(
+                                "Operation %s acquires the operation lock.", operationHandle));
+                updateState(OperationStatus.PENDING);
+                Runnable work =
+                        () -> {
+                            try {
+                                runBefore();
+                                resultFetcher = resultSupplier.call();
+                                runAfter();
+                            } catch (Throwable t) {
+                                processThrowable(t);
+                            }
+                        };
+                // Please be careful: the returned future by the ExecutorService will not wrap the
+                // done method.
+                FutureTask<Void> copiedTask =
+                        new FutureTask<Void>(work, null) {
+                            @Override
+                            protected void done() {
+                                LOG.debug(
+                                        String.format(
+                                                "Release the operation lock: %s when task completes.",
+                                                operationHandle));
+                                operationLock.release();
+                            }
+                        };
+                service.submit(copiedTask);
+                invocation = copiedTask;
+                // If it is canceled or closed, terminate the invocation.
+                OperationStatus current = status.get();
+                if (current == OperationStatus.CLOSED || current == OperationStatus.CANCELED) {
+                    LOG.debug(
+                            String.format(
+                                    "The current status is %s after updating the operation %s status to %s. Close the resources.",
+                                    current, operationHandle, OperationStatus.PENDING));
+                    closeResources();
+                }
+            } catch (Throwable t) {
+                processThrowable(t);
+                throw new SqlGatewayException(
+                        "Failed to submit the operation to the thread pool.", t);
+            } finally {
+                if (invocation == null) {
+                    // failed to submit to the thread pool and release the lock.
+                    LOG.debug(
+                            String.format(
+                                    "Operation %s releases the operation lock when failed to submit the operation to the pool.",
+                                    operationHandle));
+                    operationLock.release();

Review Comment:
   The operationLock is also released in copiedTask, which would then cause it to be released twice.
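
   For reference, a `java.util.concurrent.Semaphore` does not reject an extra `release()`; it simply adds a permit, so a double release would let two operations run at once. The sketch below assumes `operationLock` is such a semaphore and shows one possible guard with an `AtomicBoolean`; the class and method names are hypothetical.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch, not the PR's code.
class DoubleReleaseSketch {

    /** Releases twice after a single acquire and reports the resulting permit count. */
    static int permitsAfterUnguardedDoubleRelease() {
        Semaphore lock = new Semaphore(1);
        lock.acquireUninterruptibly();
        lock.release();
        lock.release(); // no error: the semaphore now holds one permit too many
        return lock.availablePermits();
    }

    /** Same sequence, but only the first release attempt reaches the semaphore. */
    static int permitsAfterGuardedDoubleRelease() {
        Semaphore lock = new Semaphore(1);
        AtomicBoolean released = new AtomicBoolean(false);
        Runnable releaseOnce =
                () -> {
                    if (released.compareAndSet(false, true)) {
                        lock.release();
                    }
                };
        lock.acquireUninterruptibly();
        releaseOnce.run();
        releaseOnce.run(); // ignored by the guard
        return lock.availablePermits();
    }
}
```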



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -138,21 +153,188 @@ public ResultSet fetchResults(OperationHandle operationHandle, long token, int m
 
     /** Closes the {@link OperationManager} and all operations. */
     public void close() {
-        lock.writeLock().lock();
+        stateLock.writeLock().lock();
         try {
             isRunning = false;
             for (Operation operation : submittedOperations.values()) {
                 operation.close();
             }
             submittedOperations.clear();
         } finally {
-            lock.writeLock().unlock();
+            stateLock.writeLock().unlock();
         }
         LOG.debug("Closes the Operation Manager.");
     }
 
     // -------------------------------------------------------------------------------------------
 
+    /** Operation to manage the execution, results and so on. */
+    @VisibleForTesting
+    public class Operation {
+
+        private final OperationHandle operationHandle;
+
+        private final OperationType operationType;
+        private final boolean hasResults;
+        private final AtomicReference<OperationStatus> status;
+
+        private final Callable<ResultFetcher> resultSupplier;
+
+        private volatile Future<?> invocation;
+        private volatile ResultFetcher resultFetcher;
+        private volatile SqlExecutionException operationError;
+
+        public Operation(
+                OperationHandle operationHandle,
+                OperationType operationType,
+                Callable<ResultFetcher> resultSupplier) {
+            this.operationHandle = operationHandle;
+            this.status = new AtomicReference<>(OperationStatus.INITIALIZED);
+            this.operationType = operationType;
+            this.hasResults = true;
+            this.resultSupplier = resultSupplier;
+        }
+
+        void runBefore() {
+            updateState(OperationStatus.RUNNING);
+        }
+
+        void runAfter() {
+            updateState(OperationStatus.FINISHED);
+        }
+
+        public void run() {
+            try {
+                operationLock.acquire();
+                LOG.debug(
+                        String.format(
+                                "Operation %s acquires the operation lock.", operationHandle));
+                updateState(OperationStatus.PENDING);
+                Runnable work =
+                        () -> {
+                            try {
+                                runBefore();
+                                resultFetcher = resultSupplier.call();

Review Comment:
   When will the resultFetcher be closed?


