You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by dl...@apache.org on 2021/12/10 12:03:59 UTC

[accumulo] branch main updated: Exception and cancellation changes in Compactor process (#2372)

This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 5233e84  Exception and cancellation changes in Compactor process (#2372)
5233e84 is described below

commit 5233e849c12bc0fe4887a461ef148747721826da
Author: Dave Marion <dl...@apache.org>
AuthorDate: Fri Dec 10 07:03:54 2021 -0500

    Exception and cancellation changes in Compactor process (#2372)
    
    The Thread in the Compactor that runs the FileCompactor was throwing an
    exception as part of the error handling routine. Rethrowing is not
    necessary to convey the exception information to the main thread, because
    that information was already being conveyed. This change reduces the
    error logging in the Compactor log regarding the uncaught exception
    being thrown and handled by the AccumuloUncaughtExceptionHandler.
    
    Another background thread checks every 5 seconds to determine whether
    or not the current compaction should be canceled. Call that check
    (checkIfCanceled) when the compaction finishes with an error, in case
    something happened that would cause the compaction to be canceled and
    it has not been noticed yet.
    
    Closes #2350
    
    
    Co-authored-by: Keith Turner <kt...@apache.org>
---
 .../src/main/java/org/apache/accumulo/compactor/Compactor.java     | 7 ++++++-
 .../src/test/java/org/apache/accumulo/compactor/CompactorTest.java | 6 ++++++
 .../accumulo/test/compaction/ExternalDoNothingCompactor.java       | 4 +++-
 3 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 6565280..39c05fd 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -566,7 +566,6 @@ public class Compactor extends AbstractServer implements CompactorService.Iface
         } catch (Exception e) {
           LOG.error("Compaction failed", e);
           err.set(e);
-          throw new RuntimeException("Compaction failed", e);
         } finally {
           stopped.countDown();
           Preconditions.checkState(compactionRunning.compareAndSet(true, false));
@@ -712,6 +711,12 @@ public class Compactor extends AbstractServer implements CompactorService.Iface
           compactionThread.join();
           LOG.trace("Compaction thread finished.");
 
+          if (err.get() != null) {
+            // maybe the error occurred because the table was deleted or something like that, so
+            // force a cancel check to possibly reduce noise in the logs
+            checkIfCanceled();
+          }
+
           if (compactionThread.isInterrupted() || JOB_HOLDER.isCancelled()
               || ((err.get() != null && err.get().getClass().equals(InterruptedException.class)))) {
             LOG.warn("Compaction thread was interrupted, sending CANCELLED state");
diff --git a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
index 53404e2..f1132d2 100644
--- a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
+++ b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
@@ -220,6 +220,9 @@ public class CompactorTest {
     }
 
     @Override
+    protected synchronized void checkIfCanceled() {}
+
+    @Override
     protected Runnable createCompactionJob(TExternalCompactionJob job, LongAdder totalInputEntries,
         LongAdder totalInputBytes, CountDownLatch started, CountDownLatch stopped,
         AtomicReference<Throwable> err) {
@@ -337,6 +340,7 @@ public class CompactorTest {
     EasyMock.expect(conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT)).andReturn(86400000L);
 
     ServerContext context = PowerMock.createNiceMock(ServerContext.class);
+    EasyMock.expect(context.getConfiguration()).andReturn(conf);
     ZooReaderWriter zrw = PowerMock.createNiceMock(ZooReaderWriter.class);
     ZooKeeper zk = PowerMock.createNiceMock(ZooKeeper.class);
     EasyMock.expect(context.getZooReaderWriter()).andReturn(zrw).anyTimes();
@@ -390,6 +394,7 @@ public class CompactorTest {
     EasyMock.expect(conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT)).andReturn(86400000L);
 
     ServerContext context = PowerMock.createNiceMock(ServerContext.class);
+    EasyMock.expect(context.getConfiguration()).andReturn(conf);
     ZooReaderWriter zrw = PowerMock.createNiceMock(ZooReaderWriter.class);
     ZooKeeper zk = PowerMock.createNiceMock(ZooKeeper.class);
     EasyMock.expect(context.getZooReaderWriter()).andReturn(zrw).anyTimes();
@@ -443,6 +448,7 @@ public class CompactorTest {
     EasyMock.expect(conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT)).andReturn(86400000L);
 
     ServerContext context = PowerMock.createNiceMock(ServerContext.class);
+    EasyMock.expect(context.getConfiguration()).andReturn(conf);
     ZooReaderWriter zrw = PowerMock.createNiceMock(ZooReaderWriter.class);
     ZooKeeper zk = PowerMock.createNiceMock(ZooKeeper.class);
     EasyMock.expect(context.getZooReaderWriter()).andReturn(zrw).anyTimes();
diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java
index 5a43ea6..87b5c9f 100644
--- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java
@@ -53,6 +53,9 @@ public class ExternalDoNothingCompactor extends Compactor implements Iface {
       LongAdder totalInputBytes, CountDownLatch started, CountDownLatch stopped,
       AtomicReference<Throwable> err) {
 
+    // Set this to true so that only 1 external compaction is run
+    this.shutdown = true;
+
     return new Runnable() {
       @Override
       public void run() {
@@ -76,7 +79,6 @@ public class ExternalDoNothingCompactor extends Compactor implements Iface {
         } catch (Exception e) {
           LOG.error("Compaction failed", e);
           err.set(e);
-          throw new RuntimeException("Compaction failed", e);
         } finally {
           stopped.countDown();
         }