Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/08 07:13:05 UTC

[GitHub] [flink] luoyuxia commented on a diff in pull request #20149: [FLINK-28053][sql-gateway] Introduce operation lock to execute operat…

luoyuxia commented on code in PR #20149:
URL: https://github.com/apache/flink/pull/20149#discussion_r915815057


##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -138,21 +153,188 @@ public ResultSet fetchResults(OperationHandle operationHandle, long token, int m
 
     /** Closes the {@link OperationManager} and all operations. */
     public void close() {
-        lock.writeLock().lock();
+        stateLock.writeLock().lock();
         try {
             isRunning = false;
             for (Operation operation : submittedOperations.values()) {
                 operation.close();
             }
             submittedOperations.clear();
         } finally {
-            lock.writeLock().unlock();
+            stateLock.writeLock().unlock();
         }
         LOG.debug("Closes the Operation Manager.");
     }
 
     // -------------------------------------------------------------------------------------------
 
+    /** Operation to manage the execution, results and so on. */
+    @VisibleForTesting

Review Comment:
   The `@VisibleForTesting` annotation is redundant here, as the class is already public.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -82,12 +99,10 @@ public OperationHandle submitOperation(
                                     CloseableIterator.adapterForIterator(rows.iterator()),
                                     rows.size());
                         });
-
-        writeLock(
-                () -> {
-                    submittedOperations.put(handle, operation);
-                    operation.run(service);
-                });
+        // It means to acquire two locks if put the operation.run() and submittedOperations.put(...)

Review Comment:
   Why would acquiring two locks introduce a deadlock?
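One way a second lock can turn into a hang is sketched below in Java, under a loud assumption: that whichever thread currently holds the operation permit needs `stateLock` (here, its read lock) before it can finish and give the permit back. That dependency, the class and the method names are hypothetical and not taken from the PR; the `Semaphore` and `ReentrantReadWriteLock` only mirror `operationLock` and `stateLock` from the diff (`operationLock` looks like a semaphore given its `acquire()`/`release()` calls, but its declaration is not in this hunk). `tryAcquire` with a timeout is used only so the demo terminates; with a plain `acquire()` the nested case would block forever.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LockOrderDemo {

    /**
     * Returns whether the submitting thread obtains the operation permit.
     * nested = true: it waits for the permit while still holding the state write lock.
     * nested = false: it releases the write lock first, then waits for the permit.
     */
    static boolean submitterGetsPermit(boolean nested) {
        ReentrantReadWriteLock stateLock = new ReentrantReadWriteLock();
        Semaphore operationLock = new Semaphore(1);
        CountDownLatch permitTaken = new CountDownLatch(1);
        CountDownLatch stateLocked = new CountDownLatch(1);

        // A previously submitted operation. ASSUMPTION (hypothetical): it needs the
        // state lock before it can finish and return its permit.
        Thread worker =
                new Thread(
                        () -> {
                            operationLock.acquireUninterruptibly();
                            permitTaken.countDown();
                            try {
                                stateLocked.await(); // let the submitter lock first
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                            stateLock.readLock().lock(); // blocks while the write lock is held
                            stateLock.readLock().unlock();
                            operationLock.release();
                        });
        try {
            worker.start();
            permitTaken.await();
            boolean acquired;
            if (nested) {
                stateLock.writeLock().lock();
                try {
                    stateLocked.countDown(); // put(...) and run() both under the write lock
                    acquired = operationLock.tryAcquire(200, TimeUnit.MILLISECONDS);
                } finally {
                    stateLock.writeLock().unlock();
                }
            } else {
                stateLock.writeLock().lock();
                try {
                    stateLocked.countDown(); // only put(...) under the write lock
                } finally {
                    stateLock.writeLock().unlock();
                }
                acquired = operationLock.tryAcquire(5, TimeUnit.SECONDS); // run() afterwards
            }
            worker.join();
            return acquired;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("nested:     " + submitterGetsPermit(true));
        System.out.println("sequential: " + submitterGetsPermit(false));
    }
}
```

This prints `false` for the nested case and `true` for the sequential one. Whether the real gateway has such a dependency between the two locks is exactly what the question above asks; the sketch only shows the shape of the cycle if it does.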



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -138,21 +153,188 @@ public ResultSet fetchResults(OperationHandle operationHandle, long token, int m
 
     /** Closes the {@link OperationManager} and all operations. */
     public void close() {
-        lock.writeLock().lock();
+        stateLock.writeLock().lock();
         try {
             isRunning = false;
             for (Operation operation : submittedOperations.values()) {
                 operation.close();
             }
             submittedOperations.clear();
         } finally {
-            lock.writeLock().unlock();
+            stateLock.writeLock().unlock();
         }
         LOG.debug("Closes the Operation Manager.");
     }
 
     // -------------------------------------------------------------------------------------------
 
+    /** Operation to manage the execution, results and so on. */
+    @VisibleForTesting
+    public class Operation {
+
+        private final OperationHandle operationHandle;
+
+        private final OperationType operationType;
+        private final boolean hasResults;
+        private final AtomicReference<OperationStatus> status;
+
+        private final Callable<ResultFetcher> resultSupplier;
+
+        private volatile Future<?> invocation;
+        private volatile ResultFetcher resultFetcher;
+        private volatile SqlExecutionException operationError;
+
+        public Operation(
+                OperationHandle operationHandle,
+                OperationType operationType,
+                Callable<ResultFetcher> resultSupplier) {
+            this.operationHandle = operationHandle;
+            this.status = new AtomicReference<>(OperationStatus.INITIALIZED);
+            this.operationType = operationType;
+            this.hasResults = true;
+            this.resultSupplier = resultSupplier;
+        }
+
+        void runBefore() {
+            updateState(OperationStatus.RUNNING);
+        }
+
+        void runAfter() {
+            updateState(OperationStatus.FINISHED);
+        }
+
+        public void run() {
+            try {
+                operationLock.acquire();
+                LOG.debug(
+                        String.format(
+                                "Operation %s acquires the operation lock.", operationHandle));
+                updateState(OperationStatus.PENDING);
+                Runnable work =
+                        () -> {
+                            try {
+                                runBefore();
+                                resultFetcher = resultSupplier.call();
+                                runAfter();
+                            } catch (Throwable t) {
+                                processThrowable(t);
+                            }
+                        };
+                // Please be careful: the returned future by the ExecutorService will not wrap the
+                // done method.
+                FutureTask<Void> copiedTask =
+                        new FutureTask<Void>(work, null) {
+                            @Override
+                            protected void done() {
+                                LOG.debug(
+                                        String.format(
+                                                "Release the operation lock: %s when task completes.",
+                                                operationHandle));
+                                operationLock.release();
+                            }
+                        };
+                service.submit(copiedTask);
+                invocation = copiedTask;
+                // If it is canceled or closed, terminate the invocation.
+                OperationStatus current = status.get();
+                if (current == OperationStatus.CLOSED || current == OperationStatus.CANCELED) {

Review Comment:
   In which case would the OperationStatus be `CLOSED` or `CANCELED` here? It seems we only check it once.
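The status is read once, after `updateState(OperationStatus.PENDING)` and the submit, so a cancel or close that lands after that read is not seen here. And if `updateState` sets the status unconditionally (its body is not in this hunk), a later `RUNNING` or `FINISHED` update could also overwrite `CANCELED`. One possible direction, sketched below as a hypothetical: make each transition a compare-and-set that refuses to leave `CANCELED`/`CLOSED` (except to go to `CLOSED`), and let the caller act on the returned boolean instead of a separate re-read. `Status` is a local stand-in, not Flink's `OperationStatus`, and the transition rules are assumptions.

```java
import java.util.concurrent.atomic.AtomicReference;

public class StatusTransitionDemo {

    /** Local stand-in for the gateway's OperationStatus (subset, for illustration only). */
    enum Status {
        INITIALIZED,
        PENDING,
        RUNNING,
        FINISHED,
        CANCELED,
        CLOSED
    }

    private final AtomicReference<Status> status = new AtomicReference<>(Status.INITIALIZED);

    /**
     * Atomically moves to {@code next} unless the operation was already canceled or closed;
     * from those states only CLOSED is accepted. Returns false if the transition is refused,
     * so the caller can clean up based on the same atomic decision.
     */
    boolean tryTransition(Status next) {
        while (true) {
            Status current = status.get();
            boolean stopped = current == Status.CANCELED || current == Status.CLOSED;
            if (stopped && next != Status.CLOSED) {
                return false; // never move a canceled/closed operation back to PENDING etc.
            }
            if (status.compareAndSet(current, next)) {
                return true;
            }
            // Another thread changed the status in between: re-read and retry.
        }
    }

    Status current() {
        return status.get();
    }

    /** cancel() wins the race against run(): the later PENDING update is refused. */
    static String demo() {
        StatusTransitionDemo op = new StatusTransitionDemo();
        boolean canceled = op.tryTransition(Status.CANCELED); // e.g. from cancel()
        boolean pending = op.tryTransition(Status.PENDING); // e.g. from run()
        return canceled + " " + pending + " " + op.current();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // true false CANCELED
    }
}
```

With this shape, `run()` would call `closeResources()` whenever a transition returns `false`, rather than relying on one status read taken at a single point in time.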



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -82,12 +99,10 @@ public OperationHandle submitOperation(
                                     CloseableIterator.adapterForIterator(rows.iterator()),
                                     rows.size());
                         });
-
-        writeLock(
-                () -> {
-                    submittedOperations.put(handle, operation);
-                    operation.run(service);
-                });
+        // It means to acquire two locks if put the operation.run() and submittedOperations.put(...)
+        // in the writeLock block, which may introduce the deadlock here.
+        writeLock(() -> submittedOperations.put(handle, operation));
+        operation.run();
         return handle;
     }
 

Review Comment:
   What will happen if `Operation.cancel()` runs concurrently with this?
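One risky interleaving: `cancel()` runs after the operation is put into `submittedOperations` but before `invocation` is assigned, sees no future to cancel, while `run()` has already checked the status (the body of `cancel()` is not shown in this hunk, so this is only one possible interleaving). A common way to close that window is publish-then-check: each side first writes its own volatile field, then reads the other's. Because volatile accesses are totally ordered, at least one side sees the other's write, so the future gets cancelled in every interleaving. `Op` and its fields are hypothetical stand-ins, not the PR's `Operation`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CancelRaceDemo {

    /** Hypothetical operation whose run() and cancel() may be called from different threads. */
    static final class Op {
        private volatile boolean canceled;
        private volatile Future<?> invocation;

        void run(ExecutorService pool, Runnable work) {
            Future<?> f = pool.submit(work);
            invocation = f; // 1. publish the future ...
            if (canceled) { // 2. ... then check whether cancel() already ran
                f.cancel(true);
            }
        }

        void cancel() {
            canceled = true; // 1. publish the flag ...
            Future<?> f = invocation; // 2. ... then check whether run() already submitted
            if (f != null) {
                f.cancel(true);
            }
        }

        boolean isCancelled() {
            Future<?> f = invocation;
            return f != null && f.isCancelled();
        }
    }

    /** Returns {cancelled when cancel() came first, cancelled when run() came first}. */
    static boolean[] demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch never = new CountDownLatch(1);
        Runnable blocking =
                () -> {
                    try {
                        never.await(); // stands in for a long-running statement
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                };
        try {
            Op cancelFirst = new Op();
            cancelFirst.cancel();
            cancelFirst.run(pool, blocking);

            Op runFirst = new Op();
            runFirst.run(pool, blocking);
            runFirst.cancel();
            return new boolean[] {cancelFirst.isCancelled(), runFirst.isCancelled()};
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println(r[0] + " " + r[1]); // true true
    }
}
```

Both sides may end up calling `cancel(true)` on the same future, which is harmless because `Future.cancel` has no effect on an already-cancelled task.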



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -138,21 +153,188 @@ public ResultSet fetchResults(OperationHandle operationHandle, long token, int m
 
     /** Closes the {@link OperationManager} and all operations. */
     public void close() {
-        lock.writeLock().lock();
+        stateLock.writeLock().lock();
         try {
             isRunning = false;
             for (Operation operation : submittedOperations.values()) {
                 operation.close();
             }
             submittedOperations.clear();
         } finally {
-            lock.writeLock().unlock();
+            stateLock.writeLock().unlock();
         }
         LOG.debug("Closes the Operation Manager.");
     }
 
     // -------------------------------------------------------------------------------------------
 
+    /** Operation to manage the execution, results and so on. */
+    @VisibleForTesting
+    public class Operation {
+
+        private final OperationHandle operationHandle;
+
+        private final OperationType operationType;
+        private final boolean hasResults;
+        private final AtomicReference<OperationStatus> status;
+
+        private final Callable<ResultFetcher> resultSupplier;
+
+        private volatile Future<?> invocation;
+        private volatile ResultFetcher resultFetcher;
+        private volatile SqlExecutionException operationError;
+
+        public Operation(
+                OperationHandle operationHandle,
+                OperationType operationType,
+                Callable<ResultFetcher> resultSupplier) {
+            this.operationHandle = operationHandle;
+            this.status = new AtomicReference<>(OperationStatus.INITIALIZED);
+            this.operationType = operationType;
+            this.hasResults = true;
+            this.resultSupplier = resultSupplier;
+        }
+
+        void runBefore() {
+            updateState(OperationStatus.RUNNING);
+        }
+
+        void runAfter() {
+            updateState(OperationStatus.FINISHED);
+        }
+
+        public void run() {
+            try {
+                operationLock.acquire();
+                LOG.debug(
+                        String.format(
+                                "Operation %s acquires the operation lock.", operationHandle));
+                updateState(OperationStatus.PENDING);
+                Runnable work =
+                        () -> {
+                            try {
+                                runBefore();
+                                resultFetcher = resultSupplier.call();
+                                runAfter();
+                            } catch (Throwable t) {
+                                processThrowable(t);
+                            }
+                        };
+                // Please be careful: the returned future by the ExecutorService will not wrap the
+                // done method.
+                FutureTask<Void> copiedTask =
+                        new FutureTask<Void>(work, null) {
+                            @Override
+                            protected void done() {
+                                LOG.debug(
+                                        String.format(
+                                                "Release the operation lock: %s when task completes.",
+                                                operationHandle));
+                                operationLock.release();
+                            }
+                        };
+                service.submit(copiedTask);
+                invocation = copiedTask;
+                // If it is canceled or closed, terminate the invocation.
+                OperationStatus current = status.get();
+                if (current == OperationStatus.CLOSED || current == OperationStatus.CANCELED) {
+                    LOG.debug(
+                            String.format(
+                                    "The current status is %s after updating the operation %s status to %s. Close the resources.",
+                                    current, operationHandle, OperationStatus.PENDING));
+                    closeResources();
+                }
+            } catch (Throwable t) {
+                processThrowable(t);
+                throw new SqlGatewayException(
+                        "Failed to submit the operation to the thread pool.", t);
+            } finally {
+                if (invocation == null) {
+                    // failed to submit to the thread pool and release the lock.
+                    LOG.debug(
+                            String.format(
+                                    "Operation %s releases the operation lock when failed to submit the operation to the pool.",
+                                    operationHandle));
+                    operationLock.release();

Review Comment:
   The operationLock will also be released in `copiedTask`, which would then cause it to be released twice.
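`java.util.concurrent.Semaphore` does not track which thread holds a permit, so a second `release()` silently raises the permit count above one, which would let two operations run at once. If `operationLock` is a `Semaphore` (likely from its `acquire()`/`release()` calls, but its declaration is not in this hunk), one hedged option is to route both release paths, `FutureTask.done()` and the `finally` block of `run()`, through a once-only guard. `PermitGuard` is a hypothetical helper, not part of the PR.

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

public class ReleaseOnceDemo {

    /** Hypothetical guard: returns one semaphore permit at most once, whoever calls first. */
    static final class PermitGuard {
        private final Semaphore semaphore;
        private final AtomicBoolean released = new AtomicBoolean(false);

        PermitGuard(Semaphore semaphore) {
            this.semaphore = semaphore;
        }

        void releaseOnce() {
            if (released.compareAndSet(false, true)) {
                semaphore.release();
            }
        }
    }

    /** Unguarded: Semaphore does not track owners, so a second release adds a permit. */
    static int naiveDoubleRelease() {
        Semaphore lock = new Semaphore(1);
        lock.acquireUninterruptibly();
        lock.release(); // e.g. from FutureTask.done()
        lock.release(); // e.g. again from the finally block of run()
        return lock.availablePermits();
    }

    /** Guarded: both paths go through releaseOnce(), only the first one has an effect. */
    static int guardedDoubleRelease() {
        Semaphore lock = new Semaphore(1);
        lock.acquireUninterruptibly();
        PermitGuard guard = new PermitGuard(lock);
        FutureTask<Void> task =
                new FutureTask<Void>(() -> {}, null) {
                    @Override
                    protected void done() {
                        guard.releaseOnce(); // invoked on completion, failure or cancellation
                    }
                };
        task.run(); // runs inline here; done() fires when the task completes
        guard.releaseOnce(); // second path: now a no-op
        return lock.availablePermits();
    }

    public static void main(String[] args) {
        System.out.println("naive:   " + naiveDoubleRelease()); // 2
        System.out.println("guarded: " + guardedDoubleRelease()); // 1
    }
}
```

Note that a `FutureTask` that is rejected by the executor never runs and never calls `done()` unless it is cancelled, so the submit-failure path still needs its own release; the guard just makes it safe for both paths to try.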



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -138,21 +153,188 @@ public ResultSet fetchResults(OperationHandle operationHandle, long token, int m
 
     /** Closes the {@link OperationManager} and all operations. */
     public void close() {
-        lock.writeLock().lock();
+        stateLock.writeLock().lock();
         try {
             isRunning = false;
             for (Operation operation : submittedOperations.values()) {
                 operation.close();
             }
             submittedOperations.clear();
         } finally {
-            lock.writeLock().unlock();
+            stateLock.writeLock().unlock();
         }
         LOG.debug("Closes the Operation Manager.");
     }
 
     // -------------------------------------------------------------------------------------------
 
+    /** Operation to manage the execution, results and so on. */
+    @VisibleForTesting
+    public class Operation {
+
+        private final OperationHandle operationHandle;
+
+        private final OperationType operationType;
+        private final boolean hasResults;
+        private final AtomicReference<OperationStatus> status;
+
+        private final Callable<ResultFetcher> resultSupplier;
+
+        private volatile Future<?> invocation;
+        private volatile ResultFetcher resultFetcher;
+        private volatile SqlExecutionException operationError;
+
+        public Operation(
+                OperationHandle operationHandle,
+                OperationType operationType,
+                Callable<ResultFetcher> resultSupplier) {
+            this.operationHandle = operationHandle;
+            this.status = new AtomicReference<>(OperationStatus.INITIALIZED);
+            this.operationType = operationType;
+            this.hasResults = true;
+            this.resultSupplier = resultSupplier;
+        }
+
+        void runBefore() {
+            updateState(OperationStatus.RUNNING);
+        }
+
+        void runAfter() {
+            updateState(OperationStatus.FINISHED);
+        }
+
+        public void run() {
+            try {
+                operationLock.acquire();
+                LOG.debug(
+                        String.format(
+                                "Operation %s acquires the operation lock.", operationHandle));
+                updateState(OperationStatus.PENDING);
+                Runnable work =
+                        () -> {
+                            try {
+                                runBefore();
+                                resultFetcher = resultSupplier.call();

Review Comment:
   When will the resultFetcher be closed?
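The hunk shows `resultFetcher` being assigned inside the task but not where it is closed. Two cases seem worth covering: closing it from `close()`/`cancel()`, and the case where the operation is closed before the task has produced the fetcher, in which no one else would close it. A hedged sketch of one way to cover both, with `Fetcher` and `Owner` as hypothetical stand-ins for `ResultFetcher` and `Operation`: the publishing side sets the fetcher and then checks the closed flag, the closing side sets the flag and then takes the fetcher, and `getAndSet(null)` ensures only one of them actually closes it.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class FetcherCloseDemo {

    /** Hypothetical stand-in for ResultFetcher that counts close() calls. */
    static final class Fetcher implements AutoCloseable {
        final AtomicInteger closeCalls = new AtomicInteger();

        @Override
        public void close() {
            closeCalls.incrementAndGet();
        }
    }

    /** Hypothetical owner that mirrors the Operation's resultFetcher field. */
    static final class Owner {
        private final AtomicBoolean closed = new AtomicBoolean();
        private final AtomicReference<Fetcher> fetcher = new AtomicReference<>();

        /** Worker thread: publish the fetcher, then clean up if close() already ran. */
        void publish(Fetcher f) {
            fetcher.set(f);
            if (closed.get()) {
                closeFetcher();
            }
        }

        /** close()/cancel() path: safe to call repeatedly and from any thread. */
        void close() {
            closed.set(true);
            closeFetcher();
        }

        private void closeFetcher() {
            Fetcher f = fetcher.getAndSet(null); // only one caller gets the non-null value
            if (f != null) {
                f.close();
            }
        }
    }

    /** Returns the close() counts for "close after publish" and "close before publish". */
    static int[] demo() {
        Owner normal = new Owner();
        Fetcher a = new Fetcher();
        normal.publish(a);
        normal.close();
        normal.close(); // repeated close is harmless

        Owner early = new Owner();
        Fetcher b = new Fetcher();
        early.close(); // closed before the statement produced its fetcher
        early.publish(b);
        return new int[] {a.closeCalls.get(), b.closeCalls.get()};
    }

    public static void main(String[] args) {
        int[] c = demo();
        System.out.println(c[0] + " " + c[1]); // 1 1
    }
}
```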



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
