You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by dl...@apache.org on 2021/04/12 19:54:02 UTC

[accumulo] branch 1451-external-compactions-feature updated: closes #1997 - cancel compaction on table delete

This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push:
     new 8b54b3f  closes #1997 - cancel compaction on table delete
8b54b3f is described below

commit 8b54b3f50f8e29a0dbecaaf05f1103c1b87c91f3
Author: Dave Marion <dl...@apache.org>
AuthorDate: Mon Apr 12 19:52:37 2021 +0000

    closes #1997 - cancel compaction on table delete
    
    When a compaction starts, the Compactor now sets a ZooKeeper Watcher on the table's
    node. If the Watcher receives a NodeDeleted event, it cancels the running compaction.
---
 .../org/apache/accumulo/compactor/Compactor.java   | 52 ++++++++++++++++++++++
 .../apache/accumulo/compactor/CompactorTest.java   | 43 ++++++++++++++++--
 2 files changed, 92 insertions(+), 3 deletions(-)

diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index ade2c09..130cda3 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -71,6 +71,7 @@ import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.server.AbstractServer;
 import org.apache.accumulo.server.GarbageCollectionLogger;
+import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.ServerOpts;
 import org.apache.accumulo.server.compaction.CompactionInfo;
 import org.apache.accumulo.server.compaction.ExternalCompactionUtil;
@@ -89,6 +90,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransportException;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.WatcherType;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -601,7 +606,52 @@ public class Compactor extends AbstractServer
           JOB_HOLDER.set(job, compactionThread);
         }
 
+        final String tableId = new String(job.getExtent().getTable(), UTF_8);
+        final ServerContext ctxRef = getContext();
+        String tablePath = getContext().getZooKeeperRoot() + Constants.ZTABLES + "/" + tableId;
+        Watcher tableNodeWatcher = new Watcher() {
+          @Override
+          public void process(WatchedEvent event) {
+            switch (event.getType()) {
+              case NodeDeleted:
+                LOG.info("Zookeeper node for table {} deleted, cancelling compaction.", tableId);
+                JOB_HOLDER.cancel();
+                break;
+              default:
+                // Watcher got fired for some other event, need to recreate the Watcher
+                try {
+                  Stat s = ctxRef.getZooReaderWriter().getZooKeeper().exists(tablePath, this);
+                  if (s == null) {
+                    LOG.info("Zookeeper node for table {} deleted before compaction started.",
+                        tableId);
+                    // if stat is null from the zookeeper.exists(path, Watcher) call, then we just
+                    // created a Watcher on a node that does not exist. Delete the watcher we just
+                    // created.
+                    ctxRef.getZooReaderWriter().getZooKeeper().removeWatches(tablePath, this,
+                        WatcherType.Any, true);
+                  }
+                } catch (Exception e) {
+                  LOG.error("Error communicating with ZooKeeper and unable to recreate Watcher", e);
+                  // CBUG: Should we exit?
+                }
+                break;
+            }
+          }
+        };
         try {
+          // Add a watcher in ZooKeeper on the table id so that we can cancel this compaction
+          // if the table is deleted
+          Stat s =
+              getContext().getZooReaderWriter().getZooKeeper().exists(tablePath, tableNodeWatcher);
+          if (s == null) {
+            LOG.info("Zookeeper node for table {} deleted before compaction started.", tableId);
+            // if stat is null from the zookeeper.exists(path, Watcher) call, then we just
+            // created a Watcher on a node that does not exist. Delete the watcher we just created.
+            getContext().getZooReaderWriter().getZooKeeper().removeWatches(tablePath,
+                tableNodeWatcher, WatcherType.Any, true);
+            continue;
+          }
+
           compactionThread.start(); // start the compactionThread
           started.await(); // wait until the compactor is started
           final long inputEntries = totalInputEntries.sum();
@@ -686,6 +736,8 @@ public class Compactor extends AbstractServer
             LOG.error("Error cancelling compaction.", e2);
           }
         } finally {
+          getContext().getZooReaderWriter().getZooKeeper().removeWatches(tablePath,
+              tableNodeWatcher, WatcherType.Any, true);
           currentCompactionId.set(null);
         }
 
diff --git a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
index 73825d9..7495aa1 100644
--- a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
+++ b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
@@ -41,6 +41,7 @@ import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
 import org.apache.accumulo.core.util.Halt;
 import org.apache.accumulo.core.util.HostAndPort;
 import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.server.AbstractServer;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.compaction.RetryableThriftCall.RetriesExceededException;
