Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2022/02/07 17:27:17 UTC

[GitHub] [accumulo] keith-turner commented on a change in pull request #2467: Added mechanism to cancel FaTE operations from Shell with Manager up

keith-turner commented on a change in pull request #2467:
URL: https://github.com/apache/accumulo/pull/2467#discussion_r800885134



##########
File path: core/src/main/java/org/apache/accumulo/fate/Fate.java
##########
@@ -300,6 +335,55 @@ public TStatus waitForCompletion(long tid) {
     return store.waitForStatusChange(tid, FINISHED_STATES);
   }
 
+  /**
+   * Attempts to cancel a running Fate transaction
+   *
+   * @param tid
+   *          transaction id
+   * @return true if transaction transitioned to a failed state or already in a completed state,
+   *         false otherwise
+   */
+  public boolean cancel(long tid) {
+    String tidStr = Long.toHexString(tid);
+    for (int retries = 0; retries < 5; retries++) {
+      if (store.tryReserve(tid)) {
+        try {
+          TStatus status = store.getStatus(tid);
+          if (status == NEW || status == SUBMITTED || status == IN_PROGRESS) {
+            store.setProperty(tid, EXCEPTION_PROP, new TApplicationException(
+                TApplicationException.INTERNAL_ERROR, "Fate transaction cancelled by user"));
+            store.setStatus(tid, FAILED_IN_PROGRESS);
+            log.info(
+                "Updated status for Repo {} to FAILED_IN_PROGRESS because it was cancelled by user",
+                tidStr);
+            return true;
+          } else {
+            log.info("Repo {} cancelled by user but already in finished state", tidStr);
+            return true;
+          }
+        } finally {
+          store.unreserve(tid, 0);
+        }
+      } else {
+        // It's possible that the FateOp is being run by the TransactionRunner
+        Thread t = null;
+        synchronized (RUNNING_TRANSACTIONS) {
+          t = RUNNING_TRANSACTIONS.get(tid);
+        }
+        if (t != null) {
+          t.interrupt();

Review comment:
       There is a race condition here. The same thread runs steps for many different FATE transactions. By the time this code executes, the thread could be running a completely different FATE tx, so it could interrupt the wrong FATE tx.
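
One possible way to close this window is sketched below, under stated assumptions. The lookup and the interrupt both happen while holding the same lock the runner uses. The runner removes its entry and clears its own interrupt flag under that lock before it reserves another tx. RunningTxRegistry and its methods are hypothetical names for illustration, not existing Accumulo classes, and the sketch does not show how Fate would wire them in.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper (not part of Accumulo): tracks which FATE tx each
 * runner thread is executing. All three operations share one lock, so an
 * interrupt is only delivered while the thread is still tied to that tid.
 */
class RunningTxRegistry {
  private final Map<Long, Thread> running = new HashMap<>();

  /** Called by a runner thread before it executes a step of tid. */
  synchronized void register(long tid) {
    running.put(tid, Thread.currentThread());
  }

  /**
   * Called by the runner thread after the step ends (normally or not).
   * Clearing the flag under the same lock keeps an interrupt aimed at tid
   * from leaking into the next tx this thread picks up.
   */
  synchronized void unregister(long tid) {
    running.remove(tid, Thread.currentThread());
    Thread.interrupted(); // discard any interrupt that targeted tid
  }

  /** Interrupts the thread running tid, if any; returns true if one was interrupted. */
  synchronized boolean interruptIfRunning(long tid) {
    Thread t = running.get(tid);
    if (t == null) {
      return false;
    }
    // Safe: t cannot unregister tid (and move on) while we hold the lock.
    t.interrupt();
    return true;
  }
}
```

In this sketch a runner would call register(tid) before isReady()/call() and unregister(tid) in a finally block, so the flag is always cleared before the thread reserves its next tx.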

##########
File path: core/src/main/java/org/apache/accumulo/fate/Fate.java
##########
@@ -71,35 +83,58 @@ public void run() {
         try {
           tid = store.reserve();
           TStatus status = store.getStatus(tid);
+          if (status == FAILED) {
+            runnerLog.info(
+                "FATE txid " + Long.toHexString(tid) + " likely cancelled before it started.");
+            return;
+          }
           Repo<T> op = store.top(tid);
-          if (status == TStatus.FAILED_IN_PROGRESS) {
+          if (status == FAILED_IN_PROGRESS) {
             processFailed(tid, op);
           } else {
             Repo<T> prevOp = null;
             try {
+              synchronized (RUNNING_TRANSACTIONS) {
+                RUNNING_TRANSACTIONS.put(tid, Thread.currentThread());
+              }
               deferTime = op.isReady(tid, environment);
 
               if (deferTime == 0) {
                 prevOp = op;
-                if (status == TStatus.SUBMITTED) {
-                  store.setStatus(tid, TStatus.IN_PROGRESS);
+                if (status == SUBMITTED) {
+                  store.setStatus(tid, IN_PROGRESS);
                 }
                 op = op.call(tid, environment);
               } else
                 continue;
 
             } catch (Exception e) {
+              if (e instanceof InterruptedException) {
+                runnerLog.info(
+                    "FATE Runner thread interrupted processing txid " + Long.toHexString(tid));
+                if (prevOp != null) {
+                  processFailed(tid, prevOp);
+                }
+                if (op != prevOp) {
+                  processFailed(tid, op);
+                }

Review comment:
       This code calls processFailed() before transitioning the FATE tx to FAILED_IN_PROGRESS. If the Manager process dies in the middle of processFailed(), then when it comes back it will resume executing the tx in the IN_PROGRESS state. That could lead to normal processing, then failure processing, then normal processing again for the same FATE tx, which could cause strange mutations to ZK and the metadata table.

       What is the motivation for this special case?
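
The following is a minimal sketch of the ordering this comment points toward. FAILED_IN_PROGRESS is persisted first and the undo work runs afterward, so a restart in the middle of the undo resumes failure processing rather than normal processing. TxStatus, DurableStatusStore and onInterrupted are hypothetical stand-ins, and an in-memory map replaces the real ZooKeeper-backed store. In Fate the undo step would be processFailed().

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical subset of FATE statuses relevant to this sketch. */
enum TxStatus { IN_PROGRESS, FAILED_IN_PROGRESS, FAILED }

/** Hypothetical stand-in for a durable store; a real one would write to ZooKeeper. */
class DurableStatusStore {
  private final Map<Long, TxStatus> statuses = new HashMap<>();

  void setStatus(long tid, TxStatus s) {
    statuses.put(tid, s);
  }

  TxStatus getStatus(long tid) {
    return statuses.get(tid);
  }
}

class InterruptHandlingSketch {
  /**
   * Handles an interrupted step. The status change is persisted before any
   * undo work starts, so if the process dies part way through undo, recovery
   * sees FAILED_IN_PROGRESS and continues undoing instead of re-running steps.
   */
  static void onInterrupted(DurableStatusStore store, long tid, Runnable undo) {
    store.setStatus(tid, TxStatus.FAILED_IN_PROGRESS); // durable transition first
    undo.run();                                         // may fail part way
    store.setStatus(tid, TxStatus.FAILED);
  }
}
```

Throwing from the undo step simulates a crash: the persisted status is then FAILED_IN_PROGRESS, not IN_PROGRESS.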

##########
File path: core/src/main/java/org/apache/accumulo/fate/Fate.java
##########
@@ -71,35 +83,58 @@ public void run() {
         try {
           tid = store.reserve();
           TStatus status = store.getStatus(tid);
+          if (status == FAILED) {
+            runnerLog.info(
+                "FATE txid " + Long.toHexString(tid) + " likely cancelled before it started.");
+            return;
+          }
           Repo<T> op = store.top(tid);
-          if (status == TStatus.FAILED_IN_PROGRESS) {
+          if (status == FAILED_IN_PROGRESS) {
             processFailed(tid, op);
           } else {
             Repo<T> prevOp = null;
             try {
+              synchronized (RUNNING_TRANSACTIONS) {
+                RUNNING_TRANSACTIONS.put(tid, Thread.currentThread());
+              }
               deferTime = op.isReady(tid, environment);
 
               if (deferTime == 0) {
                 prevOp = op;
-                if (status == TStatus.SUBMITTED) {
-                  store.setStatus(tid, TStatus.IN_PROGRESS);
+                if (status == SUBMITTED) {
+                  store.setStatus(tid, IN_PROGRESS);
                 }
                 op = op.call(tid, environment);
               } else
                 continue;
 
             } catch (Exception e) {
+              if (e instanceof InterruptedException) {
+                runnerLog.info(
+                    "FATE Runner thread interrupted processing txid " + Long.toHexString(tid));
+                if (prevOp != null) {
+                  processFailed(tid, prevOp);
+                }
+                if (op != prevOp) {
+                  processFailed(tid, op);
+                }
+                Thread.interrupted();

Review comment:
       Not sure, but this may prevent the following code from running.
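
For reference, Thread.interrupted() is a static method that both reports and clears the current thread's interrupt status. Thread.currentThread().isInterrupted() only reports it. After this call, nothing later on the same runner thread that checks the flag will see this interrupt. If the intent were to keep the interrupt visible, the usual idiom is Thread.currentThread().interrupt(). Whether clearing is desired here depends on the code that follows, which this excerpt does not show. The class name below is hypothetical. The snippet only demonstrates standard java.lang.Thread behavior.

```java
/**
 * Hypothetical demo class: Thread.interrupted() reports and clears the
 * current thread's interrupt status, while isInterrupted() only reports it.
 */
class InterruptFlagDemo {
  /** Returns {flagAfterInterrupt, interruptedResult, flagAfterClear}. */
  static boolean[] demo() {
    Thread.currentThread().interrupt();
    boolean before = Thread.currentThread().isInterrupted(); // reports, does not clear
    boolean cleared = Thread.interrupted();                  // reports and clears
    boolean after = Thread.currentThread().isInterrupted();  // flag is now unset
    return new boolean[] {before, cleared, after};
  }
}
```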
