You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by dl...@apache.org on 2021/04/12 19:54:02 UTC
[accumulo] branch 1451-external-compactions-feature updated: closes
#1997 - cancel compaction on table delete
This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push:
new 8b54b3f closes #1997 - cancel compaction on table delete
8b54b3f is described below
commit 8b54b3f50f8e29a0dbecaaf05f1103c1b87c91f3
Author: Dave Marion <dl...@apache.org>
AuthorDate: Mon Apr 12 19:52:37 2021 +0000
closes #1997 - cancel compaction on table delete
Added a Watcher in the Compactor for the table when a compaction starts. If the
Watcher gets the NodeDeleted event, then it cancels the compaction.
---
.../org/apache/accumulo/compactor/Compactor.java | 52 ++++++++++++++++++++++
.../apache/accumulo/compactor/CompactorTest.java | 43 ++++++++++++++++--
2 files changed, 92 insertions(+), 3 deletions(-)
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index ade2c09..130cda3 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -71,6 +71,7 @@ import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.server.AbstractServer;
import org.apache.accumulo.server.GarbageCollectionLogger;
+import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.ServerOpts;
import org.apache.accumulo.server.compaction.CompactionInfo;
import org.apache.accumulo.server.compaction.ExternalCompactionUtil;
@@ -89,6 +90,10 @@ import org.apache.hadoop.fs.Path;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.WatcherType;
+import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -601,7 +606,52 @@ public class Compactor extends AbstractServer
JOB_HOLDER.set(job, compactionThread);
}
+ final String tableId = new String(job.getExtent().getTable(), UTF_8);
+ final ServerContext ctxRef = getContext();
+ String tablePath = getContext().getZooKeeperRoot() + Constants.ZTABLES + "/" + tableId;
+ Watcher tableNodeWatcher = new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ switch (event.getType()) {
+ case NodeDeleted:
+ LOG.info("Zookeeper node for table {} deleted, cancelling compaction.", tableId);
+ JOB_HOLDER.cancel();
+ break;
+ default:
+ // Watcher got fired for some other event, need to recreate the Watcher
+ try {
+ Stat s = ctxRef.getZooReaderWriter().getZooKeeper().exists(tablePath, this);
+ if (s == null) {
+ LOG.info("Zookeeper node for table {} deleted before compaction started.",
+ tableId);
+ // if stat is null from the zookeeper.exists(path, Watcher) call, then we just
+ // created a Watcher on a node that does not exist. Delete the watcher we just
+ // created.
+ ctxRef.getZooReaderWriter().getZooKeeper().removeWatches(tablePath, this,
+ WatcherType.Any, true);
+ }
+ } catch (Exception e) {
+ LOG.error("Error communicating with ZooKeeper and unable to recreate Watcher", e);
+ // CBUG: Should we exit?
+ }
+ break;
+ }
+ }
+ };
try {
+ // Add a watcher in ZooKeeper on the table id so that we can cancel this compaction
+ // if the table is deleted
+ Stat s =
+ getContext().getZooReaderWriter().getZooKeeper().exists(tablePath, tableNodeWatcher);
+ if (s == null) {
+ LOG.info("Zookeeper node for table {} deleted before compaction started.", tableId);
+ // if stat is null from the zookeeper.exists(path, Watcher) call, then we just
+ // created a Watcher on a node that does not exist. Delete the watcher we just created.
+ getContext().getZooReaderWriter().getZooKeeper().removeWatches(tablePath,
+ tableNodeWatcher, WatcherType.Any, true);
+ continue;
+ }
+
compactionThread.start(); // start the compactionThread
started.await(); // wait until the compactor is started
final long inputEntries = totalInputEntries.sum();
@@ -686,6 +736,8 @@ public class Compactor extends AbstractServer
LOG.error("Error cancelling compaction.", e2);
}
} finally {
+ getContext().getZooReaderWriter().getZooKeeper().removeWatches(tablePath,
+ tableNodeWatcher, WatcherType.Any, true);
currentCompactionId.set(null);
}
diff --git a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
index 73825d9..7495aa1 100644
--- a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
+++ b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
@@ -41,6 +41,7 @@ import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
import org.apache.accumulo.core.util.Halt;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.server.AbstractServer;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.compaction.RetryableThriftCall.RetriesExceededException;
@@ -48,6 +49,9 @@ import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.accumulo.server.rpc.ServerAddress;
import org.apache.accumulo.server.rpc.TServerUtils;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
import org.easymock.EasyMock;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -157,6 +161,7 @@ public class CompactorTest {
private final Logger LOG = LoggerFactory.getLogger(SuccessfulCompactor.class);
+ private final AccumuloConfiguration conf;
private final Supplier<UUID> uuid;
private final ServerAddress address;
private final TExternalCompactionJob job;
@@ -169,6 +174,7 @@ public class CompactorTest {
SuccessfulCompactor(Supplier<UUID> uuid, ServerAddress address, TExternalCompactionJob job,
AccumuloConfiguration conf, ServerContext ctx, ExternalCompactionId eci) {
super(new CompactorServerOpts(), new String[] {"-q", "testQ"}, conf);
+ this.conf = conf;
this.uuid = uuid;
this.address = address;
this.job = job;
@@ -177,6 +183,11 @@ public class CompactorTest {
}
@Override
+ public AccumuloConfiguration getConfiguration() {
+ return conf;
+ }
+
+ @Override
protected void setupSecurity() {}
@Override
@@ -319,12 +330,20 @@ public class CompactorTest {
TKeyExtent extent = PowerMock.createNiceMock(TKeyExtent.class);
EasyMock.expect(job.isSetExternalCompactionId()).andReturn(true).anyTimes();
EasyMock.expect(job.getExternalCompactionId()).andReturn(eci.toString()).anyTimes();
- EasyMock.expect(job.getExtent()).andReturn(extent);
+ EasyMock.expect(job.getExtent()).andReturn(extent).anyTimes();
+ EasyMock.expect(extent.getTable()).andReturn("testTable".getBytes()).anyTimes();
AccumuloConfiguration conf = PowerMock.createNiceMock(AccumuloConfiguration.class);
EasyMock.expect(conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT)).andReturn(86400000L);
ServerContext ctx = PowerMock.createNiceMock(ServerContext.class);
+ ZooReaderWriter zrw = PowerMock.createNiceMock(ZooReaderWriter.class);
+ ZooKeeper zk = PowerMock.createNiceMock(ZooKeeper.class);
+ Stat stat = PowerMock.createNiceMock(Stat.class);
+ EasyMock.expect(ctx.getZooReaderWriter()).andReturn(zrw).anyTimes();
+ EasyMock.expect(zrw.getZooKeeper()).andReturn(zk).anyTimes();
+ EasyMock.expect(zk.exists(EasyMock.isA(String.class), EasyMock.isA(Watcher.class)))
+ .andReturn(stat);
VolumeManagerImpl vm = PowerMock.createNiceMock(VolumeManagerImpl.class);
EasyMock.expect(ctx.getVolumeManager()).andReturn(vm);
vm.close();
@@ -364,14 +383,24 @@ public class CompactorTest {
TExternalCompactionJob job = PowerMock.createNiceMock(TExternalCompactionJob.class);
TKeyExtent extent = PowerMock.createNiceMock(TKeyExtent.class);
+ EasyMock.expect(extent.getTable()).andReturn("testTable".getBytes());
+
EasyMock.expect(job.isSetExternalCompactionId()).andReturn(true).anyTimes();
EasyMock.expect(job.getExternalCompactionId()).andReturn(eci.toString()).anyTimes();
- EasyMock.expect(job.getExtent()).andReturn(extent);
+ EasyMock.expect(job.getExtent()).andReturn(extent).anyTimes();
+ EasyMock.expect(extent.getTable()).andReturn("testTable".getBytes()).anyTimes();
AccumuloConfiguration conf = PowerMock.createNiceMock(AccumuloConfiguration.class);
EasyMock.expect(conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT)).andReturn(86400000L);
ServerContext ctx = PowerMock.createNiceMock(ServerContext.class);
+ ZooReaderWriter zrw = PowerMock.createNiceMock(ZooReaderWriter.class);
+ ZooKeeper zk = PowerMock.createNiceMock(ZooKeeper.class);
+ Stat stat = PowerMock.createNiceMock(Stat.class);
+ EasyMock.expect(ctx.getZooReaderWriter()).andReturn(zrw).anyTimes();
+ EasyMock.expect(zrw.getZooKeeper()).andReturn(zk).anyTimes();
+ EasyMock.expect(zk.exists(EasyMock.isA(String.class), EasyMock.isA(Watcher.class)))
+ .andReturn(stat);
VolumeManagerImpl vm = PowerMock.createNiceMock(VolumeManagerImpl.class);
EasyMock.expect(ctx.getVolumeManager()).andReturn(vm);
vm.close();
@@ -414,12 +443,20 @@ public class CompactorTest {
TKeyExtent extent = PowerMock.createNiceMock(TKeyExtent.class);
EasyMock.expect(job.isSetExternalCompactionId()).andReturn(true).anyTimes();
EasyMock.expect(job.getExternalCompactionId()).andReturn(eci.toString()).anyTimes();
- EasyMock.expect(job.getExtent()).andReturn(extent);
+ EasyMock.expect(job.getExtent()).andReturn(extent).anyTimes();
+ EasyMock.expect(extent.getTable()).andReturn("testTable".getBytes()).anyTimes();
AccumuloConfiguration conf = PowerMock.createNiceMock(AccumuloConfiguration.class);
EasyMock.expect(conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT)).andReturn(86400000L);
ServerContext ctx = PowerMock.createNiceMock(ServerContext.class);
+ ZooReaderWriter zrw = PowerMock.createNiceMock(ZooReaderWriter.class);
+ ZooKeeper zk = PowerMock.createNiceMock(ZooKeeper.class);
+ Stat stat = PowerMock.createNiceMock(Stat.class);
+ EasyMock.expect(ctx.getZooReaderWriter()).andReturn(zrw).anyTimes();
+ EasyMock.expect(zrw.getZooKeeper()).andReturn(zk).anyTimes();
+ EasyMock.expect(zk.exists(EasyMock.isA(String.class), EasyMock.isA(Watcher.class)))
+ .andReturn(stat);
VolumeManagerImpl vm = PowerMock.createNiceMock(VolumeManagerImpl.class);
EasyMock.expect(ctx.getVolumeManager()).andReturn(vm);
vm.close();