Posted to notifications@accumulo.apache.org by "keith-turner (via GitHub)" <gi...@apache.org> on 2023/08/29 15:14:26 UTC

[GitHub] [accumulo] keith-turner commented on a diff in pull request #3733: ensures no writes happen after batch writer closes

keith-turner commented on code in PR #3733:
URL: https://github.com/apache/accumulo/pull/3733#discussion_r1308990416


##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java:
##########
@@ -1028,14 +1020,139 @@ private MutationSet sendMutationsToTabletServer(String location,
         timeoutTracker.errorOccured();
         throw new IOException(e);
       } catch (TApplicationException tae) {
+        // no need to bother closing session in this case
+        usid = null;
         updateServerErrors(location, tae);
         throw new AccumuloServerException(location, tae);
       } catch (ThriftSecurityException e) {
+        // no need to bother closing session in this case
+        usid = null;
         updateAuthorizationFailures(
             tabMuts.keySet().stream().collect(toMap(identity(), ke -> e.code)));
         throw new AccumuloSecurityException(e.user, e.code, e);
       } catch (TException e) {
         throw new IOException(e);
+      } finally {
+        if (usid != null) {
+          // There is an open session, must close it before the batchwriter closes or writes could
+          // happen after the batch writer closes. See #3721. Queuing a task instead of executing
+          // the code here because throwing exceptions in a finally block makes the code hard to
+          // reason about. Queue is less likely to throw an exception.
+          //
+          // When the task is created below it adds to the failedSessions map, this is done while
+          // the mutations for this task are still incremented in totalMemUsed. It is important that
+          // these overlap in time so that there is no gap where the client would not wait for the
+          // session to close. Overlap in times means the session is added to failedSessions before
+          // the mutations are decremented from totalMemUsed.
+          sendThreadPool.execute(new CloseSessionTask(location, usid));
+        }
+      }
+    }
+
+    class CloseSessionTask implements Runnable {
+
+      private final String location;
+      private final long usid;
+
+      CloseSessionTask(String location, Long usid) {
+        this.location = location;
+        this.usid = usid;
+        synchronized (TabletServerBatchWriter.this) {
+          if (!failedSessions.add(new Pair<>(usid, location))) {
+            throw new IllegalStateException("Duplicate session " + location + " " + usid);
+          }
+        }
+
+      }
+
+      @Override
+      public void run() {
+        try {
+          closeSession();
+        } catch (InterruptedException | RuntimeException | ThriftSecurityException e) {
+          updateUnknownErrors("Failed to close session " + location + " " + usid, e);
+        } finally {
+          synchronized (TabletServerBatchWriter.this) {
+            if (!failedSessions.remove(new Pair<>(usid, location))) {
+              throw new IllegalStateException("Session missing " + location + " " + usid);
+            }
+            TabletServerBatchWriter.this.notifyAll();
+          }
+        }
+      }
+
+      /**
+       * Checks if there is a lock held by a tserver at a specific host and port.
+       */
+      private boolean isALockHeld(String tserver) {
+        var root = context.getZooKeeperRoot() + Constants.ZTSERVERS;
+        var zLockPath = ServiceLock.path(root + "/" + tserver);
+        return ServiceLock.getSessionId(context.getZooCache(), zLockPath) != 0;
+      }
+
+      private void closeSession() throws InterruptedException, ThriftSecurityException {
+
+        Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS)
+            .incrementBy(100, MILLISECONDS).maxWait(60, SECONDS).backOffFactor(1.5)
+            .logInterval(3, MINUTES).createRetry();
+
+        final HostAndPort parsedServer = HostAndPort.fromString(location);
+
+        long startTime = System.currentTimeMillis();
+
+        // If somethingFailed is true then the batch writer will throw an exception on close or
+        // flush, so no need to close this session. Only want to close the session for retryable
+        // exceptions.
+        while (!somethingFailed) {
+
+          TabletClientService.Client client = null;
+
+          // Check if a lock is held by any tserver at the host and port. It does not need to be the
+          // exact tserver instance that existed when the session was created because if a new
+          // tserver instance comes up then the session will not exist there. Trying to get the
+          // exact tserver instance that created the session would require changes to the RPC that
+          // creates the session and this is not needed.
+          if (!isALockHeld(location)) {
+            retry.logCompletion(log,
+                "No tserver for failed write session " + location + " " + usid);
+            break;
+          }
+
+          try {
+            if (timeout < context.getClientTimeoutInMillis()) {
+              client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, parsedServer, context,
+                  timeout);
+            } else {
+              client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, parsedServer, context);
+            }
+
+            client.closeUpdate(TraceUtil.traceInfo(), usid);
+            retry.logCompletion(log, "Closed failed write session " + location + " " + usid);
+            break;
+          } catch (NoSuchScanIDException e) {
+            retry.logCompletion(log,
+                "Failed write session no longer exists " + location + " " + usid);
+            // The session no longer exists, so done
+            break;
+          } catch (TApplicationException tae) {
+            // no need to bother closing session in this case
+            updateServerErrors(location, tae);
+            break;
+          } catch (ThriftSecurityException e) {
+            throw e;
+          } catch (TException e) {
+            retry.waitForNextAttempt(log, "Attempting to close failed write session " + location
+                + " " + usid + " " + e.getMessage());
+          } finally {
+            ThriftUtil.returnClient(client, context);
+          }
+
+          // if a timeout is set on the batch writer, then do not retry longer than the timeout
+          if ((System.currentTimeMillis() - startTime) > timeout) {

Review Comment:
   The rest of the class uses System.currentTimeMillis(), so I used it here for consistency. If the class is switched to System.nanoTime(), that change should be made across the whole class, which could be a follow-on issue.
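
The follow-on change mentioned above would replace the wall-clock arithmetic with `System.nanoTime()`. What follows is a minimal sketch of a deadline-bounded retry loop measured that way. It is not code from the PR; the class `NanoTimeDeadline` and the method `retryUntil` are hypothetical. `nanoTime()` is monotonic, so a wall-clock adjustment such as an NTP step cannot stretch or cut short the loop, which `currentTimeMillis()` does not guarantee. `TimeUnit.toNanos` saturates at `Long.MAX_VALUE`, so a very large timeout such as `Long.MAX_VALUE` milliseconds never trips the check.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

public class NanoTimeDeadline {

  /**
   * Hypothetical helper: retries {@code attempt} until it returns true or until
   * {@code timeoutMillis} has elapsed, sleeping {@code sleepMillis} between attempts.
   */
  static boolean retryUntil(BooleanSupplier attempt, long timeoutMillis, long sleepMillis)
      throws InterruptedException {
    final long startNanos = System.nanoTime();
    // Saturates at Long.MAX_VALUE instead of overflowing for huge timeouts.
    final long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    while (true) {
      if (attempt.getAsBoolean()) {
        return true;
      }
      // Only the difference of two nanoTime() readings is meaningful; never compare raw values.
      if (System.nanoTime() - startNanos > timeoutNanos) {
        return false;
      }
      Thread.sleep(sleepMillis);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    int[] calls = {0};
    boolean ok = retryUntil(() -> ++calls[0] >= 3, 5_000, 10);
    System.out.println(ok + " after " + calls[0] + " attempts");
  }
}
```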

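The comment in the `finally` block relies on the session being added to `failedSessions` before its mutations are subtracted from `totalMemUsed`, and `CloseSessionTask.run()` later removes the entry and calls `notifyAll()`. The waiting side is not part of this diff excerpt, so the sketch below assumes that close or flush blocks until the set is empty. `PendingCloseTracker`, `newCloseTask`, and `awaitPendingCloses` are hypothetical names, and a string key stands in for `new Pair<>(usid, location)`. The key point is that registration happens synchronously, before the task is handed to the thread pool, so a waiter can never observe a gap.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PendingCloseTracker {

  // Sessions whose close is still in flight, keyed as "location/sessionId" (hypothetical key).
  private final Set<String> pendingCloses = new HashSet<>();

  /** Registers the session now, on the caller's thread, and returns the task that closes it. */
  Runnable newCloseTask(String location, long sessionId, Runnable closeAction) {
    String key = location + "/" + sessionId;
    synchronized (this) {
      if (!pendingCloses.add(key)) {
        throw new IllegalStateException("Duplicate session " + key);
      }
    }
    return () -> {
      try {
        closeAction.run();
      } finally {
        // Always deregister and wake waiters, even if the close attempt threw.
        synchronized (PendingCloseTracker.this) {
          pendingCloses.remove(key);
          PendingCloseTracker.this.notifyAll();
        }
      }
    };
  }

  /** Assumed caller: close() or flush() blocks here until every queued close has finished. */
  synchronized void awaitPendingCloses() throws InterruptedException {
    while (!pendingCloses.isEmpty()) {
      wait();
    }
  }

  synchronized int pendingCount() {
    return pendingCloses.size();
  }

  public static void main(String[] args) throws InterruptedException {
    PendingCloseTracker tracker = new PendingCloseTracker();
    ExecutorService pool = Executors.newFixedThreadPool(2);
    pool.execute(tracker.newCloseTask("tserver1:9997", 42L, () -> {}));
    tracker.awaitPendingCloses();
    System.out.println("pending after wait: " + tracker.pendingCount());
    pool.shutdown();
  }
}
```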

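In the diff, the `TApplicationException` and `ThriftSecurityException` handlers set `usid` to null (the comments note there is no need to close the session in those cases), while the `TException` path leaves it set so that the `finally` block queues a `CloseSessionTask`. A minimal sketch of that catch/finally shape with the RPC replaced by flags; `CloseInFinallySketch`, `SessionGoneException`, and the assumption that the success path clears the ID are all hypothetical, not taken from the PR.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CloseInFinallySketch {

  /** Hypothetical failure type for which closing the session is unnecessary. */
  static class SessionGoneException extends Exception {
    SessionGoneException(String message) {
      super(message);
    }
  }

  final List<Long> queuedCloses = Collections.synchronizedList(new ArrayList<>());
  final ExecutorService closePool = Executors.newSingleThreadExecutor();

  /** Simulates one send on an already opened session; the flags choose the outcome. */
  void send(long openedSessionId, boolean sessionGone, boolean transientFailure)
      throws IOException, SessionGoneException {
    Long sessionId = openedSessionId;
    try {
      if (sessionGone) {
        throw new SessionGoneException("server dropped session " + openedSessionId);
      }
      if (transientFailure) {
        throw new IOException("connection reset");
      }
      // Assumption: the success path closes the session itself, so nothing is left to clean up.
      sessionId = null;
    } catch (SessionGoneException e) {
      // No need to close the session in this case.
      sessionId = null;
      throw e;
    } finally {
      if (sessionId != null) {
        final long toClose = sessionId;
        // Queue the close instead of running it here, so the finally block is unlikely to
        // throw and mask the exception that is already propagating.
        closePool.execute(() -> queuedCloses.add(toClose));
      }
    }
  }
}
```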
