Posted to notifications@accumulo.apache.org by "keith-turner (via GitHub)" <gi...@apache.org> on 2023/08/28 21:06:30 UTC

[GitHub] [accumulo] keith-turner opened a new pull request, #3733: ensures no writes happen after batch writer closes

keith-turner opened a new pull request, #3733:
URL: https://github.com/apache/accumulo/pull/3733

   Fixes a problem in the batch writer where, when retries happened, writes could occur after the batch writer was closed. Adds a test that produces writes after close when run without the fixes in this PR.
   
   Fixes #3721


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [accumulo] ctubbsii commented on pull request #3733: ensures no writes happen after batch writer closes

Posted by "ctubbsii (via GitHub)" <gi...@apache.org>.
ctubbsii commented on PR #3733:
URL: https://github.com/apache/accumulo/pull/3733#issuecomment-1699935398

   > I am not going to break this into multiple commits
   
   That's fine. But, do you mind if I do it, instead of merging this as-is?


[GitHub] [accumulo] keith-turner commented on a diff in pull request #3733: ensures no writes happen after batch writer closes

Posted by "keith-turner (via GitHub)" <gi...@apache.org>.
keith-turner commented on code in PR #3733:
URL: https://github.com/apache/accumulo/pull/3733#discussion_r1310740506


##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java:
##########
@@ -1028,14 +1022,139 @@ private MutationSet sendMutationsToTabletServer(String location,
         timeoutTracker.errorOccured();
         throw new IOException(e);
       } catch (TApplicationException tae) {
+        // no need to bother closing session in this case
+        usid = OptionalLong.empty();
         updateServerErrors(location, tae);
         throw new AccumuloServerException(location, tae);
       } catch (ThriftSecurityException e) {
+        // no need to bother closing session in this case
+        usid = OptionalLong.empty();
         updateAuthorizationFailures(
             tabMuts.keySet().stream().collect(toMap(identity(), ke -> e.code)));
         throw new AccumuloSecurityException(e.user, e.code, e);
       } catch (TException e) {
         throw new IOException(e);
+      } finally {
+        if (usid.isPresent()) {
+          // There is an open session, must close it before the batchwriter closes or writes could
+          // happen after the batch writer closes. See #3721. Queuing a task instead of executing
+          // the code here because throwing exceptions in a finally block makes the code hard to
+          // reason about. Queue is less likely to throw an exception.
+          //
+          // When the task is created below it adds to the failedSessions map, this is done while
+          // the mutations for this task are still incremented in totalMemUsed. It is important that
+          // these overlap in time so that there is no gap where the client would not wait for the
+          // session to close. Overlap in times means the session is added to failedSessions before
+          // the mutations are decremented from totalMemUsed.
+          sendThreadPool.execute(new CloseSessionTask(location, usid.getAsLong()));
+        }
+      }
+    }
+
+    class CloseSessionTask implements Runnable {
+
+      private final String location;
+      private final long usid;
+
+      CloseSessionTask(String location, Long usid) {
+        this.location = location;
+        this.usid = usid;
+        synchronized (TabletServerBatchWriter.this) {
+          if (!failedSessions.add(new Pair<>(usid, location))) {
+            throw new IllegalStateException("Duplicate session " + location + " " + usid);
+          }
+        }
+
+      }
+
+      @Override
+      public void run() {
+        try {
+          closeSession();
+        } catch (InterruptedException | RuntimeException | ThriftSecurityException e) {
+          updateUnknownErrors("Failed to close session " + location + " " + usid, e);
+        } finally {
+          synchronized (TabletServerBatchWriter.this) {
+            if (!failedSessions.remove(new Pair<>(usid, location))) {
+              throw new IllegalStateException("Session missing " + location + " " + usid);
+            }
+            TabletServerBatchWriter.this.notifyAll();

Review Comment:
   In this case it's OK not to call it because the set was not changed. However, I could reorder the code, because calling it does no harm and it can be called anywhere in the synchronized block.
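
A minimal sketch of why the position of notifyAll() inside a synchronized block does not matter: a woken thread cannot re-check its condition until the notifying thread releases the monitor. The class and method names below are hypothetical and are not taken from TabletServerBatchWriter.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a set of sessions that a closing thread waits on.
class PendingSessionsSketch {
  private final Set<Long> pending = new HashSet<>();

  synchronized void add(long id) {
    pending.add(id);
  }

  synchronized void remove(long id) {
    // Notifying before the state change is safe: woken threads must re-acquire
    // this monitor, which is only released when this method returns.
    notifyAll();
    pending.remove(id);
  }

  // Returns true once the set is empty, false if the timeout elapses first.
  synchronized boolean awaitEmpty(long timeoutMillis) {
    final long start = System.nanoTime();
    final long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    while (!pending.isEmpty()) {
      long remainingMillis =
          TimeUnit.NANOSECONDS.toMillis(timeoutNanos - (System.nanoTime() - start));
      if (remainingMillis <= 0) {
        return false;
      }
      try {
        wait(remainingMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return true;
  }
}
```

The waiter always re-checks the condition in a loop, so an early or spurious wakeup is harmless.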



[GitHub] [accumulo] keith-turner commented on a diff in pull request #3733: ensures no writes happen after batch writer closes

Posted by "keith-turner (via GitHub)" <gi...@apache.org>.
keith-turner commented on code in PR #3733:
URL: https://github.com/apache/accumulo/pull/3733#discussion_r1309524121


##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java:
##########
@@ -1028,14 +1020,139 @@ private MutationSet sendMutationsToTabletServer(String location,
         timeoutTracker.errorOccured();
         throw new IOException(e);
       } catch (TApplicationException tae) {
+        // no need to bother closing session in this case
+        usid = null;
         updateServerErrors(location, tae);
         throw new AccumuloServerException(location, tae);
       } catch (ThriftSecurityException e) {
+        // no need to bother closing session in this case
+        usid = null;
         updateAuthorizationFailures(
             tabMuts.keySet().stream().collect(toMap(identity(), ke -> e.code)));
         throw new AccumuloSecurityException(e.user, e.code, e);
       } catch (TException e) {
         throw new IOException(e);
+      } finally {
+        if (usid != null) {
+          // There is an open session, must close it before the batchwriter closes or writes could
+          // happen after the batch writer closes. See #3721. Queuing a task instead of executing
+          // the code here because throwing exceptions in a finally block makes the code hard to
+          // reason about. Queue is less likely to throw an exception.
+          //
+          // When the task is created below it adds to the failedSessions map, this is done while
+          // the mutations for this task are still incremented in totalMemUsed. It is important that
+          // these overlap in time so that there is no gap where the client would not wait for the
+          // session to close. Overlap in times means the session is added to failedSessions before
+          // the mutations are decremented from totalMemUsed.
+          sendThreadPool.execute(new CloseSessionTask(location, usid));
+        }
+      }
+    }
+
+    class CloseSessionTask implements Runnable {

Review Comment:
   I think the tight coupling is nice when analyzing the code. I like being able to examine references to a batch writer variable in an IDE and see all the functions that directly use it; that makes it easy to analyze the code for correctness with multithreading. In other parts of the Accumulo code we have layers of evolved indirection that make the code much more difficult to analyze, understand, and maintain. I think this code could benefit from being made more modular, but that should be done with an overall strategy in mind. A strategy of simply extracting inner classes does not work toward any particular goal, and iterating on it can make the code harder to understand and maintain. Maybe a plan like separating the RPC code from the in-memory state of the batch writer would be a better way to achieve a modular batch writer; I'm not sure.



[GitHub] [accumulo] keith-turner commented on a diff in pull request #3733: ensures no writes happen after batch writer closes

Posted by "keith-turner (via GitHub)" <gi...@apache.org>.
keith-turner commented on code in PR #3733:
URL: https://github.com/apache/accumulo/pull/3733#discussion_r1309078175


##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java:
##########
@@ -931,6 +937,8 @@ private MutationSet sendMutationsToTabletServer(String location,
 
       timeoutTracker.startingWrite();
 
+      Long usid = null;

Review Comment:
   Changed in a513d75. I didn't see much harm or benefit in this change after making it.



[GitHub] [accumulo] ctubbsii commented on a diff in pull request #3733: ensures no writes happen after batch writer closes

Posted by "ctubbsii (via GitHub)" <gi...@apache.org>.
ctubbsii commented on code in PR #3733:
URL: https://github.com/apache/accumulo/pull/3733#discussion_r1308455683


##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java:
##########
@@ -160,8 +165,9 @@ public class TabletServerBatchWriter implements AutoCloseable {
   private final HashSet<String> serverSideErrors = new HashSet<>();
   private final FailedMutations failedMutations;
   private int unknownErrors = 0;
-  private boolean somethingFailed = false;
+  private volatile boolean somethingFailed = false;

Review Comment:
   This doesn't need to be changed in this PR, but I think there's a good case to be made to prefer the Java 5 atomic classes over manually making variables volatile.
   
   * Atomic classes force correct usage
   * Usually the extra object creation overhead doesn't matter (but obviously, don't use them when it does matter)
   * final Atomic references allow the object's contained value to be updated inside a lambda or anonymous class
   * One can immediately infer the memory characteristics from the type of the variable in the IDE, without looking up modifiers on the field declaration. That means less jumping back and forth in the IDE, and the developer does not have to remember which fields are declared volatile and which aren't.
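
As a small illustration of the third bullet, here is a sketch in which a final AtomicBoolean is updated from inside a lambda, something a plain local boolean does not allow. The class and method names are hypothetical and are not from the PR.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration; not code from TabletServerBatchWriter.
class FailureFlagSketch {

  // Runs the task and reports whether it threw a RuntimeException.
  static boolean failed(Runnable task) {
    // The reference is final, but the contained value can still change.
    final AtomicBoolean somethingFailed = new AtomicBoolean(false);

    Runnable wrapped = () -> {
      try {
        task.run();
      } catch (RuntimeException e) {
        // A captured local boolean could not be assigned here.
        somethingFailed.set(true);
      }
    };
    wrapped.run();

    return somethingFailed.get();
  }
}
```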



##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java:
##########
@@ -1028,14 +1020,139 @@ private MutationSet sendMutationsToTabletServer(String location,
         timeoutTracker.errorOccured();
         throw new IOException(e);
       } catch (TApplicationException tae) {
+        // no need to bother closing session in this case
+        usid = null;
         updateServerErrors(location, tae);
         throw new AccumuloServerException(location, tae);
       } catch (ThriftSecurityException e) {
+        // no need to bother closing session in this case
+        usid = null;
         updateAuthorizationFailures(
             tabMuts.keySet().stream().collect(toMap(identity(), ke -> e.code)));
         throw new AccumuloSecurityException(e.user, e.code, e);
       } catch (TException e) {
         throw new IOException(e);
+      } finally {
+        if (usid != null) {
+          // There is an open session, must close it before the batchwriter closes or writes could
+          // happen after the batch writer closes. See #3721. Queuing a task instead of executing
+          // the code here because throwing exceptions in a finally block makes the code hard to
+          // reason about. Queue is less likely to throw an exception.
+          //
+          // When the task is created below it adds to the failedSessions map, this is done while
+          // the mutations for this task are still incremented in totalMemUsed. It is important that
+          // these overlap in time so that there is no gap where the client would not wait for the
+          // session to close. Overlap in times means the session is added to failedSessions before
+          // the mutations are decremented from totalMemUsed.
+          sendThreadPool.execute(new CloseSessionTask(location, usid));
+        }
+      }
+    }
+
+    class CloseSessionTask implements Runnable {
+
+      private final String location;
+      private final long usid;
+
+      CloseSessionTask(String location, Long usid) {
+        this.location = location;
+        this.usid = usid;
+        synchronized (TabletServerBatchWriter.this) {
+          if (!failedSessions.add(new Pair<>(usid, location))) {
+            throw new IllegalStateException("Duplicate session " + location + " " + usid);
+          }
+        }
+
+      }
+
+      @Override
+      public void run() {
+        try {
+          closeSession();
+        } catch (InterruptedException | RuntimeException | ThriftSecurityException e) {
+          updateUnknownErrors("Failed to close session " + location + " " + usid, e);
+        } finally {
+          synchronized (TabletServerBatchWriter.this) {
+            if (!failedSessions.remove(new Pair<>(usid, location))) {
+              throw new IllegalStateException("Session missing " + location + " " + usid);
+            }
+            TabletServerBatchWriter.this.notifyAll();
+          }
+        }
+      }
+
+      /**
+       * Checks if there is a lock held by a tserver at a specific host and port.
+       */
+      private boolean isALockHeld(String tserver) {
+        var root = context.getZooKeeperRoot() + Constants.ZTSERVERS;
+        var zLockPath = ServiceLock.path(root + "/" + tserver);
+        return ServiceLock.getSessionId(context.getZooCache(), zLockPath) != 0;
+      }
+
+      private void closeSession() throws InterruptedException, ThriftSecurityException {
+
+        Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS)
+            .incrementBy(100, MILLISECONDS).maxWait(60, SECONDS).backOffFactor(1.5)
+            .logInterval(3, MINUTES).createRetry();
+
+        final HostAndPort parsedServer = HostAndPort.fromString(location);
+
+        long startTime = System.currentTimeMillis();
+
+        // If somethingFailed is true then the batch writer will throw an exception on close or
+        // flush, so no need to close this session. Only want to close the session for retryable
+        // exceptions.
+        while (!somethingFailed) {
+
+          TabletClientService.Client client = null;
+
+          // Check if a lock is held by any tserver at the host and port. It does not need to be the
+          // exact tserver instance that existed when the session was created because if a new
+          // tserver instance comes up then the session will not exist there. Trying to get the
+          // exact tserver instance that created the session would require changes to the RPC that
+          // creates the session and this is not needed.
+          if (!isALockHeld(location)) {
+            retry.logCompletion(log,
+                "No tserver for failed write session " + location + " " + usid);
+            break;
+          }
+
+          try {
+            if (timeout < context.getClientTimeoutInMillis()) {
+              client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, parsedServer, context,
+                  timeout);
+            } else {
+              client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, parsedServer, context);
+            }
+
+            client.closeUpdate(TraceUtil.traceInfo(), usid);
+            retry.logCompletion(log, "Closed failed write session " + location + " " + usid);
+            break;
+          } catch (NoSuchScanIDException e) {
+            retry.logCompletion(log,
+                "Failed write session no longer exists " + location + " " + usid);
+            // The session no longer exists, so done
+            break;
+          } catch (TApplicationException tae) {
+            // no need to bother closing session in this case
+            updateServerErrors(location, tae);
+            break;
+          } catch (ThriftSecurityException e) {
+            throw e;
+          } catch (TException e) {
+            retry.waitForNextAttempt(log, "Attempting to close failed write session " + location
+                + " " + usid + " " + e.getMessage());
+          } finally {
+            ThriftUtil.returnClient(client, context);
+          }
+
+          // if a timeout is set on the batch writer, then do not retry longer than the timeout
+          if ((System.currentTimeMillis() - startTime) > timeout) {

Review Comment:
   This should use System.nanoTime() to check the timeout. nanoTime() measures elapsed time and is unaffected by changes to the system clock, whereas System.currentTimeMillis() reads the wall clock, which can be changed arbitrarily by NTP, a user, or another process.
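
A minimal sketch of an elapsed-time check based on System.nanoTime(). The helper name and its parameters are assumptions made for illustration; this is not the code that was merged.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper; the names are not from the PR.
class ElapsedTimeSketch {

  // startNanos and nowNanos are values previously read from System.nanoTime().
  static boolean timedOut(long startNanos, long nowNanos, long timeoutMillis) {
    // Compare the difference rather than the raw values: nanoTime() has an
    // arbitrary origin and may be negative or wrap around.
    return (nowNanos - startNanos) > TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
  }
}
```

A caller would record `long start = System.nanoTime();` before the retry loop and call `timedOut(start, System.nanoTime(), timeout)` after each attempt.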



##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java:
##########
@@ -1028,14 +1020,139 @@ private MutationSet sendMutationsToTabletServer(String location,
         timeoutTracker.errorOccured();
         throw new IOException(e);
       } catch (TApplicationException tae) {
+        // no need to bother closing session in this case
+        usid = null;
         updateServerErrors(location, tae);
         throw new AccumuloServerException(location, tae);
       } catch (ThriftSecurityException e) {
+        // no need to bother closing session in this case
+        usid = null;
         updateAuthorizationFailures(
             tabMuts.keySet().stream().collect(toMap(identity(), ke -> e.code)));
         throw new AccumuloSecurityException(e.user, e.code, e);
       } catch (TException e) {
         throw new IOException(e);
+      } finally {
+        if (usid != null) {
+          // There is an open session, must close it before the batchwriter closes or writes could
+          // happen after the batch writer closes. See #3721. Queuing a task instead of executing
+          // the code here because throwing exceptions in a finally block makes the code hard to
+          // reason about. Queue is less likely to throw an exception.
+          //
+          // When the task is created below it adds to the failedSessions map, this is done while
+          // the mutations for this task are still incremented in totalMemUsed. It is important that
+          // these overlap in time so that there is no gap where the client would not wait for the
+          // session to close. Overlap in times means the session is added to failedSessions before
+          // the mutations are decremented from totalMemUsed.
+          sendThreadPool.execute(new CloseSessionTask(location, usid));
+        }
+      }
+    }
+
+    class CloseSessionTask implements Runnable {

Review Comment:
   There's a lot going on in this. This type should be in its own file. If you want it to have access to TabletServerBatchWriter internals, you can pass `this` as a parameter.



##########
test/src/main/java/org/apache/accumulo/test/MetaConstraintRetryIT.java:
##########
@@ -52,15 +51,14 @@ public void test() throws Exception {
           TablePermission.WRITE);
 
       ServerContext context = getServerContext();
-      Writer w = new Writer(context, MetadataTable.ID);
       KeyExtent extent = new KeyExtent(TableId.of("5"), null, null);
 
       Mutation m = new Mutation(extent.toMetaRow());
       // unknown columns should cause constraint violation
       m.put("badcolfam", "badcolqual", "3");
-      var e = assertThrows(RuntimeException.class,
-          () -> MetadataTableUtil.update(context, w, null, m, extent));
-      assertEquals(ConstraintViolationException.class, e.getCause().getClass());
+      var e = assertThrows(IllegalArgumentException.class,
+          () -> MetadataTableUtil.update(context, null, m, extent));
+      assertEquals(MutationsRejectedException.class, e.getCause().getClass());

Review Comment:
   I think the idea here was to verify that the rejection happened because the constraint was violated. I think you need to inspect the MutationsRejectedException a bit deeper in order to get the same verification.



##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java:
##########
@@ -931,6 +937,8 @@ private MutationSet sendMutationsToTabletServer(String location,
 
       timeoutTracker.startingWrite();
 
+      Long usid = null;

Review Comment:
   OptionalLong is a better style here, as it's more explicit than null checking and implicit reliance on auto-boxing.
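
A short sketch of the OptionalLong style, assuming a session id that may or may not have been assigned. The class and method names are hypothetical.

```java
import java.util.OptionalLong;

// Hypothetical illustration; not code from TabletServerBatchWriter.
class SessionIdSketch {

  // Describes the cleanup needed for a possibly absent session id.
  static String cleanupAction(OptionalLong usid) {
    // Absence is explicit in the type: no null check and no unboxing of a Long.
    if (usid.isPresent()) {
      return "close session " + usid.getAsLong();
    }
    return "nothing to close";
  }
}
```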



##########
core/src/main/java/org/apache/accumulo/core/clientImpl/Writer.java:
##########
@@ -59,62 +42,18 @@ public Writer(ClientContext context, TableId tableId) {
     this.tableId = tableId;
   }
 
-  private static void updateServer(ClientContext context, Mutation m, KeyExtent extent,
-      HostAndPort server) throws TException, NotServingTabletException,
-      ConstraintViolationException, AccumuloSecurityException {
-    checkArgument(m != null, "m is null");
-    checkArgument(extent != null, "extent is null");
-    checkArgument(server != null, "server is null");
-    checkArgument(context != null, "context is null");
-
-    TabletClientService.Iface client = null;
-    try {
-      client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, server, context);
-      client.update(TraceUtil.traceInfo(), context.rpcCreds(), extent.toThrift(), m.toThrift(),
-          TDurability.DEFAULT);
-    } catch (ThriftSecurityException e) {
-      throw new AccumuloSecurityException(e.user, e.code);
-    } finally {
-      ThriftUtil.returnClient((TServiceClient) client, context);
-    }
-  }
-
-  public void update(Mutation m) throws AccumuloException, AccumuloSecurityException,
-      ConstraintViolationException, TableNotFoundException {
+  public void update(Mutation m) throws AccumuloException, TableNotFoundException {
     checkArgument(m != null, "m is null");
 
     if (m.size() == 0) {
       throw new IllegalArgumentException("Can not add empty mutations");
     }
 
-    while (true) {
-      TabletLocation tabLoc = TabletLocator.getLocator(context, tableId).locateTablet(context,
-          new Text(m.getRow()), false, true);
-
-      if (tabLoc == null) {
-        log.trace("No tablet location found for row {}", new String(m.getRow(), UTF_8));
-        sleepUninterruptibly(500, MILLISECONDS);
-        continue;
-      }
-
-      final HostAndPort parsedLocation = HostAndPort.fromString(tabLoc.tablet_location);
-      try {
-        updateServer(context, m, tabLoc.tablet_extent, parsedLocation);
-        return;
-      } catch (NotServingTabletException e) {
-        log.trace("Not serving tablet, server = {}", parsedLocation);
-        TabletLocator.getLocator(context, tableId).invalidateCache(tabLoc.tablet_extent);
-      } catch (ConstraintViolationException cve) {
-        log.error("error sending update to {}", parsedLocation, cve);
-        // probably do not need to invalidate cache, but it does not hurt
-        TabletLocator.getLocator(context, tableId).invalidateCache(tabLoc.tablet_extent);
-        throw cve;
-      } catch (TException e) {
-        log.error("error sending update to {}", parsedLocation, e);
-        TabletLocator.getLocator(context, tableId).invalidateCache(tabLoc.tablet_extent);
-      }
+    String table = Ample.DataLevel.of(tableId).metaTable();
 
-      sleepUninterruptibly(500, MILLISECONDS);
+    try (var writer = context.createBatchWriter(table)) {
+      writer.addMutation(m);
+      writer.flush();

Review Comment:
   ```suggestion
   ```
   
   The writer flushes on close anyway, so there is not much point in calling flush() manually. The other places where this was changed didn't call it, so for consistency I suggest not calling it here either.
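
To illustrate the flush-on-close point, here is a self-contained sketch that uses a hypothetical buffering writer rather than the real Accumulo BatchWriter. It only shows that try-with-resources calls close(), and that a close() which flushes makes the explicit flush() call redundant.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a buffering writer; not the Accumulo BatchWriter.
class BufferedWriterSketch implements AutoCloseable {
  private final List<String> buffer = new ArrayList<>();
  private final List<String> sink;

  BufferedWriterSketch(List<String> sink) {
    this.sink = sink;
  }

  void add(String mutation) {
    buffer.add(mutation);
  }

  void flush() {
    sink.addAll(buffer);
    buffer.clear();
  }

  @Override
  public void close() {
    // Closing flushes whatever is still buffered.
    flush();
  }
}
```

With `try (var w = new BufferedWriterSketch(sink)) { w.add("m1"); }`, the buffered entry reaches the sink when the block exits, without any explicit flush.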



##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java:
##########
@@ -901,8 +907,8 @@ public void send(TabletServerMutations<Mutation> tsm)
             span.end();
           }
         } catch (IOException e) {
-          if (log.isTraceEnabled()) {
-            log.trace("failed to send mutations to {} : {}", location, e.getMessage());
+          if (log.isDebugEnabled()) {
+            log.debug("failed to send mutations to {} : {}", location, e.getMessage());
           }

Review Comment:
   The guard on the log level is unnecessary: with a parameterized message, the string is only formatted when the level is enabled.
   
   ```suggestion
             log.debug("failed to send mutations to {} : {}", location, e.getMessage());
   ```



##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java:
##########
@@ -1028,14 +1020,139 @@ private MutationSet sendMutationsToTabletServer(String location,
         timeoutTracker.errorOccured();
         throw new IOException(e);
       } catch (TApplicationException tae) {
+        // no need to bother closing session in this case
+        usid = null;
         updateServerErrors(location, tae);
         throw new AccumuloServerException(location, tae);
       } catch (ThriftSecurityException e) {
+        // no need to bother closing session in this case
+        usid = null;
         updateAuthorizationFailures(
             tabMuts.keySet().stream().collect(toMap(identity(), ke -> e.code)));
         throw new AccumuloSecurityException(e.user, e.code, e);
       } catch (TException e) {
         throw new IOException(e);
+      } finally {
+        if (usid != null) {
+          // There is an open session, must close it before the batchwriter closes or writes could
+          // happen after the batch writer closes. See #3721. Queuing a task instead of executing
+          // the code here because throwing exceptions in a finally block makes the code hard to
+          // reason about. Queue is less likely to throw an exception.
+          //
+          // When the task is created below it adds to the failedSessions map, this is done while
+          // the mutations for this task are still incremented in totalMemUsed. It is important that
+          // these overlap in time so that there is no gap where the client would not wait for the
+          // session to close. Overlap in times means the session is added to failedSessions before
+          // the mutations are decremented from totalMemUsed.
+          sendThreadPool.execute(new CloseSessionTask(location, usid));
+        }
+      }
+    }
+
+    class CloseSessionTask implements Runnable {
+
+      private final String location;
+      private final long usid;
+
+      CloseSessionTask(String location, Long usid) {
+        this.location = location;
+        this.usid = usid;
+        synchronized (TabletServerBatchWriter.this) {
+          if (!failedSessions.add(new Pair<>(usid, location))) {
+            throw new IllegalStateException("Duplicate session " + location + " " + usid);
+          }
+        }
+
+      }
+
+      @Override
+      public void run() {
+        try {
+          closeSession();
+        } catch (InterruptedException | RuntimeException | ThriftSecurityException e) {
+          updateUnknownErrors("Failed to close session " + location + " " + usid, e);
+        } finally {
+          synchronized (TabletServerBatchWriter.this) {
+            if (!failedSessions.remove(new Pair<>(usid, location))) {
+              throw new IllegalStateException("Session missing " + location + " " + usid);
+            }
+            TabletServerBatchWriter.this.notifyAll();
+          }
+        }
+      }
+
+      /**
+       * Checks if there is a lock held by a tserver at a specific host and port.
+       */
+      private boolean isALockHeld(String tserver) {
+        var root = context.getZooKeeperRoot() + Constants.ZTSERVERS;
+        var zLockPath = ServiceLock.path(root + "/" + tserver);
+        return ServiceLock.getSessionId(context.getZooCache(), zLockPath) != 0;
+      }
+
+      private void closeSession() throws InterruptedException, ThriftSecurityException {
+
+        Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS)
+            .incrementBy(100, MILLISECONDS).maxWait(60, SECONDS).backOffFactor(1.5)
+            .logInterval(3, MINUTES).createRetry();
+
+        final HostAndPort parsedServer = HostAndPort.fromString(location);
+
+        long startTime = System.currentTimeMillis();

Review Comment:
   Should use nanoTime.



[GitHub] [accumulo] keith-turner commented on pull request #3733: ensures no writes happen after batch writer closes

Posted by "keith-turner (via GitHub)" <gi...@apache.org>.
keith-turner commented on PR #3733:
URL: https://github.com/apache/accumulo/pull/3733#issuecomment-1698390286

   > I thought it would be useful to not have to tease them apart later if there was a problem that required a follow-up bugfix. 
   
   I am not going to break this into multiple commits in order to optimize for eventualities that may or may not happen. If I wanted to spend time on it, I could probably come up with one scenario for breaking this into 3 commits and another for breaking it into 4 commits. I would rather spend my time ensuring that the changes to fix the observed bug are correct. If anyone needs assistance reviewing because it's a big change, I will be glad to chat and discuss the changes.
   
   


[GitHub] [accumulo] keith-turner commented on pull request #3733: ensures no writes happen after batch writer closes

Posted by "keith-turner (via GitHub)" <gi...@apache.org>.
keith-turner commented on PR #3733:
URL: https://github.com/apache/accumulo/pull/3733#issuecomment-1700021859

   > That's fine. But, do you mind if I do it, instead of merging this as-is?
   
   No, I don't mind at all.


[GitHub] [accumulo] keith-turner commented on a diff in pull request #3733: ensures no writes happen after batch writer closes

Posted by "keith-turner (via GitHub)" <gi...@apache.org>.
keith-turner commented on code in PR #3733:
URL: https://github.com/apache/accumulo/pull/3733#discussion_r1309009937


##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java:
##########
@@ -1028,14 +1020,139 @@ private MutationSet sendMutationsToTabletServer(String location,
         timeoutTracker.errorOccured();
         throw new IOException(e);
       } catch (TApplicationException tae) {
+        // no need to bother closing session in this case
+        usid = null;
         updateServerErrors(location, tae);
         throw new AccumuloServerException(location, tae);
       } catch (ThriftSecurityException e) {
+        // no need to bother closing session in this case
+        usid = null;
         updateAuthorizationFailures(
             tabMuts.keySet().stream().collect(toMap(identity(), ke -> e.code)));
         throw new AccumuloSecurityException(e.user, e.code, e);
       } catch (TException e) {
         throw new IOException(e);
+      } finally {
+        if (usid != null) {
+          // There is an open session, must close it before the batchwriter closes or writes could
+          // happen after the batch writer closes. See #3721. Queuing a task instead of executing
+          // the code here because throwing exceptions in a finally block makes the code hard to
+          // reason about. Queue is less likely to throw an exception.
+          //
+          // When the task is created below it adds to the failedSessions map, this is done while
+          // the mutations for this task are still incremented in totalMemUsed. It is important that
+          // these overlap in time so that there is no gap where the client would not wait for the
+          // session to close. Overlap in times means the session is added to failedSessions before
+          // the mutations are decremented from totalMemUsed.
+          sendThreadPool.execute(new CloseSessionTask(location, usid));
+        }
+      }
+    }
+
+    class CloseSessionTask implements Runnable {

Review Comment:
   The class is tightly coupled to the internal implementation and accesses private functions and instance variables of the containing class, so pushing it out would also require making those members non-private.  I do not think that inner classes that are this tightly coupled to the internal implementation of the containing class need to be moved out.  It's only a class because it's internal code that needs to run in another thread.
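
As a rough illustration of the coupling described in this comment, the sketch below shows a non-static inner task that runs on another thread and calls a private method of its enclosing instance directly. `OuterWriter`, `CloseTask`, and the counter are hypothetical names chosen for the example; none of this is Accumulo code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical writer; it only illustrates an inner task sharing private state with its owner.
class OuterWriter {
  // Private state that the inner class reaches without any accessor being exposed.
  private int closedSessions = 0;

  private synchronized void recordClosed() {
    closedSessions++;
  }

  synchronized int closedSessions() {
    return closedSessions;
  }

  // Non-static inner class: each instance holds an implicit reference to its OuterWriter.
  class CloseTask implements Runnable {
    @Override
    public void run() {
      // Calls a private method of the enclosing instance directly.
      recordClosed();
    }
  }

  /** Runs one CloseTask on another thread and reports whether it finished in time. */
  boolean closeOneSession() {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    pool.execute(new CloseTask());
    pool.shutdown();
    try {
      return pool.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

Moving `CloseTask` to its own file would mean `recordClosed()` could no longer stay private, which is the trade-off being weighed here.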





[GitHub] [accumulo] keith-turner commented on a diff in pull request #3733: ensures no writes happen after batch writer closes

Posted by "keith-turner (via GitHub)" <gi...@apache.org>.
keith-turner commented on code in PR #3733:
URL: https://github.com/apache/accumulo/pull/3733#discussion_r1309105438


##########
test/src/main/java/org/apache/accumulo/test/MetaConstraintRetryIT.java:
##########
@@ -52,15 +51,14 @@ public void test() throws Exception {
           TablePermission.WRITE);
 
       ServerContext context = getServerContext();
-      Writer w = new Writer(context, MetadataTable.ID);
       KeyExtent extent = new KeyExtent(TableId.of("5"), null, null);
 
       Mutation m = new Mutation(extent.toMetaRow());
       // unknown columns should cause constraint violation
       m.put("badcolfam", "badcolqual", "3");
-      var e = assertThrows(RuntimeException.class,
-          () -> MetadataTableUtil.update(context, w, null, m, extent));
-      assertEquals(ConstraintViolationException.class, e.getCause().getClass());
+      var e = assertThrows(IllegalArgumentException.class,
+          () -> MetadataTableUtil.update(context, null, m, extent));
+      assertEquals(MutationsRejectedException.class, e.getCause().getClass());

Review Comment:
   changed in a513d75





[GitHub] [accumulo] keith-turner commented on pull request #3733: ensures no writes happen after batch writer closes

Posted by "keith-turner (via GitHub)" <gi...@apache.org>.
keith-turner commented on PR #3733:
URL: https://github.com/apache/accumulo/pull/3733#issuecomment-1697799057

   > I think removing the use of Writer, and cleanup of closing the batch writer are distinct things. Removing the use of the single-mutation Writer can be done as a separate, first step. It would make it much easier to evaluate the changes that are specifically related to the Batch Writer cleanup.
   
   I fixed that because I realized it had the same underlying problem and could cause metadata writes to happen after one would expect.  I can change the commit message to be more generic, in that it fixes all write issues of the same kind.




[GitHub] [accumulo] keith-turner commented on a diff in pull request #3733: ensures no writes happen after batch writer closes

Posted by "keith-turner (via GitHub)" <gi...@apache.org>.
keith-turner commented on code in PR #3733:
URL: https://github.com/apache/accumulo/pull/3733#discussion_r1309105736


##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java:
##########
@@ -901,8 +907,8 @@ public void send(TabletServerMutations<Mutation> tsm)
             span.end();
           }
         } catch (IOException e) {
-          if (log.isTraceEnabled()) {
-            log.trace("failed to send mutations to {} : {}", location, e.getMessage());
+          if (log.isDebugEnabled()) {
+            log.debug("failed to send mutations to {} : {}", location, e.getMessage());
           }

Review Comment:
   changed in a513d75





[GitHub] [accumulo] ctubbsii commented on a diff in pull request #3733: ensures no writes happen after batch writer closes

Posted by "ctubbsii (via GitHub)" <gi...@apache.org>.
ctubbsii commented on code in PR #3733:
URL: https://github.com/apache/accumulo/pull/3733#discussion_r1309391136


##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java:
##########
@@ -160,8 +165,9 @@ public class TabletServerBatchWriter implements AutoCloseable {
   private final HashSet<String> serverSideErrors = new HashSet<>();
   private final FailedMutations failedMutations;
   private int unknownErrors = 0;
-  private boolean somethingFailed = false;
+  private volatile boolean somethingFailed = false;

Review Comment:
   I try to always declare Atomics as final, so there's no additional concern about pointer chasing. But yeah, I get what you mean if that's not certain. Not really relevant here, but I remembered another good use of Atomics that you don't get directly with volatile: compareAndSet.
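
A minimal sketch of the `compareAndSet` point, assuming a hypothetical `FailureFlag` class (not Accumulo code): a final `AtomicBoolean` lets exactly one caller observe the false-to-true transition, which a plain `volatile boolean` cannot express as a single atomic step.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical class, not part of Accumulo; it only illustrates the pattern.
class FailureFlag {
  // Declared final so the reference never changes; only the value inside does.
  private final AtomicBoolean somethingFailed = new AtomicBoolean(false);

  /**
   * Marks the failure. Returns true only for the single caller that flipped the
   * flag from false to true, even when several threads race to call this method.
   */
  boolean markFailed() {
    return somethingFailed.compareAndSet(false, true);
  }

  boolean hasFailed() {
    return somethingFailed.get();
  }
}
```

With a `volatile boolean`, the equivalent check-then-set would be two separate operations, so two threads could both see `false` and both believe they were first.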





[GitHub] [accumulo] ctubbsii commented on pull request #3733: ensures no writes happen after batch writer closes

Posted by "ctubbsii (via GitHub)" <gi...@apache.org>.
ctubbsii commented on PR #3733:
URL: https://github.com/apache/accumulo/pull/3733#issuecomment-1698206554

   > > I think removing the use of Writer, and cleanup of closing the batch writer are distinct things. Removing the use of the single-mutation Writer can be done as a separate, first step. It would make it much easier to evaluate the changes that are specifically related to the Batch Writer cleanup.
   > 
   > I fixed that because I realized it had the same underlying problem and could cause metadata writes to happen after one would expect. I can change the commit message to be more generic, in that it fixes all write issues of the same kind.
   
   No argument from me for fixing it... it definitely needed to be fixed. I was just thinking it would be nice to clearly see the swapping out of Writer for BatchWriter in one commit, and a separate commit that actually fixes the bug in the BatchWriter, so it's clear which part of this changeset addresses the bug. In case we need to revisit the bugfix later (because it's complicated and I don't fully understand it... I'm just kind of taking your word for it after not seeing anything obviously wrong with it), I thought it would be useful to not have to tease them apart later if there was a problem that required a follow-up bugfix. And also the first commit that would remove the use of Writer would enable the follow on work for #3734 and #3735 to proceed right away, fully decoupled from the bugfix here, since they only depend on the discontinuation of the Writer class, and not on the bugfix.




[GitHub] [accumulo] keith-turner commented on a diff in pull request #3733: ensures no writes happen after batch writer closes

Posted by "keith-turner (via GitHub)" <gi...@apache.org>.
keith-turner commented on code in PR #3733:
URL: https://github.com/apache/accumulo/pull/3733#discussion_r1309075743


##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java:
##########
@@ -160,8 +165,9 @@ public class TabletServerBatchWriter implements AutoCloseable {
   private final HashSet<String> serverSideErrors = new HashSet<>();
   private final FailedMutations failedMutations;
   private int unknownErrors = 0;
-  private boolean somethingFailed = false;
+  private volatile boolean somethingFailed = false;

Review Comment:
   I agree AtomicBoolean is better here because it's not frequently used.  If it were frequently used, then using the Atomics would introduce a layer of memory indirection, causing even more pointer chasing.  Changed in a513d75





[GitHub] [accumulo] keith-turner commented on a diff in pull request #3733: ensures no writes happen after batch writer closes

Posted by "keith-turner (via GitHub)" <gi...@apache.org>.
keith-turner commented on code in PR #3733:
URL: https://github.com/apache/accumulo/pull/3733#discussion_r1309511869


##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java:
##########
@@ -1028,14 +1020,139 @@ private MutationSet sendMutationsToTabletServer(String location,
         timeoutTracker.errorOccured();
         throw new IOException(e);
       } catch (TApplicationException tae) {
+        // no need to bother closing session in this case
+        usid = null;
         updateServerErrors(location, tae);
         throw new AccumuloServerException(location, tae);
       } catch (ThriftSecurityException e) {
+        // no need to bother closing session in this case
+        usid = null;
         updateAuthorizationFailures(
             tabMuts.keySet().stream().collect(toMap(identity(), ke -> e.code)));
         throw new AccumuloSecurityException(e.user, e.code, e);
       } catch (TException e) {
         throw new IOException(e);
+      } finally {
+        if (usid != null) {
+          // There is an open session, must close it before the batchwriter closes or writes could
+          // happen after the batch writer closes. See #3721. Queuing a task instead of executing
+          // the code here because throwing exceptions in a finally block makes the code hard to
+          // reason about. Queue is less likely to throw an exception.
+          //
+          // When the task is created below it adds to the failedSessions map, this is done while
+          // the mutations for this task are still incremented in totalMemUsed. It is important that
+          // these overlap in time so that there is no gap where the client would not wait for the
+          // session to close. Overlap in times means the session is added to failedSessions before
+          // the mutations are decremented from totalMemUsed.
+          sendThreadPool.execute(new CloseSessionTask(location, usid));
+        }
+      }
+    }
+
+    class CloseSessionTask implements Runnable {
+
+      private final String location;
+      private final long usid;
+
+      CloseSessionTask(String location, Long usid) {
+        this.location = location;
+        this.usid = usid;
+        synchronized (TabletServerBatchWriter.this) {
+          if (!failedSessions.add(new Pair<>(usid, location))) {
+            throw new IllegalStateException("Duplicate session " + location + " " + usid);
+          }
+        }
+
+      }
+
+      @Override
+      public void run() {
+        try {
+          closeSession();
+        } catch (InterruptedException | RuntimeException | ThriftSecurityException e) {
+          updateUnknownErrors("Failed to close session " + location + " " + usid, e);
+        } finally {
+          synchronized (TabletServerBatchWriter.this) {
+            if (!failedSessions.remove(new Pair<>(usid, location))) {
+              throw new IllegalStateException("Session missing " + location + " " + usid);
+            }
+            TabletServerBatchWriter.this.notifyAll();
+          }
+        }
+      }
+
+      /**
+       * Checks if there is a lock held by a tserver at a specific host and port.
+       */
+      private boolean isALockHeld(String tserver) {
+        var root = context.getZooKeeperRoot() + Constants.ZTSERVERS;
+        var zLockPath = ServiceLock.path(root + "/" + tserver);
+        return ServiceLock.getSessionId(context.getZooCache(), zLockPath) != 0;
+      }
+
+      private void closeSession() throws InterruptedException, ThriftSecurityException {
+
+        Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS)
+            .incrementBy(100, MILLISECONDS).maxWait(60, SECONDS).backOffFactor(1.5)
+            .logInterval(3, MINUTES).createRetry();
+
+        final HostAndPort parsedServer = HostAndPort.fromString(location);
+
+        long startTime = System.currentTimeMillis();
+
+        // If somethingFailed is true then the batch writer will throw an exception on close or
+        // flush, so no need to close this session. Only want to close the session for retryable
+        // exceptions.
+        while (!somethingFailed) {
+
+          TabletClientService.Client client = null;
+
+          // Check if a lock is held by any tserver at the host and port. It does not need to be the
+          // exact tserver instance that existed when the session was created because if a new
+          // tserver instance comes up then the session will not exist there. Trying to get the
+          // exact tserver instance that created the session would require changes to the RPC that
+          // creates the session and this is not needed.
+          if (!isALockHeld(location)) {
+            retry.logCompletion(log,
+                "No tserver for failed write session " + location + " " + usid);
+            break;
+          }
+
+          try {
+            if (timeout < context.getClientTimeoutInMillis()) {
+              client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, parsedServer, context,
+                  timeout);
+            } else {
+              client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, parsedServer, context);
+            }
+
+            client.closeUpdate(TraceUtil.traceInfo(), usid);
+            retry.logCompletion(log, "Closed failed write session " + location + " " + usid);
+            break;
+          } catch (NoSuchScanIDException e) {
+            retry.logCompletion(log,
+                "Failed write session no longer exists " + location + " " + usid);
+            // The session no longer exists, so done
+            break;
+          } catch (TApplicationException tae) {
+            // no need to bother closing session in this case
+            updateServerErrors(location, tae);
+            break;
+          } catch (ThriftSecurityException e) {
+            throw e;
+          } catch (TException e) {
+            retry.waitForNextAttempt(log, "Attempting to close failed write session " + location
+                + " " + usid + " " + e.getMessage());
+          } finally {
+            ThriftUtil.returnClient(client, context);
+          }
+
+          // if a timeout is set on the batch writer, then do not retry longer than the timeout
+          if ((System.currentTimeMillis() - startTime) > timeout) {

Review Comment:
   > I would prioritize correctness over consistency.
   
   That's a good reason to update.  It was irksome to make it inconsistent, but I made the change in 66c6849 to make it correct.  I don't want to fix all the other places in this PR though.





[GitHub] [accumulo] ctubbsii merged pull request #3733: ensures no writes happen after batch writer closes

Posted by "ctubbsii (via GitHub)" <gi...@apache.org>.
ctubbsii merged PR #3733:
URL: https://github.com/apache/accumulo/pull/3733




[GitHub] [accumulo] ctubbsii commented on a diff in pull request #3733: ensures no writes happen after batch writer closes

Posted by "ctubbsii (via GitHub)" <gi...@apache.org>.
ctubbsii commented on code in PR #3733:
URL: https://github.com/apache/accumulo/pull/3733#discussion_r1309325296


##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java:
##########
@@ -1028,14 +1020,139 @@ private MutationSet sendMutationsToTabletServer(String location,
         timeoutTracker.errorOccured();
         throw new IOException(e);
       } catch (TApplicationException tae) {
+        // no need to bother closing session in this case
+        usid = null;
         updateServerErrors(location, tae);
         throw new AccumuloServerException(location, tae);
       } catch (ThriftSecurityException e) {
+        // no need to bother closing session in this case
+        usid = null;
         updateAuthorizationFailures(
             tabMuts.keySet().stream().collect(toMap(identity(), ke -> e.code)));
         throw new AccumuloSecurityException(e.user, e.code, e);
       } catch (TException e) {
         throw new IOException(e);
+      } finally {
+        if (usid != null) {
+          // There is an open session, must close it before the batchwriter closes or writes could
+          // happen after the batch writer closes. See #3721. Queuing a task instead of executing
+          // the code here because throwing exceptions in a finally block makes the code hard to
+          // reason about. Queue is less likely to throw an exception.
+          //
+          // When the task is created below it adds to the failedSessions map, this is done while
+          // the mutations for this task are still incremented in totalMemUsed. It is important that
+          // these overlap in time so that there is no gap where the client would not wait for the
+          // session to close. Overlap in times means the session is added to failedSessions before
+          // the mutations are decremented from totalMemUsed.
+          sendThreadPool.execute(new CloseSessionTask(location, usid));
+        }
+      }
+    }
+
+    class CloseSessionTask implements Runnable {
+
+      private final String location;
+      private final long usid;
+
+      CloseSessionTask(String location, Long usid) {
+        this.location = location;
+        this.usid = usid;
+        synchronized (TabletServerBatchWriter.this) {
+          if (!failedSessions.add(new Pair<>(usid, location))) {
+            throw new IllegalStateException("Duplicate session " + location + " " + usid);
+          }
+        }
+
+      }
+
+      @Override
+      public void run() {
+        try {
+          closeSession();
+        } catch (InterruptedException | RuntimeException | ThriftSecurityException e) {
+          updateUnknownErrors("Failed to close session " + location + " " + usid, e);
+        } finally {
+          synchronized (TabletServerBatchWriter.this) {
+            if (!failedSessions.remove(new Pair<>(usid, location))) {
+              throw new IllegalStateException("Session missing " + location + " " + usid);
+            }
+            TabletServerBatchWriter.this.notifyAll();
+          }
+        }
+      }
+
+      /**
+       * Checks if there is a lock held by a tserver at a specific host and port.
+       */
+      private boolean isALockHeld(String tserver) {
+        var root = context.getZooKeeperRoot() + Constants.ZTSERVERS;
+        var zLockPath = ServiceLock.path(root + "/" + tserver);
+        return ServiceLock.getSessionId(context.getZooCache(), zLockPath) != 0;
+      }
+
+      private void closeSession() throws InterruptedException, ThriftSecurityException {
+
+        Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS)
+            .incrementBy(100, MILLISECONDS).maxWait(60, SECONDS).backOffFactor(1.5)
+            .logInterval(3, MINUTES).createRetry();
+
+        final HostAndPort parsedServer = HostAndPort.fromString(location);
+
+        long startTime = System.currentTimeMillis();
+
+        // If somethingFailed is true then the batch writer will throw an exception on close or
+        // flush, so no need to close this session. Only want to close the session for retryable
+        // exceptions.
+        while (!somethingFailed) {
+
+          TabletClientService.Client client = null;
+
+          // Check if a lock is held by any tserver at the host and port. It does not need to be the
+          // exact tserver instance that existed when the session was created because if a new
+          // tserver instance comes up then the session will not exist there. Trying to get the
+          // exact tserver instance that created the session would require changes to the RPC that
+          // creates the session and this is not needed.
+          if (!isALockHeld(location)) {
+            retry.logCompletion(log,
+                "No tserver for failed write session " + location + " " + usid);
+            break;
+          }
+
+          try {
+            if (timeout < context.getClientTimeoutInMillis()) {
+              client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, parsedServer, context,
+                  timeout);
+            } else {
+              client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, parsedServer, context);
+            }
+
+            client.closeUpdate(TraceUtil.traceInfo(), usid);
+            retry.logCompletion(log, "Closed failed write session " + location + " " + usid);
+            break;
+          } catch (NoSuchScanIDException e) {
+            retry.logCompletion(log,
+                "Failed write session no longer exists " + location + " " + usid);
+            // The session no longer exists, so done
+            break;
+          } catch (TApplicationException tae) {
+            // no need to bother closing session in this case
+            updateServerErrors(location, tae);
+            break;
+          } catch (ThriftSecurityException e) {
+            throw e;
+          } catch (TException e) {
+            retry.waitForNextAttempt(log, "Attempting to close failed write session " + location
+                + " " + usid + " " + e.getMessage());
+          } finally {
+            ThriftUtil.returnClient(client, context);
+          }
+
+          // if a timeout is set on the batch writer, then do not retry longer than the timeout
+          if ((System.currentTimeMillis() - startTime) > timeout) {

Review Comment:
   I would prioritize correctness over consistency. If we avoid incorrect implementations, it avoids creating more work for somebody else to fix later. This one is independent of any others in this very large class, so it's easy to get it correct now, and will create more work for somebody later if deferred.





[GitHub] [accumulo] ctubbsii commented on pull request #3733: ensures no writes happen after batch writer closes

Posted by "ctubbsii (via GitHub)" <gi...@apache.org>.
ctubbsii commented on PR #3733:
URL: https://github.com/apache/accumulo/pull/3733#issuecomment-1697122028

   I think removing the use of Writer, and cleanup of closing the batch writer are distinct things. Removing the use of the single-mutation Writer can be done as a separate, first step. It would make it much easier to evaluate the changes that are specifically related to the Batch Writer cleanup.




[GitHub] [accumulo] ctubbsii commented on a diff in pull request #3733: ensures no writes happen after batch writer closes

Posted by "ctubbsii (via GitHub)" <gi...@apache.org>.
ctubbsii commented on code in PR #3733:
URL: https://github.com/apache/accumulo/pull/3733#discussion_r1309384311


##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java:
##########
@@ -1028,14 +1020,139 @@ private MutationSet sendMutationsToTabletServer(String location,
         timeoutTracker.errorOccured();
         throw new IOException(e);
       } catch (TApplicationException tae) {
+        // no need to bother closing session in this case
+        usid = null;
         updateServerErrors(location, tae);
         throw new AccumuloServerException(location, tae);
       } catch (ThriftSecurityException e) {
+        // no need to bother closing session in this case
+        usid = null;
         updateAuthorizationFailures(
             tabMuts.keySet().stream().collect(toMap(identity(), ke -> e.code)));
         throw new AccumuloSecurityException(e.user, e.code, e);
       } catch (TException e) {
         throw new IOException(e);
+      } finally {
+        if (usid != null) {
+          // There is an open session, must close it before the batchwriter closes or writes could
+          // happen after the batch writer closes. See #3721. Queuing a task instead of executing
+          // the code here because throwing exceptions in a finally block makes the code hard to
+          // reason about. Queue is less likely to throw an exception.
+          //
+          // When the task is created below it adds to the failedSessions map, this is done while
+          // the mutations for this task are still incremented in totalMemUsed. It is important that
+          // these overlap in time so that there is no gap where the client would not wait for the
+          // session to close. Overlap in times means the session is added to failedSessions before
+          // the mutations are decremented from totalMemUsed.
+          sendThreadPool.execute(new CloseSessionTask(location, usid));
+        }
+      }
+    }
+
+    class CloseSessionTask implements Runnable {

Review Comment:
   I think that the tight coupling might be the problem. It strains the principle of encapsulation by making everything accessible to everything else within a very large class. I think a more modular approach would make this very large class a bit more comprehensible by forcing clear APIs. We've been in this situation before, and it has benefited us to separate things out. I don't think this is that different. I'm not suggesting it all be done here... I'm just suggesting the new class doesn't need to add to the size and complexity of the already large containing class.
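
One way the more modular approach could look, purely as a sketch: the task becomes a top-level class that sees its owner only through two small interfaces. `SessionCloser`, `SessionCloseListener`, and `StandaloneCloseSessionTask` are hypothetical names invented for this example, and the retry logic from the PR is deliberately left out.

```java
// Hypothetical RPC abstraction: closes the session identified by usid at location.
interface SessionCloser {
  void close(String location, long usid) throws Exception;
}

// Hypothetical callback: the only view the task has of whoever created it.
interface SessionCloseListener {
  void closed(String location, long usid);

  void failed(String location, long usid, Exception cause);
}

// Top-level task (assumed name) that depends on two small interfaces instead of
// the private fields and methods of a large enclosing class.
class StandaloneCloseSessionTask implements Runnable {
  private final String location;
  private final long usid;
  private final SessionCloser closer;
  private final SessionCloseListener listener;

  StandaloneCloseSessionTask(String location, long usid, SessionCloser closer,
      SessionCloseListener listener) {
    this.location = location;
    this.usid = usid;
    this.closer = closer;
    this.listener = listener;
  }

  @Override
  public void run() {
    try {
      closer.close(location, usid);
      listener.closed(location, usid);
    } catch (Exception e) {
      // Any failure, checked or unchecked, is reported through the listener.
      listener.failed(location, usid, e);
    }
  }
}
```

The cost is the extra interfaces; the benefit is that the task can be read and tested without opening the containing class.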





[GitHub] [accumulo] dlmarion commented on a diff in pull request #3733: ensures no writes happen after batch writer closes

Posted by "dlmarion (via GitHub)" <gi...@apache.org>.
dlmarion commented on code in PR #3733:
URL: https://github.com/apache/accumulo/pull/3733#discussion_r1310226745


##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java:
##########
@@ -1028,14 +1022,139 @@ private MutationSet sendMutationsToTabletServer(String location,
         timeoutTracker.errorOccured();
         throw new IOException(e);
       } catch (TApplicationException tae) {
+        // no need to bother closing session in this case
+        usid = OptionalLong.empty();
         updateServerErrors(location, tae);
         throw new AccumuloServerException(location, tae);
       } catch (ThriftSecurityException e) {
+        // no need to bother closing session in this case
+        usid = OptionalLong.empty();
         updateAuthorizationFailures(
             tabMuts.keySet().stream().collect(toMap(identity(), ke -> e.code)));
         throw new AccumuloSecurityException(e.user, e.code, e);
       } catch (TException e) {
         throw new IOException(e);
+      } finally {
+        if (usid.isPresent()) {
+          // There is an open session, must close it before the batchwriter closes or writes could
+          // happen after the batch writer closes. See #3721. Queuing a task instead of executing
+          // the code here because throwing exceptions in a finally block makes the code hard to
+          // reason about. Queue is less likely to throw an exception.
+          //
+          // When the task is created below it adds to the failedSessions map, this is done while
+          // the mutations for this task are still incremented in totalMemUsed. It is important that
+          // these overlap in time so that there is no gap where the client would not wait for the
+          // session to close. Overlap in times means the session is added to failedSessions before
+          // the mutations are decremented from totalMemUsed.
+          sendThreadPool.execute(new CloseSessionTask(location, usid.getAsLong()));
+        }
+      }
+    }
+
+    class CloseSessionTask implements Runnable {
+
+      private final String location;
+      private final long usid;
+
+      CloseSessionTask(String location, Long usid) {
+        this.location = location;
+        this.usid = usid;
+        synchronized (TabletServerBatchWriter.this) {
+          if (!failedSessions.add(new Pair<>(usid, location))) {
+            throw new IllegalStateException("Duplicate session " + location + " " + usid);
+          }
+        }
+
+      }
+
+      @Override
+      public void run() {
+        try {
+          closeSession();
+        } catch (InterruptedException | RuntimeException | ThriftSecurityException e) {
+          updateUnknownErrors("Failed to close session " + location + " " + usid, e);
+        } finally {
+          synchronized (TabletServerBatchWriter.this) {
+            if (!failedSessions.remove(new Pair<>(usid, location))) {
+              throw new IllegalStateException("Session missing " + location + " " + usid);
+            }
+            TabletServerBatchWriter.this.notifyAll();

Review Comment:
   Do we need to call `notifyAll` in all cases? If so, I think throwing the exception might defeat that.
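
For illustration only, here is a minimal sketch of the concern, using a hypothetical `NotifySketch` class rather than the PR's code. The field and method names are assumptions. It shows that a throw placed before `notifyAll` skips the wake-up, while placing the notification in a `finally` lets waiters always re-check their condition.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the batch writer's session tracking; not Accumulo code.
class NotifySketch {
  private final Set<String> failedSessions = new HashSet<>();
  private int notifications = 0; // counts how many times waiters would have been woken

  synchronized void add(String session) {
    failedSessions.add(session);
  }

  // Same shape as the reviewed code: the throw happens before notifyAll is reached.
  synchronized void removeThenNotify(String session) {
    if (!failedSessions.remove(session)) {
      throw new IllegalStateException("Session missing " + session);
    }
    notifications++;
    notifyAll();
  }

  // One possible alternative: notify in a finally so waiters are woken even on failure.
  synchronized void removeAlwaysNotify(String session) {
    try {
      if (!failedSessions.remove(session)) {
        throw new IllegalStateException("Session missing " + session);
      }
    } finally {
      notifications++;
      notifyAll();
    }
  }

  synchronized int notifications() {
    return notifications;
  }
}
```

Whether waking waiters after an `IllegalStateException` is actually desirable depends on what the waiting thread checks, so this is only one way to read the question above.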



##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java:
##########
@@ -1028,14 +1022,139 @@ private MutationSet sendMutationsToTabletServer(String location,
         timeoutTracker.errorOccured();
         throw new IOException(e);
       } catch (TApplicationException tae) {
+        // no need to bother closing session in this case
+        usid = OptionalLong.empty();
         updateServerErrors(location, tae);
         throw new AccumuloServerException(location, tae);
       } catch (ThriftSecurityException e) {
+        // no need to bother closing session in this case
+        usid = OptionalLong.empty();
         updateAuthorizationFailures(
             tabMuts.keySet().stream().collect(toMap(identity(), ke -> e.code)));
         throw new AccumuloSecurityException(e.user, e.code, e);
       } catch (TException e) {
         throw new IOException(e);
+      } finally {
+        if (usid.isPresent()) {
+          // There is an open session, must close it before the batchwriter closes or writes could
+          // happen after the batch writer closes. See #3721. Queuing a task instead of executing
+          // the code here because throwing exceptions in a finally block makes the code hard to
+          // reason about. Queue is less likely to throw an exception.
+          //
+          // When the task is created below it adds to the failedSessions map, this is done while
+          // the mutations for this task are still incremented in totalMemUsed. It is important that
+          // these overlap in time so that there is no gap where the client would not wait for the
+          // session to close. Overlap in times means the session is added to failedSessions before
+          // the mutations are decremented from totalMemUsed.
+          sendThreadPool.execute(new CloseSessionTask(location, usid.getAsLong()));

Review Comment:
   The CloseSessionTask constructor can throw an IllegalStateException when there is a duplicate failed session id. Would this hide or suppress another exception that is being thrown above?
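
As a sketch of the general Java behavior behind this question (not the PR's code; `FinallyMaskSketch` and its methods are hypothetical), an exception thrown from a `finally` block replaces whatever exception was already propagating from the `try`. The second method shows one way to keep the primary exception and attach the cleanup failure as suppressed.

```java
import java.io.IOException;

class FinallyMaskSketch {
  // Stand-in for a constructor that fails on a duplicate session.
  static void failCleanup() {
    throw new IllegalStateException("Duplicate session");
  }

  // The cleanup failure in finally discards the IOException thrown by the try block.
  static void maskedByFinally() throws IOException {
    try {
      throw new IOException("primary failure");
    } finally {
      failCleanup();
    }
  }

  // Keeps the primary exception and records the cleanup failure as suppressed.
  static void preservedPrimary() throws IOException {
    IOException primary = null;
    try {
      throw new IOException("primary failure");
    } catch (IOException e) {
      primary = e;
      throw e;
    } finally {
      try {
        failCleanup();
      } catch (RuntimeException cleanup) {
        if (primary != null) {
          primary.addSuppressed(cleanup);
        } else {
          throw cleanup;
        }
      }
    }
  }
}
```

With `maskedByFinally`, a caller only ever sees the `IllegalStateException`. With `preservedPrimary`, the caller sees the `IOException` and can find the cleanup failure through `getSuppressed()`.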





[GitHub] [accumulo] keith-turner commented on a diff in pull request #3733: ensures no writes happen after batch writer closes

Posted by "keith-turner (via GitHub)" <gi...@apache.org>.
keith-turner commented on code in PR #3733:
URL: https://github.com/apache/accumulo/pull/3733#discussion_r1310855067


##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java:
##########
@@ -1028,14 +1022,139 @@ private MutationSet sendMutationsToTabletServer(String location,
         timeoutTracker.errorOccured();
         throw new IOException(e);
       } catch (TApplicationException tae) {
+        // no need to bother closing session in this case
+        usid = OptionalLong.empty();
         updateServerErrors(location, tae);
         throw new AccumuloServerException(location, tae);
       } catch (ThriftSecurityException e) {
+        // no need to bother closing session in this case
+        usid = OptionalLong.empty();
         updateAuthorizationFailures(
             tabMuts.keySet().stream().collect(toMap(identity(), ke -> e.code)));
         throw new AccumuloSecurityException(e.user, e.code, e);
       } catch (TException e) {
         throw new IOException(e);
+      } finally {
+        if (usid.isPresent()) {
+          // There is an open session, must close it before the batchwriter closes or writes could
+          // happen after the batch writer closes. See #3721. Queuing a task instead of executing
+          // the code here because throwing exceptions in a finally block makes the code hard to
+          // reason about. Queue is less likely to throw an exception.
+          //
+          // When the task is created below it adds to the failedSessions map, this is done while
+          // the mutations for this task are still incremented in totalMemUsed. It is important that
+          // these overlap in time so that there is no gap where the client would not wait for the
+          // session to close. Overlap in times means the session is added to failedSessions before
+          // the mutations are decremented from totalMemUsed.
+          sendThreadPool.execute(new CloseSessionTask(location, usid.getAsLong()));
+        }
+      }
+    }
+
+    class CloseSessionTask implements Runnable {
+
+      private final String location;
+      private final long usid;
+
+      CloseSessionTask(String location, Long usid) {
+        this.location = location;
+        this.usid = usid;
+        synchronized (TabletServerBatchWriter.this) {
+          if (!failedSessions.add(new Pair<>(usid, location))) {
+            throw new IllegalStateException("Duplicate session " + location + " " + usid);
+          }
+        }
+
+      }
+
+      @Override
+      public void run() {
+        try {
+          closeSession();
+        } catch (InterruptedException | RuntimeException | ThriftSecurityException e) {
+          updateUnknownErrors("Failed to close session " + location + " " + usid, e);
+        } finally {
+          synchronized (TabletServerBatchWriter.this) {
+            if (!failedSessions.remove(new Pair<>(usid, location))) {
+              throw new IllegalStateException("Session missing " + location + " " + usid);
+            }
+            TabletServerBatchWriter.this.notifyAll();

Review Comment:
   I updated the code to use closeable and moved the code out of the finally block in cf62437, which simplified these changes a bit.



##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java:
##########
@@ -1028,14 +1022,139 @@ private MutationSet sendMutationsToTabletServer(String location,
         timeoutTracker.errorOccured();
         throw new IOException(e);
       } catch (TApplicationException tae) {
+        // no need to bother closing session in this case
+        usid = OptionalLong.empty();
         updateServerErrors(location, tae);
         throw new AccumuloServerException(location, tae);
       } catch (ThriftSecurityException e) {
+        // no need to bother closing session in this case
+        usid = OptionalLong.empty();
         updateAuthorizationFailures(
             tabMuts.keySet().stream().collect(toMap(identity(), ke -> e.code)));
         throw new AccumuloSecurityException(e.user, e.code, e);
       } catch (TException e) {
         throw new IOException(e);
+      } finally {
+        if (usid.isPresent()) {
+          // There is an open session, must close it before the batchwriter closes or writes could
+          // happen after the batch writer closes. See #3721. Queuing a task instead of executing
+          // the code here because throwing exceptions in a finally block makes the code hard to
+          // reason about. Queue is less likely to throw an exception.
+          //
+          // When the task is created below it adds to the failedSessions map, this is done while
+          // the mutations for this task are still incremented in totalMemUsed. It is important that
+          // these overlap in time so that there is no gap where the client would not wait for the
+          // session to close. Overlap in times means the session is added to failedSessions before
+          // the mutations are decremented from totalMemUsed.
+          sendThreadPool.execute(new CloseSessionTask(location, usid.getAsLong()));

Review Comment:
   I updated the code to use closeable and moved the code out of the finally block in https://github.com/apache/accumulo/commit/cf624372ca15e5992b36003be96123c0595e703d, which simplified these changes a bit.
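
For readers who have not opened that commit, the following is a rough sketch of what a closeable-based shape could look like. It is an assumption about the general pattern, not a copy of the committed code: `SessionCloserSketch`, `pendingClose`, and `clearSession` are all hypothetical names. The idea is that try-with-resources runs `close()` on every exit path, so the cleanup no longer lives in a hand-written `finally` block.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; records sessions that would need an asynchronous close.
class SessionCloserSketch implements AutoCloseable {
  static final List<String> pendingClose = new ArrayList<>();

  private final String location;
  private final long usid;
  private boolean needsClose = true;

  SessionCloserSketch(String location, long usid) {
    this.location = location;
    this.usid = usid;
  }

  // Called once the session is known to be finished, so close() has nothing to do.
  void clearSession() {
    needsClose = false;
  }

  @Override
  public void close() {
    if (needsClose) {
      // In real code this is where a close task might be queued.
      pendingClose.add(location + " " + usid);
    }
  }

  static void send(String location, long usid, boolean fail) throws IOException {
    try (SessionCloserSketch session = new SessionCloserSketch(location, usid)) {
      if (fail) {
        throw new IOException("send failed");
      }
      session.clearSession();
    }
  }
}
```

A side benefit of try-with-resources is that if `close()` itself throws while another exception is propagating, Java attaches the close failure as a suppressed exception instead of replacing the original.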





[GitHub] [accumulo] dlmarion commented on a diff in pull request #3733: ensures no writes happen after batch writer closes

Posted by "dlmarion (via GitHub)" <gi...@apache.org>.
dlmarion commented on code in PR #3733:
URL: https://github.com/apache/accumulo/pull/3733#discussion_r1307966608


##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java:
##########
@@ -1028,14 +1022,133 @@ private MutationSet sendMutationsToTabletServer(String location,
         timeoutTracker.errorOccured();
         throw new IOException(e);
       } catch (TApplicationException tae) {
+        // no need to bother closing session in this case
+        usid = null;
         updateServerErrors(location, tae);
         throw new AccumuloServerException(location, tae);
       } catch (ThriftSecurityException e) {
+        // no need to bother closing session in this case
+        usid = null;
         updateAuthorizationFailures(
             tabMuts.keySet().stream().collect(toMap(identity(), ke -> e.code)));
         throw new AccumuloSecurityException(e.user, e.code, e);
       } catch (TException e) {
         throw new IOException(e);
+      } finally {
+        if (usid != null) {
+          // There is an open session, must close it before the batchwriter closes or writes could
+          // happen after the batch writer closes. See #3721. Queuing a task instead of executing
+          // the code here because throwing exceptions in a finally block makes the code hard to
+          // reason about. Queue is less likely to throw an exception.
+          sendThreadPool.execute(new CloseSessionTask(location, usid));
+        }
+      }
+    }
+
+    class CloseSessionTask implements Runnable {
+
+      private final String location;
+      private final long usid;
+
+      CloseSessionTask(String location, Long usid) {
+        this.location = location;
+        this.usid = usid;
+        synchronized (TabletServerBatchWriter.this) {
+          if (!failedSessions.add(new Pair<>(usid, location))) {
+            throw new IllegalStateException("Duplicate session " + location + " " + usid);
+          }
+        }
+
+      }
+
+      @Override
+      public void run() {
+        try {
+          closeSession();
+        } catch (InterruptedException | RuntimeException | ThriftSecurityException e) {
+          updateUnknownErrors("Failed to close session " + location + " " + usid, e);
+        } finally {
+          synchronized (TabletServerBatchWriter.this) {
+            if (!failedSessions.remove(new Pair<>(usid, location))) {
+              throw new IllegalStateException("Session missing " + location + " " + usid);
+            }
+            TabletServerBatchWriter.this.notifyAll();
+          }
+        }
+      }
+
+      /**
+       * Checks if there is a lock held by a tserver at a specific host and port.
+       */
+      private boolean isALockHeld(String tserver) {
+        var root = context.getZooKeeperRoot() + Constants.ZTSERVERS;
+        var zLockPath = ServiceLock.path(root + "/" + tserver);
+        return ServiceLock.getSessionId(context.getZooCache(), zLockPath) != 0;
+      }
+
+      private void closeSession() throws InterruptedException, ThriftSecurityException {
+
+        Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS)

Review Comment:
   Not sure if it will help, but I created RetryableThriftCall as part of the external compaction code. Example [here](https://github.com/apache/accumulo/blob/main/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java#L359)





[GitHub] [accumulo] keith-turner commented on a diff in pull request #3733: ensures no writes happen after batch writer closes

Posted by "keith-turner (via GitHub)" <gi...@apache.org>.
keith-turner commented on code in PR #3733:
URL: https://github.com/apache/accumulo/pull/3733#discussion_r1309512266


##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java:
##########
@@ -1028,14 +1020,139 @@ private MutationSet sendMutationsToTabletServer(String location,
         timeoutTracker.errorOccured();
         throw new IOException(e);
       } catch (TApplicationException tae) {
+        // no need to bother closing session in this case
+        usid = null;
         updateServerErrors(location, tae);
         throw new AccumuloServerException(location, tae);
       } catch (ThriftSecurityException e) {
+        // no need to bother closing session in this case
+        usid = null;
         updateAuthorizationFailures(
             tabMuts.keySet().stream().collect(toMap(identity(), ke -> e.code)));
         throw new AccumuloSecurityException(e.user, e.code, e);
       } catch (TException e) {
         throw new IOException(e);
+      } finally {
+        if (usid != null) {
+          // There is an open session, must close it before the batchwriter closes or writes could
+          // happen after the batch writer closes. See #3721. Queuing a task instead of executing
+          // the code here because throwing exceptions in a finally block makes the code hard to
+          // reason about. Queue is less likely to throw an exception.
+          //
+          // When the task is created below it adds to the failedSessions map, this is done while
+          // the mutations for this task are still incremented in totalMemUsed. It is important that
+          // these overlap in time so that there is no gap where the client would not wait for the
+          // session to close. Overlap in times means the session is added to failedSessions before
+          // the mutations are decremented from totalMemUsed.
+          sendThreadPool.execute(new CloseSessionTask(location, usid));
+        }
+      }
+    }
+
+    class CloseSessionTask implements Runnable {
+
+      private final String location;
+      private final long usid;
+
+      CloseSessionTask(String location, Long usid) {
+        this.location = location;
+        this.usid = usid;
+        synchronized (TabletServerBatchWriter.this) {
+          if (!failedSessions.add(new Pair<>(usid, location))) {
+            throw new IllegalStateException("Duplicate session " + location + " " + usid);
+          }
+        }
+
+      }
+
+      @Override
+      public void run() {
+        try {
+          closeSession();
+        } catch (InterruptedException | RuntimeException | ThriftSecurityException e) {
+          updateUnknownErrors("Failed to close session " + location + " " + usid, e);
+        } finally {
+          synchronized (TabletServerBatchWriter.this) {
+            if (!failedSessions.remove(new Pair<>(usid, location))) {
+              throw new IllegalStateException("Session missing " + location + " " + usid);
+            }
+            TabletServerBatchWriter.this.notifyAll();
+          }
+        }
+      }
+
+      /**
+       * Checks if there is a lock held by a tserver at a specific host and port.
+       */
+      private boolean isALockHeld(String tserver) {
+        var root = context.getZooKeeperRoot() + Constants.ZTSERVERS;
+        var zLockPath = ServiceLock.path(root + "/" + tserver);
+        return ServiceLock.getSessionId(context.getZooCache(), zLockPath) != 0;
+      }
+
+      private void closeSession() throws InterruptedException, ThriftSecurityException {
+
+        Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS)
+            .incrementBy(100, MILLISECONDS).maxWait(60, SECONDS).backOffFactor(1.5)
+            .logInterval(3, MINUTES).createRetry();
+
+        final HostAndPort parsedServer = HostAndPort.fromString(location);
+
+        long startTime = System.currentTimeMillis();

Review Comment:
   Fixed in 66c6849





[GitHub] [accumulo] keith-turner commented on a diff in pull request #3733: ensures no writes happen after batch writer closes

Posted by "keith-turner (via GitHub)" <gi...@apache.org>.
keith-turner commented on code in PR #3733:
URL: https://github.com/apache/accumulo/pull/3733#discussion_r1308990416


##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java:
##########
@@ -1028,14 +1020,139 @@ private MutationSet sendMutationsToTabletServer(String location,
         timeoutTracker.errorOccured();
         throw new IOException(e);
       } catch (TApplicationException tae) {
+        // no need to bother closing session in this case
+        usid = null;
         updateServerErrors(location, tae);
         throw new AccumuloServerException(location, tae);
       } catch (ThriftSecurityException e) {
+        // no need to bother closing session in this case
+        usid = null;
         updateAuthorizationFailures(
             tabMuts.keySet().stream().collect(toMap(identity(), ke -> e.code)));
         throw new AccumuloSecurityException(e.user, e.code, e);
       } catch (TException e) {
         throw new IOException(e);
+      } finally {
+        if (usid != null) {
+          // There is an open session, must close it before the batchwriter closes or writes could
+          // happen after the batch writer closes. See #3721. Queuing a task instead of executing
+          // the code here because throwing exceptions in a finally block makes the code hard to
+          // reason about. Queue is less likely to throw an exception.
+          //
+          // When the task is created below it adds to the failedSessions map, this is done while
+          // the mutations for this task are still incremented in totalMemUsed. It is important that
+          // these overlap in time so that there is no gap where the client would not wait for the
+          // session to close. Overlap in times means the session is added to failedSessions before
+          // the mutations are decremented from totalMemUsed.
+          sendThreadPool.execute(new CloseSessionTask(location, usid));
+        }
+      }
+    }
+
+    class CloseSessionTask implements Runnable {
+
+      private final String location;
+      private final long usid;
+
+      CloseSessionTask(String location, Long usid) {
+        this.location = location;
+        this.usid = usid;
+        synchronized (TabletServerBatchWriter.this) {
+          if (!failedSessions.add(new Pair<>(usid, location))) {
+            throw new IllegalStateException("Duplicate session " + location + " " + usid);
+          }
+        }
+
+      }
+
+      @Override
+      public void run() {
+        try {
+          closeSession();
+        } catch (InterruptedException | RuntimeException | ThriftSecurityException e) {
+          updateUnknownErrors("Failed to close session " + location + " " + usid, e);
+        } finally {
+          synchronized (TabletServerBatchWriter.this) {
+            if (!failedSessions.remove(new Pair<>(usid, location))) {
+              throw new IllegalStateException("Session missing " + location + " " + usid);
+            }
+            TabletServerBatchWriter.this.notifyAll();
+          }
+        }
+      }
+
+      /**
+       * Checks if there is a lock held by a tserver at a specific host and port.
+       */
+      private boolean isALockHeld(String tserver) {
+        var root = context.getZooKeeperRoot() + Constants.ZTSERVERS;
+        var zLockPath = ServiceLock.path(root + "/" + tserver);
+        return ServiceLock.getSessionId(context.getZooCache(), zLockPath) != 0;
+      }
+
+      private void closeSession() throws InterruptedException, ThriftSecurityException {
+
+        Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS)
+            .incrementBy(100, MILLISECONDS).maxWait(60, SECONDS).backOffFactor(1.5)
+            .logInterval(3, MINUTES).createRetry();
+
+        final HostAndPort parsedServer = HostAndPort.fromString(location);
+
+        long startTime = System.currentTimeMillis();
+
+        // If somethingFailed is true then the batch writer will throw an exception on close or
+        // flush, so no need to close this session. Only want to close the session for retryable
+        // exceptions.
+        while (!somethingFailed) {
+
+          TabletClientService.Client client = null;
+
+          // Check if a lock is held by any tserver at the host and port. It does not need to be the
+          // exact tserver instance that existed when the session was created because if a new
+          // tserver instance comes up then the session will not exist there. Trying to get the
+          // exact tserver instance that created the session would require changes to the RPC that
+          // creates the session and this is not needed.
+          if (!isALockHeld(location)) {
+            retry.logCompletion(log,
+                "No tserver for failed write session " + location + " " + usid);
+            break;
+          }
+
+          try {
+            if (timeout < context.getClientTimeoutInMillis()) {
+              client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, parsedServer, context,
+                  timeout);
+            } else {
+              client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, parsedServer, context);
+            }
+
+            client.closeUpdate(TraceUtil.traceInfo(), usid);
+            retry.logCompletion(log, "Closed failed write session " + location + " " + usid);
+            break;
+          } catch (NoSuchScanIDException e) {
+            retry.logCompletion(log,
+                "Failed write session no longer exists " + location + " " + usid);
+            // The session no longer exists, so done
+            break;
+          } catch (TApplicationException tae) {
+            // no need to bother closing session in this case
+            updateServerErrors(location, tae);
+            break;
+          } catch (ThriftSecurityException e) {
+            throw e;
+          } catch (TException e) {
+            retry.waitForNextAttempt(log, "Attempting to close failed write session " + location
+                + " " + usid + " " + e.getMessage());
+          } finally {
+            ThriftUtil.returnClient(client, context);
+          }
+
+          // if a timeout is set on the batch writer, then do not retry longer than the timeout
+          if ((System.currentTimeMillis() - startTime) > timeout) {

Review Comment:
   The rest of the class uses System.currentTimeMillis(), so I chose to use it here for consistency. If a change were made to use nano time, it should be done for the whole class. That could be a follow-on issue.
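
For reference, a minimal sketch of what a nano-time based elapsed check could look like if such a follow-on were done. `ElapsedSketch` and `timedOut` are hypothetical names, not part of this class. `System.nanoTime()` is monotonic, and comparing the difference of two readings (rather than the readings themselves) stays correct even if the counter wraps.

```java
import java.util.concurrent.TimeUnit;

class ElapsedSketch {
  // Returns true once more than timeoutMillis has elapsed between the two nanoTime readings.
  static boolean timedOut(long startNanos, long nowNanos, long timeoutMillis) {
    return nowNanos - startNanos > TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
  }

  // Example of how a caller might use it with real clock readings.
  static boolean timedOutSince(long startNanos, long timeoutMillis) {
    return timedOut(startNanos, System.nanoTime(), timeoutMillis);
  }
}
```

A caller would capture `long start = System.nanoTime();` before the loop and call `ElapsedSketch.timedOutSince(start, timeout)` on each iteration.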





[GitHub] [accumulo] keith-turner commented on a diff in pull request #3733: ensures no writes happen after batch writer closes

Posted by "keith-turner (via GitHub)" <gi...@apache.org>.
keith-turner commented on code in PR #3733:
URL: https://github.com/apache/accumulo/pull/3733#discussion_r1309077609


##########
core/src/main/java/org/apache/accumulo/core/clientImpl/Writer.java:
##########
@@ -59,62 +42,18 @@ public Writer(ClientContext context, TableId tableId) {
     this.tableId = tableId;
   }
 
-  private static void updateServer(ClientContext context, Mutation m, KeyExtent extent,
-      HostAndPort server) throws TException, NotServingTabletException,
-      ConstraintViolationException, AccumuloSecurityException {
-    checkArgument(m != null, "m is null");
-    checkArgument(extent != null, "extent is null");
-    checkArgument(server != null, "server is null");
-    checkArgument(context != null, "context is null");
-
-    TabletClientService.Iface client = null;
-    try {
-      client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, server, context);
-      client.update(TraceUtil.traceInfo(), context.rpcCreds(), extent.toThrift(), m.toThrift(),
-          TDurability.DEFAULT);
-    } catch (ThriftSecurityException e) {
-      throw new AccumuloSecurityException(e.user, e.code);
-    } finally {
-      ThriftUtil.returnClient((TServiceClient) client, context);
-    }
-  }
-
-  public void update(Mutation m) throws AccumuloException, AccumuloSecurityException,
-      ConstraintViolationException, TableNotFoundException {
+  public void update(Mutation m) throws AccumuloException, TableNotFoundException {
     checkArgument(m != null, "m is null");
 
     if (m.size() == 0) {
       throw new IllegalArgumentException("Can not add empty mutations");
     }
 
-    while (true) {
-      TabletLocation tabLoc = TabletLocator.getLocator(context, tableId).locateTablet(context,
-          new Text(m.getRow()), false, true);
-
-      if (tabLoc == null) {
-        log.trace("No tablet location found for row {}", new String(m.getRow(), UTF_8));
-        sleepUninterruptibly(500, MILLISECONDS);
-        continue;
-      }
-
-      final HostAndPort parsedLocation = HostAndPort.fromString(tabLoc.tablet_location);
-      try {
-        updateServer(context, m, tabLoc.tablet_extent, parsedLocation);
-        return;
-      } catch (NotServingTabletException e) {
-        log.trace("Not serving tablet, server = {}", parsedLocation);
-        TabletLocator.getLocator(context, tableId).invalidateCache(tabLoc.tablet_extent);
-      } catch (ConstraintViolationException cve) {
-        log.error("error sending update to {}", parsedLocation, cve);
-        // probably do not need to invalidate cache, but it does not hurt
-        TabletLocator.getLocator(context, tableId).invalidateCache(tabLoc.tablet_extent);
-        throw cve;
-      } catch (TException e) {
-        log.error("error sending update to {}", parsedLocation, e);
-        TabletLocator.getLocator(context, tableId).invalidateCache(tabLoc.tablet_extent);
-      }
+    String table = Ample.DataLevel.of(tableId).metaTable();
 
-      sleepUninterruptibly(500, MILLISECONDS);
+    try (var writer = context.createBatchWriter(table)) {
+      writer.addMutation(m);
+      writer.flush();

Review Comment:
   Fixed in a513d75. I think that was vestigial code (I had a loop or something inside the try block that needed that in past iterations).





[GitHub] [accumulo] dlmarion commented on a diff in pull request #3733: ensures no writes happen after batch writer closes

Posted by "dlmarion (via GitHub)" <gi...@apache.org>.
dlmarion commented on code in PR #3733:
URL: https://github.com/apache/accumulo/pull/3733#discussion_r1309223758


##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java:
##########
@@ -1028,14 +1022,133 @@ private MutationSet sendMutationsToTabletServer(String location,
         timeoutTracker.errorOccured();
         throw new IOException(e);
       } catch (TApplicationException tae) {
+        // no need to bother closing session in this case
+        usid = null;
         updateServerErrors(location, tae);
         throw new AccumuloServerException(location, tae);
       } catch (ThriftSecurityException e) {
+        // no need to bother closing session in this case
+        usid = null;
         updateAuthorizationFailures(
             tabMuts.keySet().stream().collect(toMap(identity(), ke -> e.code)));
         throw new AccumuloSecurityException(e.user, e.code, e);
       } catch (TException e) {
         throw new IOException(e);
+      } finally {
+        if (usid != null) {
+          // There is an open session, must close it before the batchwriter closes or writes could
+          // happen after the batch writer closes. See #3721. Queuing a task instead of executing
+          // the code here because throwing exceptions in a finally block makes the code hard to
+          // reason about. Queue is less likely to throw an exception.
+          sendThreadPool.execute(new CloseSessionTask(location, usid));
+        }
+      }
+    }
+
+    class CloseSessionTask implements Runnable {
+
+      private final String location;
+      private final long usid;
+
+      CloseSessionTask(String location, Long usid) {
+        this.location = location;
+        this.usid = usid;
+        synchronized (TabletServerBatchWriter.this) {
+          if (!failedSessions.add(new Pair<>(usid, location))) {
+            throw new IllegalStateException("Duplicate session " + location + " " + usid);
+          }
+        }
+
+      }
+
+      @Override
+      public void run() {
+        try {
+          closeSession();
+        } catch (InterruptedException | RuntimeException | ThriftSecurityException e) {
+          updateUnknownErrors("Failed to close session " + location + " " + usid, e);
+        } finally {
+          synchronized (TabletServerBatchWriter.this) {
+            if (!failedSessions.remove(new Pair<>(usid, location))) {
+              throw new IllegalStateException("Session missing " + location + " " + usid);
+            }
+            TabletServerBatchWriter.this.notifyAll();
+          }
+        }
+      }
+
+      /**
+       * Checks if there is a lock held by a tserver at a specific host and port.
+       */
+      private boolean isALockHeld(String tserver) {
+        var root = context.getZooKeeperRoot() + Constants.ZTSERVERS;
+        var zLockPath = ServiceLock.path(root + "/" + tserver);
+        return ServiceLock.getSessionId(context.getZooCache(), zLockPath) != 0;
+      }
+
+      private void closeSession() throws InterruptedException, ThriftSecurityException {
+
+        Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS)

Review Comment:
   Yeah, I think the RetryableThriftCall I created is meant to keep retrying through errors, for example when the CompactionCoordinator is restarting and the Compactor needs to report status.
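
   The retry-through-errors behavior described above could look roughly like the sketch below. It is only an illustration: `RetrySketch` and `retryUntilSuccess` are hypothetical names, not the RetryableThriftCall API, and the fixed delay is an assumption.

```java
import java.util.concurrent.Callable;

public class RetrySketch {

  // Hypothetical helper (not an Accumulo API): keeps invoking the call until it
  // succeeds, sleeping a fixed delay after each failure.
  static <T> T retryUntilSuccess(Callable<T> call, long delayMillis)
      throws InterruptedException {
    while (true) {
      try {
        return call.call();
      } catch (InterruptedException e) {
        // Interruption is the only way out other than success.
        throw e;
      } catch (Exception e) {
        // Treat every other failure as transient, e.g. the remote service restarting.
        Thread.sleep(delayMillis);
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    int[] attempts = {0};
    // Simulated status report that fails twice before succeeding.
    String status = retryUntilSuccess(() -> {
      if (++attempts[0] < 3) {
        throw new java.io.IOException("service restarting");
      }
      return "reported";
    }, 10);
    System.out.println(status + " after " + attempts[0] + " attempts");
  }
}
```

   A loop like this treats every failure the same way, which is why it may fit poorly where the caller needs to react differently to specific exception types.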





[GitHub] [accumulo] keith-turner commented on a diff in pull request #3733: ensures no writes happen after batch writer closes

Posted by "keith-turner (via GitHub)" <gi...@apache.org>.
keith-turner commented on code in PR #3733:
URL: https://github.com/apache/accumulo/pull/3733#discussion_r1310739251


##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java:
##########
@@ -1028,14 +1022,139 @@ private MutationSet sendMutationsToTabletServer(String location,
         timeoutTracker.errorOccured();
         throw new IOException(e);
       } catch (TApplicationException tae) {
+        // no need to bother closing session in this case
+        usid = OptionalLong.empty();
         updateServerErrors(location, tae);
         throw new AccumuloServerException(location, tae);
       } catch (ThriftSecurityException e) {
+        // no need to bother closing session in this case
+        usid = OptionalLong.empty();
         updateAuthorizationFailures(
             tabMuts.keySet().stream().collect(toMap(identity(), ke -> e.code)));
         throw new AccumuloSecurityException(e.user, e.code, e);
       } catch (TException e) {
         throw new IOException(e);
+      } finally {
+        if (usid.isPresent()) {
+          // There is an open session, must close it before the batchwriter closes or writes could
+          // happen after the batch writer closes. See #3721. Queuing a task instead of executing
+          // the code here because throwing exceptions in a finally block makes the code hard to
+          // reason about. Queue is less likely to throw an exception.
+          //
+          // When the task is created below it adds to the failedSessions map, this is done while
+          // the mutations for this task are still incremented in totalMemUsed. It is important that
+          // these overlap in time so that there is no gap where the client would not wait for the
+          // session to close. Overlap in times means the session is added to failedSessions before
+          // the mutations are decremented from totalMemUsed.
+          sendThreadPool.execute(new CloseSessionTask(location, usid.getAsLong()));

Review Comment:
   > Would this hide or suppress another exception that is being thrown above?
   
   Yeah, it could. I tried to minimize what was done in the finally block because of that. I would only expect the IllegalStateException to be thrown when there is a bug in the Accumulo code, so I think it's OK. Maybe refactoring it to use a closeable would handle exceptions better and avoid the need for another thread. An exception thrown while closing a resource in try-with-resources is added as a suppressed exception to the primary exception, whereas an exception thrown from a finally block replaces the original exception, which is then lost.
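
   The difference between the two constructs can be seen in a small standalone sketch. `SuppressedDemo` and `FailingResource` are hypothetical names used only for illustration and are unrelated to the code in this PR.

```java
import java.io.IOException;

public class SuppressedDemo {

  // Hypothetical resource whose close() always fails.
  static class FailingResource implements AutoCloseable {
    @Override
    public void close() {
      throw new IllegalStateException("close failed");
    }
  }

  // With finally, the exception thrown during cleanup replaces the original one.
  static Throwable viaFinally() {
    try {
      try {
        throw new IOException("write failed");
      } finally {
        new FailingResource().close();
      }
    } catch (Exception e) {
      return e;
    }
  }

  // With try-with-resources, the original exception propagates and the
  // exception from close() is attached to it as a suppressed exception.
  static Throwable viaTryWithResources() {
    try {
      try (FailingResource r = new FailingResource()) {
        throw new IOException("write failed");
      }
    } catch (Exception e) {
      return e;
    }
  }

  public static void main(String[] args) {
    Throwable a = viaFinally();
    System.out.println(a + " suppressed=" + a.getSuppressed().length);
    Throwable b = viaTryWithResources();
    System.out.println(b + " suppressed=" + b.getSuppressed().length);
  }
}
```

   In the finally variant the caller only ever sees the IllegalStateException, while in the try-with-resources variant the IOException survives and the close failure is reachable through `getSuppressed()`.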





[GitHub] [accumulo] keith-turner commented on a diff in pull request #3733: ensures no writes happen after batch writer closes

Posted by "keith-turner (via GitHub)" <gi...@apache.org>.
keith-turner commented on code in PR #3733:
URL: https://github.com/apache/accumulo/pull/3733#discussion_r1310855067


##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java:
##########
@@ -1028,14 +1022,139 @@ private MutationSet sendMutationsToTabletServer(String location,
         timeoutTracker.errorOccured();
         throw new IOException(e);
       } catch (TApplicationException tae) {
+        // no need to bother closing session in this case
+        usid = OptionalLong.empty();
         updateServerErrors(location, tae);
         throw new AccumuloServerException(location, tae);
       } catch (ThriftSecurityException e) {
+        // no need to bother closing session in this case
+        usid = OptionalLong.empty();
         updateAuthorizationFailures(
             tabMuts.keySet().stream().collect(toMap(identity(), ke -> e.code)));
         throw new AccumuloSecurityException(e.user, e.code, e);
       } catch (TException e) {
         throw new IOException(e);
+      } finally {
+        if (usid.isPresent()) {
+          // There is an open session, must close it before the batchwriter closes or writes could
+          // happen after the batch writer closes. See #3721. Queuing a task instead of executing
+          // the code here because throwing exceptions in a finally block makes the code hard to
+          // reason about. Queue is less likely to throw an exception.
+          //
+          // When the task is created below it adds to the failedSessions map, this is done while
+          // the mutations for this task are still incremented in totalMemUsed. It is important that
+          // these overlap in time so that there is no gap where the client would not wait for the
+          // session to close. Overlap in times means the session is added to failedSessions before
+          // the mutations are decremented from totalMemUsed.
+          sendThreadPool.execute(new CloseSessionTask(location, usid.getAsLong()));
+        }
+      }
+    }
+
+    class CloseSessionTask implements Runnable {
+
+      private final String location;
+      private final long usid;
+
+      CloseSessionTask(String location, Long usid) {
+        this.location = location;
+        this.usid = usid;
+        synchronized (TabletServerBatchWriter.this) {
+          if (!failedSessions.add(new Pair<>(usid, location))) {
+            throw new IllegalStateException("Duplicate session " + location + " " + usid);
+          }
+        }
+
+      }
+
+      @Override
+      public void run() {
+        try {
+          closeSession();
+        } catch (InterruptedException | RuntimeException | ThriftSecurityException e) {
+          updateUnknownErrors("Failed to close session " + location + " " + usid, e);
+        } finally {
+          synchronized (TabletServerBatchWriter.this) {
+            if (!failedSessions.remove(new Pair<>(usid, location))) {
+              throw new IllegalStateException("Session missing " + location + " " + usid);
+            }
+            TabletServerBatchWriter.this.notifyAll();

Review Comment:
   I updated the code to use a closeable and moved the code out of the finally block in cf62437, which simplified these changes a bit.
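
   The ordering requirement in the code comment quoted above (register the session in failedSessions before the mutations are subtracted from totalMemUsed) can be illustrated with a simplified sketch. The class below is hypothetical: it borrows the field names from the diff, but the method names and the waiting logic are assumptions, not the TabletServerBatchWriter implementation.

```java
import java.util.HashSet;
import java.util.Set;

public class PendingCloseSketch {

  private final Set<Long> failedSessions = new HashSet<>();
  private long totalMemUsed = 0;

  synchronized void addMutations(long bytes) {
    totalMemUsed += bytes;
  }

  // Step 1: called while the mutations still count toward totalMemUsed.
  synchronized void registerFailedSession(long usid) {
    if (!failedSessions.add(usid)) {
      throw new IllegalStateException("Duplicate session " + usid);
    }
  }

  // Step 2: only afterwards are the mutations subtracted.
  synchronized void mutationsDone(long bytes) {
    totalMemUsed -= bytes;
    notifyAll();
  }

  // Called once the session has been closed on the server side.
  synchronized void sessionClosed(long usid) {
    failedSessions.remove(usid);
    notifyAll();
  }

  synchronized boolean isQuiescent() {
    return totalMemUsed == 0 && failedSessions.isEmpty();
  }

  // Assumed shape of what a close() would wait on: because step 1 precedes
  // step 2, there is no instant at which both conditions look satisfied
  // while a session is still open.
  synchronized void awaitQuiescence() throws InterruptedException {
    while (!isQuiescent()) {
      wait();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    PendingCloseSketch w = new PendingCloseSketch();
    w.addMutations(10);
    w.registerFailedSession(42L);
    w.mutationsDone(10);
    Thread closer = new Thread(() -> w.sessionClosed(42L));
    closer.start();
    w.awaitQuiescence();
    closer.join();
    System.out.println("quiescent: " + w.isQuiescent());
  }
}
```

   If the two steps were swapped, a thread in `awaitQuiescence()` could wake up between them, see zero memory in use and an empty set, and return while the session was still open.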





[GitHub] [accumulo] keith-turner commented on a diff in pull request #3733: ensures no writes happen after batch writer closes

Posted by "keith-turner (via GitHub)" <gi...@apache.org>.
keith-turner commented on code in PR #3733:
URL: https://github.com/apache/accumulo/pull/3733#discussion_r1309087343


##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java:
##########
@@ -1028,14 +1022,133 @@ private MutationSet sendMutationsToTabletServer(String location,
         timeoutTracker.errorOccured();
         throw new IOException(e);
       } catch (TApplicationException tae) {
+        // no need to bother closing session in this case
+        usid = null;
         updateServerErrors(location, tae);
         throw new AccumuloServerException(location, tae);
       } catch (ThriftSecurityException e) {
+        // no need to bother closing session in this case
+        usid = null;
         updateAuthorizationFailures(
             tabMuts.keySet().stream().collect(toMap(identity(), ke -> e.code)));
         throw new AccumuloSecurityException(e.user, e.code, e);
       } catch (TException e) {
         throw new IOException(e);
+      } finally {
+        if (usid != null) {
+          // There is an open session, must close it before the batchwriter closes or writes could
+          // happen after the batch writer closes. See #3721. Queuing a task instead of executing
+          // the code here because throwing exceptions in a finally block makes the code hard to
+          // reason about. Queue is less likely to throw an exception.
+          sendThreadPool.execute(new CloseSessionTask(location, usid));
+        }
+      }
+    }
+
+    class CloseSessionTask implements Runnable {
+
+      private final String location;
+      private final long usid;
+
+      CloseSessionTask(String location, Long usid) {
+        this.location = location;
+        this.usid = usid;
+        synchronized (TabletServerBatchWriter.this) {
+          if (!failedSessions.add(new Pair<>(usid, location))) {
+            throw new IllegalStateException("Duplicate session " + location + " " + usid);
+          }
+        }
+
+      }
+
+      @Override
+      public void run() {
+        try {
+          closeSession();
+        } catch (InterruptedException | RuntimeException | ThriftSecurityException e) {
+          updateUnknownErrors("Failed to close session " + location + " " + usid, e);
+        } finally {
+          synchronized (TabletServerBatchWriter.this) {
+            if (!failedSessions.remove(new Pair<>(usid, location))) {
+              throw new IllegalStateException("Session missing " + location + " " + usid);
+            }
+            TabletServerBatchWriter.this.notifyAll();
+          }
+        }
+      }
+
+      /**
+       * Checks if there is a lock held by a tserver at a specific host and port.
+       */
+      private boolean isALockHeld(String tserver) {
+        var root = context.getZooKeeperRoot() + Constants.ZTSERVERS;
+        var zLockPath = ServiceLock.path(root + "/" + tserver);
+        return ServiceLock.getSessionId(context.getZooCache(), zLockPath) != 0;
+      }
+
+      private void closeSession() throws InterruptedException, ThriftSecurityException {
+
+        Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS)

Review Comment:
   I think it might be tricky to use that here, because this code creates the client with different timeouts and responds differently to various Thrift exceptions.





[GitHub] [accumulo] ctubbsii commented on pull request #3733: ensures no writes happen after batch writer closes

Posted by "ctubbsii (via GitHub)" <gi...@apache.org>.
ctubbsii commented on PR #3733:
URL: https://github.com/apache/accumulo/pull/3733#issuecomment-1700834742

   I applied the Writer removal changes (those more closely related to #3735) in commit f76f12baa6e6e5499fe94cc833f02f9e2865f159, and merged that into 2.1 and forward into main first. The only difference from what was originally in this PR is that I also removed an unused private Logger in Writer in that commit.
   
   I've resolved merge conflicts by merging 2.1 into this PR.
   
   Now, this PR just represents the fixes for #3721, and I will squash and merge that separately.

