You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2018/01/27 00:38:09 UTC

[GitHub] ctubbsii commented on a change in pull request #369: [ACCUMULO-4787] Close input stream in AccumuloReplicaSystem

ctubbsii commented on a change in pull request #369: [ACCUMULO-4787] Close input stream in AccumuloReplicaSystem
URL: https://github.com/apache/accumulo/pull/369#discussion_r164253697
 
 

 ##########
 File path: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
 ##########
 @@ -420,91 +420,99 @@ protected Status replicateLogs(ClientContext peerContext, final HostAndPort peer
 
     Status lastStatus = status, currentStatus = status;
     final AtomicReference<Exception> exceptionRef = new AtomicReference<>();
-    while (true) {
-      // Set some trace info
-      span = Trace.start("Replicate WAL batch");
-      span.data("Batch size (bytes)", Long.toString(sizeLimit));
-      span.data("File", p.toString());
-      span.data("Peer instance name", peerContext.getInstance().getInstanceName());
-      span.data("Peer tserver", peerTserver.toString());
-      span.data("Remote table ID", remoteTableId);
-
-      ReplicationStats replResult;
-      try {
-        // Read and send a batch of mutations
-        replResult = ReplicationClient.executeServicerWithReturn(peerContext, peerTserver, new WalClientExecReturn(target, input, p, currentStatus, sizeLimit,
-            remoteTableId, tcreds, tids), timeout);
-      } catch (Exception e) {
-        log.error("Caught exception replicating data to {} at {}", peerContext.getInstance().getInstanceName(), peerTserver, e);
-        throw e;
-      } finally {
-        span.stop();
-      }
-
-      // Catch the overflow
-      long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
-      if (newBegin < 0) {
-        newBegin = Long.MAX_VALUE;
-      }
-
-      currentStatus = Status.newBuilder(currentStatus).setBegin(newBegin).build();
+    try {
+      while (true) {
+        // Set some trace info
+        span = Trace.start("Replicate WAL batch");
+        span.data("Batch size (bytes)", Long.toString(sizeLimit));
+        span.data("File", p.toString());
+        span.data("Peer instance name", peerContext.getInstance().getInstanceName());
+        span.data("Peer tserver", peerTserver.toString());
+        span.data("Remote table ID", remoteTableId);
+
+        ReplicationStats replResult;
+        try {
+          // Read and send a batch of mutations
+          replResult = ReplicationClient.executeServicerWithReturn(peerContext, peerTserver, new WalClientExecReturn(target, input, p, currentStatus, sizeLimit,
+                  remoteTableId, tcreds, tids), timeout);
+        } catch (Exception e) {
+          log.error("Caught exception replicating data to {} at {}", peerContext.getInstance().getInstanceName(), peerTserver, e);
+          throw e;
+        } finally {
+          span.stop();
+        }
 
-      log.debug("Sent batch for replication of {} to {}, with new Status {}", p, target, ProtobufUtil.toString(currentStatus));
+        // Catch the overflow
+        long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
+        if (newBegin < 0) {
+          newBegin = Long.MAX_VALUE;
+        }
 
-      // If we got a different status
-      if (!currentStatus.equals(lastStatus)) {
-        span = Trace.start("Update replication table");
-        try {
-          if (null != accumuloUgi) {
-            final Status copy = currentStatus;
-            accumuloUgi.doAs(new PrivilegedAction<Void>() {
-              @Override
-              public Void run() {
-                try {
-                  helper.recordNewStatus(p, copy, target);
-                } catch (Exception e) {
-                  exceptionRef.set(e);
+        currentStatus = Status.newBuilder(currentStatus).setBegin(newBegin).build();
+
+        log.debug("Sent batch for replication of {} to {}, with new Status {}", p, target, ProtobufUtil.toString(currentStatus));
+
+        // If we got a different status
+        if (!currentStatus.equals(lastStatus)) {
+          span = Trace.start("Update replication table");
+          try {
+            if (null != accumuloUgi) {
+              final Status copy = currentStatus;
+              accumuloUgi.doAs(new PrivilegedAction<Void>() {
+                @Override
+                public Void run() {
+                  try {
+                    helper.recordNewStatus(p, copy, target);
+                  } catch (Exception e) {
+                    exceptionRef.set(e);
+                  }
+                  return null;
+                }
+              });
+              Exception e = exceptionRef.get();
+              if (null != e) {
+                if (e instanceof TableNotFoundException) {
+                  throw (TableNotFoundException) e;
+                } else if (e instanceof AccumuloSecurityException) {
+                  throw (AccumuloSecurityException) e;
+                } else if (e instanceof AccumuloException) {
+                  throw (AccumuloException) e;
+                } else {
+                  throw new RuntimeException("Received unexpected exception", e);
                 }
-                return null;
-              }
-            });
-            Exception e = exceptionRef.get();
-            if (null != e) {
-              if (e instanceof TableNotFoundException) {
-                throw (TableNotFoundException) e;
-              } else if (e instanceof AccumuloSecurityException) {
-                throw (AccumuloSecurityException) e;
-              } else if (e instanceof AccumuloException) {
-                throw (AccumuloException) e;
-              } else {
-                throw new RuntimeException("Received unexpected exception", e);
               }
+            } else {
+              helper.recordNewStatus(p, currentStatus, target);
             }
-          } else {
-            helper.recordNewStatus(p, currentStatus, target);
+          } catch (TableNotFoundException e) {
+            log.error("Tried to update status in replication table for {} as {}, but the table did not exist", p, ProtobufUtil.toString(currentStatus), e);
+            throw new RuntimeException("Replication table did not exist, will retry", e);
+          } finally {
+            span.stop();
           }
-        } catch (TableNotFoundException e) {
-          log.error("Tried to update status in replication table for {} as {}, but the table did not exist", p, ProtobufUtil.toString(currentStatus), e);
-          throw new RuntimeException("Replication table did not exist, will retry", e);
-        } finally {
-          span.stop();
-        }
 
-        log.debug("Recorded updated status for {}: {}", p, ProtobufUtil.toString(currentStatus));
+          log.debug("Recorded updated status for {}: {}", p, ProtobufUtil.toString(currentStatus));
 
-        // If we don't have any more work, just quit
-        if (!StatusUtil.isWorkRequired(currentStatus)) {
-          return currentStatus;
+          // If we don't have any more work, just quit
+          if (!StatusUtil.isWorkRequired(currentStatus)) {
+            return currentStatus;
+          } else {
+            // Otherwise, let it loop and replicate some more data
+            lastStatus = currentStatus;
+          }
         } else {
-          // Otherwise, let it loop and replicate some more data
-          lastStatus = currentStatus;
-        }
-      } else {
-        log.debug("Did not replicate any new data for {} to {}, (state was {})", p, target, ProtobufUtil.toString(lastStatus));
+          log.debug("Did not replicate any new data for {} to {}, (state was {})", p, target, ProtobufUtil.toString(lastStatus));
 
-        // otherwise, we didn't actually replicate (likely because there was error sending the data)
-        // we can just not record any updates, and it will be picked up again by the work assigner
-        return status;
+          // otherwise, we didn't actually replicate (likely because there was error sending the data)
+          // we can just not record any updates, and it will be picked up again by the work assigner
+          return status;
+        }
+      }
+    } finally {
+      try {
+        input.close();
+      } catch (IOException e) {
+        log.warn("Received IOException trying to close input stream: {}", e);
 
 Review comment:
   This log message should either not include the `: {}` part, or it should be have the `e` parameter twice. As is, it will convert the exception to a string to substitute for `{}` and we'd lose the stack trace.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services