@@ -48,6 +49,9 @@ import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.server.rpc.ServerAddress;
 import org.apache.accumulo.server.rpc.TServerUtils;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
 import org.easymock.EasyMock;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -157,6 +161,7 @@ public class CompactorTest {
 
     private final Logger LOG = LoggerFactory.getLogger(SuccessfulCompactor.class);
 
+    private final AccumuloConfiguration conf;
     private final Supplier<UUID> uuid;
     private final ServerAddress address;
     private final TExternalCompactionJob job;
@@ -169,6 +174,7 @@ public class CompactorTest {
     SuccessfulCompactor(Supplier<UUID> uuid, ServerAddress address, TExternalCompactionJob job,
         AccumuloConfiguration conf, ServerContext ctx, ExternalCompactionId eci) {
       super(new CompactorServerOpts(), new String[] {"-q", "testQ"}, conf);
+      this.conf = conf;
       this.uuid = uuid;
       this.address = address;
       this.job = job;
@@ -177,6 +183,11 @@ public class CompactorTest {
     }
 
     @Override
+    public AccumuloConfiguration getConfiguration() {
+      return conf;
+    }
+
+    @Override
     protected void setupSecurity() {}
 
     @Override
@@ -319,12 +330,20 @@ public class CompactorTest {
     TKeyExtent extent = PowerMock.createNiceMock(TKeyExtent.class);
     EasyMock.expect(job.isSetExternalCompactionId()).andReturn(true).anyTimes();
     EasyMock.expect(job.getExternalCompactionId()).andReturn(eci.toString()).anyTimes();
-    EasyMock.expect(job.getExtent()).andReturn(extent);
+    EasyMock.expect(job.getExtent()).andReturn(extent).anyTimes();
+    EasyMock.expect(extent.getTable()).andReturn("testTable".getBytes()).anyTimes();
 
     AccumuloConfiguration conf = PowerMock.createNiceMock(AccumuloConfiguration.class);
     EasyMock.expect(conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT)).andReturn(86400000L);
 
     ServerContext ctx = PowerMock.createNiceMock(ServerContext.class);
+    ZooReaderWriter zrw = PowerMock.createNiceMock(ZooReaderWriter.class);
+    ZooKeeper zk = PowerMock.createNiceMock(ZooKeeper.class);
+    Stat stat = PowerMock.createNiceMock(Stat.class);
+    EasyMock.expect(ctx.getZooReaderWriter()).andReturn(zrw).anyTimes();
+    EasyMock.expect(zrw.getZooKeeper()).andReturn(zk).anyTimes();
+    EasyMock.expect(zk.exists(EasyMock.isA(String.class), EasyMock.isA(Watcher.class)))
+        .andReturn(stat);
     VolumeManagerImpl vm = PowerMock.createNiceMock(VolumeManagerImpl.class);
     EasyMock.expect(ctx.getVolumeManager()).andReturn(vm);
     vm.close();
@@ -364,14 +383,24 @@ public class CompactorTest {
 
     TExternalCompactionJob job = PowerMock.createNiceMock(TExternalCompactionJob.class);
     TKeyExtent extent = PowerMock.createNiceMock(TKeyExtent.class);
+    EasyMock.expect(extent.getTable()).andReturn("testTable".getBytes());
+
     EasyMock.expect(job.isSetExternalCompactionId()).andReturn(true).anyTimes();
     EasyMock.expect(job.getExternalCompactionId()).andReturn(eci.toString()).anyTimes();
-    EasyMock.expect(job.getExtent()).andReturn(extent);
+    EasyMock.expect(job.getExtent()).andReturn(extent).anyTimes();
+    EasyMock.expect(extent.getTable()).andReturn("testTable".getBytes()).anyTimes();
 
     AccumuloConfiguration conf = PowerMock.createNiceMock(AccumuloConfiguration.class);
     EasyMock.expect(conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT)).andReturn(86400000L);
 
     ServerContext ctx = PowerMock.createNiceMock(ServerContext.class);
+    ZooReaderWriter zrw = PowerMock.createNiceMock(ZooReaderWriter.class);
+    ZooKeeper zk = PowerMock.createNiceMock(ZooKeeper.class);
+    Stat stat = PowerMock.createNiceMock(Stat.class);
+    EasyMock.expect(ctx.getZooReaderWriter()).andReturn(zrw).anyTimes();
+    EasyMock.expect(zrw.getZooKeeper()).andReturn(zk).anyTimes();
+    EasyMock.expect(zk.exists(EasyMock.isA(String.class), EasyMock.isA(Watcher.class)))
+        .andReturn(stat);
     VolumeManagerImpl vm = PowerMock.createNiceMock(VolumeManagerImpl.class);
     EasyMock.expect(ctx.getVolumeManager()).andReturn(vm);
     vm.close();
@@ -414,12 +443,20 @@ public class CompactorTest {
     TKeyExtent extent = PowerMock.createNiceMock(TKeyExtent.class);
     EasyMock.expect(job.isSetExternalCompactionId()).andReturn(true).anyTimes();
     EasyMock.expect(job.getExternalCompactionId()).andReturn(eci.toString()).anyTimes();
-    EasyMock.expect(job.getExtent()).andReturn(extent);
+    EasyMock.expect(job.getExtent()).andReturn(extent).anyTimes();
+    EasyMock.expect(extent.getTable()).andReturn("testTable".getBytes()).anyTimes();
 
     AccumuloConfiguration conf = PowerMock.createNiceMock(AccumuloConfiguration.class);
     EasyMock.expect(conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT)).andReturn(86400000L);
 
     ServerContext ctx = PowerMock.createNiceMock(ServerContext.class);
+    ZooReaderWriter zrw = PowerMock.createNiceMock(ZooReaderWriter.class);
+    ZooKeeper zk = PowerMock.createNiceMock(ZooKeeper.class);
+    Stat stat = PowerMock.createNiceMock(Stat.class);
+    EasyMock.expect(ctx.getZooReaderWriter()).andReturn(zrw).anyTimes();
+    EasyMock.expect(zrw.getZooKeeper()).andReturn(zk).anyTimes();
+    EasyMock.expect(zk.exists(EasyMock.isA(String.class), EasyMock.isA(Watcher.class)))
+        .andReturn(stat);
     VolumeManagerImpl vm = PowerMock.createNiceMock(VolumeManagerImpl.class);
     EasyMock.expect(ctx.getVolumeManager()).andReturn(vm);
     vm.close();