You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by dl...@apache.org on 2021/04/12 18:18:32 UTC
[accumulo] branch 1451-external-compactions-feature updated:
Changes to get QA build working
This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push:
new 75d7f67 Changes to get QA build working
75d7f67 is described below
commit 75d7f67fef54771061b0fff45fdea507ebbf4ca9
Author: Dave Marion <dl...@apache.org>
AuthorDate: Mon Apr 12 18:18:10 2021 +0000
Changes to get QA build working
---
server/compaction-coordinator/pom.xml | 12 ++
.../coordinator/CompactionCoordinator.java | 63 +++++++-
.../accumulo/coordinator/CompactionFinalizer.java | 4 +-
.../coordinator/CompactionCoordinatorTest.java | 170 ++++++++++-----------
server/compactor/pom.xml | 29 +++-
.../org/apache/accumulo/compactor/Compactor.java | 27 +++-
.../apache/accumulo/compactor/CompactorTest.java | 40 +++--
.../compactions/ExternalCompactionExecutor.java | 20 +++
8 files changed, 238 insertions(+), 127 deletions(-)
diff --git a/server/compaction-coordinator/pom.xml b/server/compaction-coordinator/pom.xml
index 0a3d7dd9..66f98b6 100644
--- a/server/compaction-coordinator/pom.xml
+++ b/server/compaction-coordinator/pom.xml
@@ -37,9 +37,17 @@
</dependency>
<dependency>
<groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-server-base</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-start</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
</dependency>
@@ -48,6 +56,10 @@
<artifactId>libthrift</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index 8f1705a..3d6f749 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -130,6 +130,16 @@ public class CompactionCoordinator extends AbstractServer
printStartupMsg();
}
+ protected CompactionCoordinator(ServerOpts opts, String[] args, AccumuloConfiguration conf) {
+ super("compaction-coordinator", opts, args);
+ aconf = conf;
+ compactionFinalizer = createCompactionFinalizer();
+ tserverSet = createLiveTServerSet();
+ setupSecurity();
+ startGCLogger();
+ printStartupMsg();
+ }
+
protected CompactionFinalizer createCompactionFinalizer() {
return new CompactionFinalizer(getContext());
}
@@ -160,7 +170,9 @@ public class CompactionCoordinator extends AbstractServer
* @param clientAddress
* address of this Compactor
* @throws KeeperException
+ * zookeeper error
* @throws InterruptedException
+ * thread interrupted
*/
protected void getCoordinatorLock(HostAndPort clientAddress)
throws KeeperException, InterruptedException {
@@ -195,6 +207,7 @@ public class CompactionCoordinator extends AbstractServer
*
* @return address of this CompactionCoordinator client service
* @throws UnknownHostException
+ * host unknown
*/
protected ServerAddress startCoordinatorClientService() throws UnknownHostException {
Iface rpcProxy = TraceUtil.wrapService(this);
@@ -343,10 +356,10 @@ public class CompactionCoordinator extends AbstractServer
}
});
}
+ tservers.clear();
} else {
LOG.info("No running tablet servers found, continuing startup");
}
- tservers.clear();
tserverSet.startListeningForTabletServerChanges();
new DeadCompactionDetector(getContext(), compactionFinalizer).start();
@@ -556,6 +569,7 @@ public class CompactionCoordinator extends AbstractServer
* tserver instance
* @return thrift client
* @throws TTransportException
+ * thrift error
*/
protected TabletClientService.Client getTabletServerConnection(TServerInstance tserver)
throws TTransportException {
@@ -572,6 +586,7 @@ public class CompactionCoordinator extends AbstractServer
* compactor address
* @return thrift client
* @throws TTransportException
+ * thrift error
*/
protected Compactor.Client getCompactorConnection(HostAndPort compactorAddress)
throws TTransportException {
@@ -582,6 +597,15 @@ public class CompactionCoordinator extends AbstractServer
/**
* Called by the TabletServer to cancel the running compaction.
+ *
+ * @param tinfo
+ * trace info
+ * @param credentials
+ * tcredentials object
+ * @param externalCompactionId
+ * compaction id
+ * @throws TException
+ * thrift error
*/
@Override
public void cancelCompaction(TInfo tinfo, TCredentials credentials, String externalCompactionId)
@@ -631,9 +655,15 @@ public class CompactionCoordinator extends AbstractServer
/**
* TServer calls getCompactionStatus to get information about the compaction
*
+ * @param tinfo
+ * trace info
+ * @param credentials
+ * tcredentials object
* @param externalCompactionId
- * id
+ * compaction id
* @return compaction stats or null if not running
+ * @throws TException
+ * thrift error
*/
@Override
public List<Status> getCompactionStatus(TInfo tinfo, TCredentials credentials,
@@ -657,10 +687,20 @@ public class CompactionCoordinator extends AbstractServer
/**
* Compactor calls compactionCompleted passing in the CompactionStats
*
- * @param job
- * compaction job
+ * @param tinfo
+ * trace info
+ * @param credentials
+ * tcredentials object
+ * @param externalCompactionId
+ * compaction id
+ * @param textent
+ * tablet extent
* @param stats
* compaction stats
+ * @throws UnknownCompactionIdException
+ * if compaction is not running
+ * @throws TException
+ * thrift error
*/
@Override
public void compactionCompleted(TInfo tinfo, TCredentials credentials,
@@ -715,9 +755,12 @@ public class CompactionCoordinator extends AbstractServer
*
*
* @param externalCompactionId
+ * compaction id
* @return CompactionStats
* @throws UnknownCompactionIdException
* if compaction is not running
+ * @throws TException
+ * thrift error
*/
public CompactionStats isCompactionCompleted(String externalCompactionId) throws TException {
final var ecid = ExternalCompactionId.of(externalCompactionId);
@@ -743,14 +786,22 @@ public class CompactionCoordinator extends AbstractServer
/**
* Compactor calls to update the status of the assigned compaction
*
- * @param job
- * compaction job
+ * @param tinfo
+ * trace info
+ * @param credentials
+ * tcredentials object
+ * @param externalCompactionId
+ * compaction id
* @param state
* compaction state
* @param message
* informational message
* @param timestamp
* timestamp of the message
+ * @throws UnknownCompactionIdException
+ * if compaction is not running
+ * @throws TException
+ * thrift error
*/
@Override
public void updateCompactionStatus(TInfo tinfo, TCredentials credentials,
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
index 2b968ba..1b6c6b2 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
@@ -89,7 +89,9 @@ public class CompactionFinalizer {
// queue RPC if queue is not full
LOG.info("Queueing tserver notification for completed external compaction: {}", ecfs);
- pendingNotifications.offer(ecfs);
+ if (!pendingNotifications.offer(ecfs)) {
+ LOG.info("Queue full, notification to tablet server will not occur.");
+ }
}
public void failCompactions(Map<ExternalCompactionId,KeyExtent> compactionsToFail) {
diff --git a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
index 3c450fc..a1522b8 100644
--- a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
+++ b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
@@ -18,6 +18,11 @@
*/
package org.apache.accumulo.coordinator;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.HashMap;
@@ -57,7 +62,6 @@ import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
import org.apache.zookeeper.KeeperException;
import org.easymock.EasyMock;
-import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
@@ -79,7 +83,6 @@ public class CompactionCoordinatorTest {
public class TestCoordinator extends CompactionCoordinator {
private final ServerContext ctx;
- private final AccumuloConfiguration conf;
private final ServerAddress client;
private final TabletClientService.Client tabletServerClient;
@@ -87,8 +90,7 @@ public class CompactionCoordinatorTest {
LiveTServerSet tservers, ServerAddress client,
TabletClientService.Client tabletServerClient, ServerContext ctx,
AuditedSecurityOperation security) {
- super(new ServerOpts(), new String[] {});
- this.conf = conf;
+ super(new ServerOpts(), new String[] {}, conf);
this.compactionFinalizer = finalizer;
this.tserverSet = tservers;
this.client = client;
@@ -123,11 +125,6 @@ public class CompactionCoordinatorTest {
protected void printStartupMsg() {}
@Override
- public AccumuloConfiguration getConfiguration() {
- return this.conf;
- }
-
- @Override
public ServerContext getContext() {
return this.ctx;
}
@@ -147,13 +144,6 @@ public class CompactionCoordinatorTest {
}
@Override
- protected org.apache.accumulo.core.compaction.thrift.Compactor.Client
- getCompactorConnection(HostAndPort compactorAddress) throws TTransportException {
- // TODO Auto-generated method stub
- return super.getCompactorConnection(compactorAddress);
- }
-
- @Override
public void compactionCompleted(TInfo tinfo, TCredentials credentials,
String externalCompactionId, TKeyExtent textent, CompactionStats stats) throws TException {}
@@ -221,13 +211,13 @@ public class CompactionCoordinatorTest {
TestCoordinator coordinator =
new TestCoordinator(conf, finalizer, tservers, client, tsc, ctx, security);
- Assert.assertEquals(0, coordinator.getQueues().size());
- Assert.assertEquals(0, coordinator.getIndex().size());
- Assert.assertEquals(0, coordinator.getRunning().size());
+ assertEquals(0, coordinator.getQueues().size());
+ assertEquals(0, coordinator.getIndex().size());
+ assertEquals(0, coordinator.getRunning().size());
coordinator.run();
- Assert.assertEquals(0, coordinator.getQueues().size());
- Assert.assertEquals(0, coordinator.getIndex().size());
- Assert.assertEquals(0, coordinator.getRunning().size());
+ assertEquals(0, coordinator.getQueues().size());
+ assertEquals(0, coordinator.getIndex().size());
+ assertEquals(0, coordinator.getRunning().size());
PowerMock.verifyAll();
coordinator.getQueues().clear();
@@ -274,27 +264,27 @@ public class CompactionCoordinatorTest {
TestCoordinator coordinator =
new TestCoordinator(conf, finalizer, tservers, client, tsc, ctx, security);
- Assert.assertEquals(0, coordinator.getQueues().size());
- Assert.assertEquals(0, coordinator.getIndex().size());
- Assert.assertEquals(0, coordinator.getRunning().size());
+ assertEquals(0, coordinator.getQueues().size());
+ assertEquals(0, coordinator.getIndex().size());
+ assertEquals(0, coordinator.getRunning().size());
coordinator.run();
- Assert.assertEquals(1, coordinator.getQueues().size());
+ assertEquals(1, coordinator.getQueues().size());
QueueAndPriority qp = QueueAndPriority.get("R2DQ".intern(), 1L);
Map<Long,LinkedHashSet<TServerInstance>> m = coordinator.getQueues().get("R2DQ".intern());
- Assert.assertNotNull(m);
- Assert.assertEquals(1, m.size());
- Assert.assertTrue(m.containsKey(1L));
+ assertNotNull(m);
+ assertEquals(1, m.size());
+ assertTrue(m.containsKey(1L));
Set<TServerInstance> t = m.get(1L);
- Assert.assertNotNull(t);
- Assert.assertEquals(1, t.size());
+ assertNotNull(t);
+ assertEquals(1, t.size());
TServerInstance queuedTsi = t.iterator().next();
- Assert.assertEquals(tsi.getHostPortSession(), queuedTsi.getHostPortSession());
- Assert.assertEquals(1, coordinator.getIndex().size());
- Assert.assertTrue(coordinator.getIndex().containsKey(queuedTsi));
+ assertEquals(tsi.getHostPortSession(), queuedTsi.getHostPortSession());
+ assertEquals(1, coordinator.getIndex().size());
+ assertTrue(coordinator.getIndex().containsKey(queuedTsi));
Set<QueueAndPriority> i = coordinator.getIndex().get(queuedTsi);
- Assert.assertEquals(1, i.size());
- Assert.assertEquals(qp, i.iterator().next());
- Assert.assertEquals(0, coordinator.getRunning().size());
+ assertEquals(1, i.size());
+ assertEquals(qp, i.iterator().next());
+ assertEquals(0, coordinator.getRunning().size());
PowerMock.verifyAll();
coordinator.getQueues().clear();
@@ -352,27 +342,27 @@ public class CompactionCoordinatorTest {
TestCoordinator coordinator =
new TestCoordinator(conf, finalizer, tservers, client, tsc, ctx, security);
- Assert.assertEquals(0, coordinator.getQueues().size());
- Assert.assertEquals(0, coordinator.getIndex().size());
- Assert.assertEquals(0, coordinator.getRunning().size());
+ assertEquals(0, coordinator.getQueues().size());
+ assertEquals(0, coordinator.getIndex().size());
+ assertEquals(0, coordinator.getRunning().size());
coordinator.run();
- Assert.assertEquals(1, coordinator.getQueues().size());
+ assertEquals(1, coordinator.getQueues().size());
QueueAndPriority qp = QueueAndPriority.get("R2DQ".intern(), 1L);
Map<Long,LinkedHashSet<TServerInstance>> m = coordinator.getQueues().get("R2DQ".intern());
- Assert.assertNotNull(m);
- Assert.assertEquals(1, m.size());
- Assert.assertTrue(m.containsKey(1L));
+ assertNotNull(m);
+ assertEquals(1, m.size());
+ assertTrue(m.containsKey(1L));
Set<TServerInstance> t = m.get(1L);
- Assert.assertNotNull(t);
- Assert.assertEquals(1, t.size());
+ assertNotNull(t);
+ assertEquals(1, t.size());
TServerInstance queuedTsi = t.iterator().next();
- Assert.assertEquals(instance.getHostPortSession(), queuedTsi.getHostPortSession());
- Assert.assertEquals(1, coordinator.getIndex().size());
- Assert.assertTrue(coordinator.getIndex().containsKey(queuedTsi));
+ assertEquals(instance.getHostPortSession(), queuedTsi.getHostPortSession());
+ assertEquals(1, coordinator.getIndex().size());
+ assertTrue(coordinator.getIndex().containsKey(queuedTsi));
Set<QueueAndPriority> i = coordinator.getIndex().get(queuedTsi);
- Assert.assertEquals(1, i.size());
- Assert.assertEquals(qp, i.iterator().next());
- Assert.assertEquals(0, coordinator.getRunning().size());
+ assertEquals(1, i.size());
+ assertEquals(qp, i.iterator().next());
+ assertEquals(0, coordinator.getRunning().size());
PowerMock.verifyAll();
coordinator.getQueues().clear();
@@ -439,27 +429,27 @@ public class CompactionCoordinatorTest {
TestCoordinator coordinator =
new TestCoordinator(conf, finalizer, tservers, client, tsc, ctx, security);
- Assert.assertEquals(0, coordinator.getQueues().size());
- Assert.assertEquals(0, coordinator.getIndex().size());
- Assert.assertEquals(0, coordinator.getRunning().size());
+ assertEquals(0, coordinator.getQueues().size());
+ assertEquals(0, coordinator.getIndex().size());
+ assertEquals(0, coordinator.getRunning().size());
coordinator.run();
- Assert.assertEquals(1, coordinator.getQueues().size());
+ assertEquals(1, coordinator.getQueues().size());
QueueAndPriority qp = QueueAndPriority.get("R2DQ".intern(), 1L);
Map<Long,LinkedHashSet<TServerInstance>> m = coordinator.getQueues().get("R2DQ".intern());
- Assert.assertNotNull(m);
- Assert.assertEquals(1, m.size());
- Assert.assertTrue(m.containsKey(1L));
+ assertNotNull(m);
+ assertEquals(1, m.size());
+ assertTrue(m.containsKey(1L));
Set<TServerInstance> t = m.get(1L);
- Assert.assertNotNull(t);
- Assert.assertEquals(1, t.size());
+ assertNotNull(t);
+ assertEquals(1, t.size());
TServerInstance queuedTsi = t.iterator().next();
- Assert.assertEquals(instance.getHostPortSession(), queuedTsi.getHostPortSession());
- Assert.assertEquals(1, coordinator.getIndex().size());
- Assert.assertTrue(coordinator.getIndex().containsKey(queuedTsi));
+ assertEquals(instance.getHostPortSession(), queuedTsi.getHostPortSession());
+ assertEquals(1, coordinator.getIndex().size());
+ assertTrue(coordinator.getIndex().containsKey(queuedTsi));
Set<QueueAndPriority> i = coordinator.getIndex().get(queuedTsi);
- Assert.assertEquals(1, i.size());
- Assert.assertEquals(qp, i.iterator().next());
- Assert.assertEquals(1, coordinator.getRunning().size());
+ assertEquals(1, i.size());
+ assertEquals(qp, i.iterator().next());
+ assertEquals(1, coordinator.getRunning().size());
PowerMock.verifyAll();
coordinator.getQueues().clear();
@@ -518,44 +508,44 @@ public class CompactionCoordinatorTest {
TestCoordinator coordinator =
new TestCoordinator(conf, finalizer, tservers, client, tsc, ctx, security);
- Assert.assertEquals(0, coordinator.getQueues().size());
- Assert.assertEquals(0, coordinator.getIndex().size());
- Assert.assertEquals(0, coordinator.getRunning().size());
+ assertEquals(0, coordinator.getQueues().size());
+ assertEquals(0, coordinator.getIndex().size());
+ assertEquals(0, coordinator.getRunning().size());
// Use coordinator.run() to populate the internal data structures. This is tested in a different
// test.
coordinator.run();
- Assert.assertEquals(1, coordinator.getQueues().size());
+ assertEquals(1, coordinator.getQueues().size());
QueueAndPriority qp = QueueAndPriority.get("R2DQ".intern(), 1L);
Map<Long,LinkedHashSet<TServerInstance>> m = coordinator.getQueues().get("R2DQ".intern());
- Assert.assertNotNull(m);
- Assert.assertEquals(1, m.size());
- Assert.assertTrue(m.containsKey(1L));
+ assertNotNull(m);
+ assertEquals(1, m.size());
+ assertTrue(m.containsKey(1L));
Set<TServerInstance> t = m.get(1L);
- Assert.assertNotNull(t);
- Assert.assertEquals(1, t.size());
+ assertNotNull(t);
+ assertEquals(1, t.size());
TServerInstance queuedTsi = t.iterator().next();
- Assert.assertEquals(tsi.getHostPortSession(), queuedTsi.getHostPortSession());
- Assert.assertEquals(1, coordinator.getIndex().size());
- Assert.assertTrue(coordinator.getIndex().containsKey(queuedTsi));
+ assertEquals(tsi.getHostPortSession(), queuedTsi.getHostPortSession());
+ assertEquals(1, coordinator.getIndex().size());
+ assertTrue(coordinator.getIndex().containsKey(queuedTsi));
Set<QueueAndPriority> i = coordinator.getIndex().get(queuedTsi);
- Assert.assertEquals(1, i.size());
- Assert.assertEquals(qp, i.iterator().next());
- Assert.assertEquals(0, coordinator.getRunning().size());
+ assertEquals(1, i.size());
+ assertEquals(qp, i.iterator().next());
+ assertEquals(0, coordinator.getRunning().size());
// Get the next job
TExternalCompactionJob createdJob =
coordinator.getCompactionJob(trace, creds, "R2DQ", "localhost:10241", eci.toString());
- Assert.assertEquals(eci.toString(), createdJob.getExternalCompactionId());
+ assertEquals(eci.toString(), createdJob.getExternalCompactionId());
- Assert.assertEquals(1, coordinator.getQueues().size());
- Assert.assertEquals(0, coordinator.getIndex().size());
- Assert.assertEquals(1, coordinator.getRunning().size());
+ assertEquals(1, coordinator.getQueues().size());
+ assertEquals(0, coordinator.getIndex().size());
+ assertEquals(1, coordinator.getRunning().size());
Entry<ExternalCompactionId,RunningCompaction> entry =
coordinator.getRunning().entrySet().iterator().next();
- Assert.assertEquals(eci.toString(), entry.getKey().toString());
- Assert.assertEquals("localhost:10241", entry.getValue().getCompactorAddress());
- Assert.assertEquals(eci.toString(), entry.getValue().getJob().getExternalCompactionId());
+ assertEquals(eci.toString(), entry.getKey().toString());
+ assertEquals("localhost:10241", entry.getValue().getCompactorAddress());
+ assertEquals(eci.toString(), entry.getValue().getJob().getExternalCompactionId());
PowerMock.verifyAll();
coordinator.getQueues().clear();
@@ -595,7 +585,7 @@ public class CompactionCoordinatorTest {
coordinator.getRunning().clear();
TExternalCompactionJob job = coordinator.getCompactionJob(TraceUtil.traceInfo(), creds, "R2DQ",
"localhost:10240", UUID.randomUUID().toString());
- Assert.assertNull(job.getExternalCompactionId());
+ assertNull(job.getExternalCompactionId());
PowerMock.verifyAll();
coordinator.close();
diff --git a/server/compactor/pom.xml b/server/compactor/pom.xml
index e193175..0649328 100644
--- a/server/compactor/pom.xml
+++ b/server/compactor/pom.xml
@@ -31,25 +31,41 @@
<name>Apache Accumulo Compactor</name>
<dependencies>
<dependency>
+ <groupId>com.beust</groupId>
+ <artifactId>jcommander</artifactId>
+ </dependency>
+ <dependency>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service</artifactId>
<optional>true</optional>
</dependency>
<dependency>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-server-base</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-start</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client-api</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-api</artifactId>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
@@ -66,6 +82,11 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymock</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-easymock</artifactId>
<scope>test</scope>
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 9b61811..ade2c09 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -136,6 +136,15 @@ public class Compactor extends AbstractServer
printStartupMsg();
}
+ protected Compactor(CompactorServerOpts opts, String[] args, AccumuloConfiguration conf) {
+ super("compactor", opts, args);
+ queueName = opts.getQueueName();
+ aconf = conf;
+ setupSecurity();
+ startGCLogger();
+ printStartupMsg();
+ }
+
protected void setupSecurity() {
getContext().setupCrypto();
security = AuditedSecurityOperation.getInstance(getContext());
@@ -159,7 +168,9 @@ public class Compactor extends AbstractServer
* address of this Compactor
*
* @throws KeeperException
+ * zookeeper error
* @throws InterruptedException
+ * thread interrupted
*/
protected void announceExistence(HostAndPort clientAddress)
throws KeeperException, InterruptedException {
@@ -227,6 +238,7 @@ public class Compactor extends AbstractServer
*
* @return address of this compactor client service
* @throws UnknownHostException
+ * host unknown
*/
protected ServerAddress startCompactorClientService() throws UnknownHostException {
Iface rpcProxy = TraceUtil.wrapService(this);
@@ -273,7 +285,11 @@ public class Compactor extends AbstractServer
* Cancel the compaction with this id.
*
* @param externalCompactionId
+ * compaction id
+ * @throws UnknownCompactionIdException
+ * if the externalCompactionId does not match the currently executing compaction
* @throws TException
+ * thrift error
*/
private void cancel(String externalCompactionId) throws TException {
synchronized (JOB_HOLDER) {
@@ -299,6 +315,7 @@ public class Compactor extends AbstractServer
* @param message
* updated message
* @throws RetriesExceededException
+ * thrown when retries have been exceeded
*/
protected void updateCompactionState(TExternalCompactionJob job, CompactionState state,
String message) throws RetriesExceededException {
@@ -330,6 +347,7 @@ public class Compactor extends AbstractServer
* @param job
* current compaction job
* @throws RetriesExceededException
+ * thrown when retries have been exceeded
*/
protected void updateCompactionFailed(TExternalCompactionJob job)
throws RetriesExceededException {
@@ -359,6 +377,7 @@ public class Compactor extends AbstractServer
* @param stats
* compaction stats
* @throws RetriesExceededException
+ * thrown when retries have been exceeded
*/
protected void updateCompactionCompleted(TExternalCompactionJob job, CompactionStats stats)
throws RetriesExceededException {
@@ -383,12 +402,11 @@ public class Compactor extends AbstractServer
/**
* Get the next job to run
*
- * @param coordinatorClient
- * address of the CompactionCoordinator
- * @param compactorAddress
- * address of this Compactor
+ * @param uuid
+ * uuid supplier
* @return CompactionJob
* @throws RetriesExceededException
+ * thrown when retries have been exceeded
*/
protected TExternalCompactionJob getNextJob(Supplier<UUID> uuid) throws RetriesExceededException {
RetryableThriftCall<TExternalCompactionJob> nextJobThriftCall =
@@ -513,6 +531,7 @@ public class Compactor extends AbstractServer
* Returns the number of seconds to wait in between progress checks based on input file sizes
*
* @param numBytes
+ * number of bytes in input file
* @return number of seconds to wait between progress checks
*/
protected long calculateProgressCheckTime(long numBytes) {
diff --git a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
index 1ed79fb..73825d9 100644
--- a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
+++ b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
@@ -18,6 +18,10 @@
*/
package org.apache.accumulo.compactor;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
import java.net.UnknownHostException;
import java.util.Timer;
import java.util.TimerTask;
@@ -45,7 +49,6 @@ import org.apache.accumulo.server.rpc.ServerAddress;
import org.apache.accumulo.server.rpc.TServerUtils;
import org.apache.zookeeper.KeeperException;
import org.easymock.EasyMock;
-import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
@@ -157,7 +160,6 @@ public class CompactorTest {
private final Supplier<UUID> uuid;
private final ServerAddress address;
private final TExternalCompactionJob job;
- private final AccumuloConfiguration conf;
private final ServerContext ctx;
private final ExternalCompactionId eci;
private volatile boolean completedCalled = false;
@@ -166,11 +168,10 @@ public class CompactorTest {
SuccessfulCompactor(Supplier<UUID> uuid, ServerAddress address, TExternalCompactionJob job,
AccumuloConfiguration conf, ServerContext ctx, ExternalCompactionId eci) {
- super(new CompactorServerOpts(), new String[] {"-q", "testQ"});
+ super(new CompactorServerOpts(), new String[] {"-q", "testQ"}, conf);
this.uuid = uuid;
this.address = address;
this.job = job;
- this.conf = conf;
this.ctx = ctx;
this.eci = eci;
}
@@ -190,11 +191,6 @@ public class CompactorTest {
}
@Override
- public AccumuloConfiguration getConfiguration() {
- return this.conf;
- }
-
- @Override
protected void announceExistence(HostAndPort clientAddress)
throws KeeperException, InterruptedException {}
@@ -291,11 +287,11 @@ public class CompactorTest {
public void testCheckTime() throws Exception {
// Instantiates class without calling constructor
Compactor c = Whitebox.newInstance(Compactor.class);
- Assert.assertEquals(1, c.calculateProgressCheckTime(1024));
- Assert.assertEquals(1, c.calculateProgressCheckTime(1048576));
- Assert.assertEquals(1, c.calculateProgressCheckTime(10485760));
- Assert.assertEquals(10, c.calculateProgressCheckTime(104857600));
- Assert.assertEquals(102, c.calculateProgressCheckTime(1024 * 1024 * 1024));
+ assertEquals(1, c.calculateProgressCheckTime(1024));
+ assertEquals(1, c.calculateProgressCheckTime(1048576));
+ assertEquals(1, c.calculateProgressCheckTime(10485760));
+ assertEquals(10, c.calculateProgressCheckTime(104857600));
+ assertEquals(102, c.calculateProgressCheckTime(1024 * 1024 * 1024));
}
@Test
@@ -341,8 +337,8 @@ public class CompactorTest {
PowerMock.verifyAll();
c.close();
- Assert.assertTrue(c.isCompletedCalled());
- Assert.assertFalse(c.isFailedCalled());
+ assertTrue(c.isCompletedCalled());
+ assertFalse(c.isFailedCalled());
}
@Test
@@ -388,9 +384,9 @@ public class CompactorTest {
PowerMock.verifyAll();
c.close();
- Assert.assertFalse(c.isCompletedCalled());
- Assert.assertTrue(c.isFailedCalled());
- Assert.assertEquals(CompactionState.FAILED, c.getLatestState());
+ assertFalse(c.isCompletedCalled());
+ assertTrue(c.isFailedCalled());
+ assertEquals(CompactionState.FAILED, c.getLatestState());
}
@Test
@@ -436,9 +432,9 @@ public class CompactorTest {
PowerMock.verifyAll();
c.close();
- Assert.assertFalse(c.isCompletedCalled());
- Assert.assertTrue(c.isFailedCalled());
- Assert.assertEquals(CompactionState.CANCELLED, c.getLatestState());
+ assertFalse(c.isCompletedCalled());
+ assertTrue(c.isFailedCalled());
+ assertEquals(CompactionState.CANCELLED, c.getLatestState());
}
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java
index 7785b19..48f8d35 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java
@@ -79,6 +79,26 @@ public class ExternalCompactionExecutor implements CompactionExecutor {
return Long.compare(o.getJob().getPriority(), getJob().getPriority());
}
+ @Override
+ public boolean equals(Object obj) {
+ if (null == obj) {
+ return false;
+ }
+ if (obj == this) {
+ return true;
+ }
+ if (obj instanceof ExternalJob) {
+ ExternalJob other = (ExternalJob) obj;
+ return (this.compareTo(other) == 0);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Long.hashCode(this.getJob().getPriority());
+ }
+
}
private PriorityBlockingQueue<ExternalJob> queue;