You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@accumulo.apache.org by dl...@apache.org on 2021/03/16 19:35:03 UTC
[accumulo] branch 1451-external-compactions-feature updated:
Updates from testing
This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push:
new b0317c7 Updates from testing
b0317c7 is described below
commit b0317c77a11e78f0683ddbe4b0f88d1c54b66f55
Author: Dave Marion <dl...@apache.org>
AuthorDate: Tue Mar 16 19:33:12 2021 +0000
Updates from testing
Added SharedRateLimiterFactory::remove so that the compaction's read and
write rate limiters are removed from the set of active limiters when the
compaction is done, which prevents the background threads from getting an
NPE. Modified the CompactionCoordinator::getCompactionJob loop to try
another tserver if the first one does not have a job. Modified Thrift
calls so that they no longer return null and instead return an empty
object. Changed all RetryableThriftCall<Void> usages to
RetryableThriftCall<String> so that the function is not retried after a
successful call (the Void versions returned null, which caused a retry).
---
.../util/ratelimit/SharedRateLimiterFactory.java | 14 ++
.../coordinator/CompactionCoordinator.java | 235 ++++++++++-----------
.../accumulo/coordinator/RunningCompaction.java | 12 +-
.../accumulo/compactor/CompactionEnvironment.java | 19 +-
.../org/apache/accumulo/compactor/Compactor.java | 104 +++++----
.../accumulo/tserver/ThriftClientHandler.java | 3 +-
.../tserver/compactions/ExternalCompactionJob.java | 2 +
.../accumulo/tserver/tablet/CompactableImpl.java | 2 +
8 files changed, 201 insertions(+), 190 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
index c24a3d2..accfd95 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
@@ -99,6 +99,20 @@ public class SharedRateLimiterFactory {
}
/**
+ * Remove the rate limiter from the set of active limiters, if it exists
+ *
+ * @param name
+ * key for the rate limiter
+ */
+ public void remove(String name) {
+ synchronized (activeLimiters) {
+ if (activeLimiters.containsKey(name)) {
+ activeLimiters.remove(name);
+ }
+ }
+ }
+
+ /**
* Walk through all of the currently active RateLimiters, having each update its current rate.
* This is called periodically so that we can dynamically update as configuration changes.
*/
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index 5ede596..b4b2cb4 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -189,6 +189,9 @@ public class CompactionCoordinator extends AbstractServer
// TODO: On initial startup contact all running tservers to get information about the
// compactions that are current running in external queues to populate the RUNNING map.
// This is to handle the case where the coordinator dies or is restarted at runtime
+ //
+ // Alternatively, we could use the status messages in updateCompactionStatus to rebuild
+ // the RUNNING map.
tserverSet.startListeningForTabletServerChanges();
@@ -251,7 +254,7 @@ public class CompactionCoordinator extends AbstractServer
// Find any running compactions for the tserver
final List<ExternalCompactionId> toCancel = new ArrayList<>();
RUNNING.forEach((k, v) -> {
- if (v.getCompactorAddress().equals(tsi)) {
+ if (v.getTserver().equals(tsi)) {
toCancel.add(k);
}
});
@@ -295,63 +298,84 @@ public class CompactionCoordinator extends AbstractServer
// CBUG need to use and check for system credentials
LOG.debug("getCompactionJob " + queueName + " " + compactorAddress);
String queue = queueName.intern();
- TServerInstance tserver = null;
- Long priority = null;
+ TExternalCompactionJob result = null;
// CBUG Review synchronization on QUEUES
synchronized (QUEUES) {
TreeMap<Long,LinkedHashSet<TServerInstance>> m = QUEUES.get(queue);
- if (null != m) {
- while (tserver == null) {
+ if (null != m && !m.isEmpty()) {
+ while (result == null) {
+
+ // m could become empty if we have contacted all tservers in this queue and
+ // there are no compactions
+ if (m.isEmpty()) {
+ LOG.debug("No tservers found for queue {}, returning empty job to compactor {}", queue,
+ compactorAddress);
+ result = new TExternalCompactionJob();
+ break;
+ }
+
// Get the first TServerInstance from the highest priority queue
Entry<Long,LinkedHashSet<TServerInstance>> entry = m.firstEntry();
- priority = entry.getKey();
+ Long priority = entry.getKey();
LinkedHashSet<TServerInstance> tservers = entry.getValue();
- if (null == tservers || m.isEmpty()) {
+
+ if (null == tservers || tservers.isEmpty()) {
// Clean up the map entry when no tservers for this queue and priority
m.remove(entry.getKey(), entry.getValue());
continue;
} else {
- tserver = tservers.iterator().next();
+ TServerInstance tserver = tservers.iterator().next();
+ LOG.debug("Found tserver {} with priority {} for queue {}", tserver.getHostAndPort(),
+ priority, queue);
// Remove the tserver from the list, we are going to run a compaction on this server
tservers.remove(tserver);
- if (tservers.size() == 0) {
+ if (tservers.isEmpty()) {
// Clean up the map entry when no tservers remaining for this queue and priority
+ // CBUG This may be redundant as cleanup happens in the 'if' clause above
m.remove(entry.getKey(), entry.getValue());
}
HashSet<QueueAndPriority> qp = INDEX.get(tserver);
qp.remove(QueueAndPriority.get(queue, priority));
- if (qp.size() == 0) {
+ if (qp.isEmpty()) {
// Remove the tserver from the index
INDEX.remove(tserver);
}
- break;
+ LOG.debug("Getting compaction for queue {} from tserver {}", queue,
+ tserver.getHostAndPort());
+ // Get a compaction from the tserver
+ TabletClientService.Client client = null;
+ try {
+ client = getTabletServerConnection(tserver);
+ TExternalCompactionJob job = client.reserveCompactionJob(TraceUtil.traceInfo(),
+ getContext().rpcCreds(), queue, priority, compactorAddress);
+ if (null == job.getExternalCompactionId()) {
+ LOG.debug("No compactions found for queue {} on tserver {}, trying next tserver",
+ queue, tserver.getHostAndPort(), compactorAddress);
+ continue;
+ }
+ RUNNING.put(ExternalCompactionId.of(job.getExternalCompactionId()),
+ new RunningCompaction(job, compactorAddress, tserver));
+ LOG.debug("Returning external job {} to {}", job.externalCompactionId,
+ compactorAddress);
+ result = job;
+ break;
+ } catch (TException e) {
+ LOG.error(
+ "Error from tserver {} while trying to reserve compaction, trying next tserver",
+ ExternalCompactionUtil.getHostPortString(tserver.getHostAndPort()), e);
+ } finally {
+ ThriftUtil.returnClient(client);
+ }
}
}
+ } else {
+ LOG.debug("No tservers found for queue {}, returning empty job to compactor {}", queue,
+ compactorAddress);
+ result = new TExternalCompactionJob();
}
}
+ return result;
- if (null == tserver) {
- LOG.debug("No compactions found for queue {}, returning empty job to compactor {}", queue,
- compactorAddress);
- return new TExternalCompactionJob();
- }
-
- TabletClientService.Client client = null;
- try {
- client = getTabletServerConnection(tserver);
- TExternalCompactionJob job = client.reserveCompactionJob(TraceUtil.traceInfo(),
- getContext().rpcCreds(), queue, priority, compactorAddress);
- RUNNING.put(ExternalCompactionId.of(job.getExternalCompactionId()),
- new RunningCompaction(job, compactorAddress, tserver));
- LOG.debug("Returning external job {} to {}", job.externalCompactionId, compactorAddress);
- return job;
- } catch (TException e) {
- LOG.error("Error reserving compaction from tserver {}",
- ExternalCompactionUtil.getHostPortString(tserver.getHostAndPort()), e);
- throw e;
- } finally {
- ThriftUtil.returnClient(client);
- }
}
protected TabletClientService.Client getTabletServerConnection(TServerInstance tserver)
@@ -377,44 +401,52 @@ public class CompactionCoordinator extends AbstractServer
LOG.info("Compaction cancel requested, id: {}", externalCompactionId);
RunningCompaction rc = RUNNING.get(ExternalCompactionId.of(externalCompactionId));
if (null == rc) {
- throw new UnknownCompactionIdException();
+ return;
}
- HostAndPort compactor = HostAndPort.fromString(rc.getCompactorAddress());
- RetryableThriftCall<Void> cancelThriftCall = new RetryableThriftCall<>(1000,
- RetryableThriftCall.MAX_WAIT_TIME, 0, new RetryableThriftFunction<Void>() {
- @Override
- public Void execute() throws TException {
- Compactor.Client compactorConnection = null;
- try {
- compactorConnection = getCompactorConnection(compactor);
- compactorConnection.cancel(rc.getJob().getExternalCompactionId());
- return null;
- } catch (TException e) {
- throw e;
- } finally {
- ThriftUtil.returnClient(compactorConnection);
+ if (!rc.isCompleted()) {
+ HostAndPort compactor = HostAndPort.fromString(rc.getCompactorAddress());
+ RetryableThriftCall<String> cancelThriftCall = new RetryableThriftCall<>(1000,
+ RetryableThriftCall.MAX_WAIT_TIME, 0, new RetryableThriftFunction<String>() {
+ @Override
+ public String execute() throws TException {
+ Compactor.Client compactorConnection = null;
+ try {
+ compactorConnection = getCompactorConnection(compactor);
+ compactorConnection.cancel(rc.getJob().getExternalCompactionId());
+ return "";
+ } catch (TException e) {
+ throw e;
+ } finally {
+ ThriftUtil.returnClient(compactorConnection);
+ }
}
- }
- });
- try {
- cancelThriftCall.run();
- } catch (RetriesExceededException e) {
- LOG.error("Unable to contact Compactor {} to cancel running compaction {}",
- rc.getCompactorAddress(), rc.getJob(), e);
+ });
+ try {
+ cancelThriftCall.run();
+ } catch (RetriesExceededException e) {
+ LOG.error("Unable to contact Compactor {} to cancel running compaction {}",
+ rc.getCompactorAddress(), rc.getJob(), e);
+ }
}
}
+ /**
+ * TServer calls getCompactionStatus to get information about the compaction
+ *
+ * @param externalCompactionId
+ * id
+ * @return compaction stats or null if not running
+ */
@Override
public List<Status> getCompactionStatus(String externalCompactionId) throws TException {
+ List<Status> status = new ArrayList<>();
RunningCompaction rc = RUNNING.get(ExternalCompactionId.of(externalCompactionId));
- if (null == rc) {
- throw new UnknownCompactionIdException();
+ if (null != rc) {
+ rc.getUpdates().forEach((k, v) -> {
+ status.add(new Status(v.getTimestamp(), rc.getJob().getExternalCompactionId(),
+ rc.getCompactorAddress(), v.getState(), v.getMessage()));
+ });
}
- List<Status> status = new ArrayList<>();
- rc.getUpdates().forEach((k, v) -> {
- status.add(new Status(v.getTimestamp(), rc.getJob().getExternalCompactionId(),
- rc.getCompactorAddress(), v.getState(), v.getMessage()));
- });
return status;
}
@@ -434,6 +466,7 @@ public class CompactionCoordinator extends AbstractServer
RunningCompaction rc = RUNNING.get(ecid);
if (null != rc) {
rc.setStats(stats);
+ rc.setCompleted();
} else {
LOG.error(
"Compaction completed called by Compactor for {}, but no running compaction for that id.",
@@ -442,17 +475,19 @@ public class CompactionCoordinator extends AbstractServer
}
// Attempt up to ten times to contact the TServer and notify it that the compaction has
// completed.
- RetryableThriftCall<Void> completedThriftCall = new RetryableThriftCall<>(1000,
- RetryableThriftCall.MAX_WAIT_TIME, 10, new RetryableThriftFunction<Void>() {
+ RetryableThriftCall<String> completedThriftCall = new RetryableThriftCall<>(1000,
+ RetryableThriftCall.MAX_WAIT_TIME, 10, new RetryableThriftFunction<String>() {
@Override
- public Void execute() throws TException {
+ public String execute() throws TException {
TabletClientService.Client client = null;
try {
client = getTabletServerConnection(rc.getTserver());
client.compactionJobFinished(TraceUtil.traceInfo(), getContext().rpcCreds(),
externalCompactionId, stats.fileSize, stats.entriesWritten);
RUNNING.remove(ecid, rc);
- return null;
+ LOG.info("TServer {} notified of compaction {} completion",
+ rc.getTserver().getHostAndPort(), externalCompactionId);
+ return "";
} catch (TException e) {
throw e;
} finally {
@@ -461,64 +496,6 @@ public class CompactionCoordinator extends AbstractServer
}
});
try {
- // CBUG Saw the following situation in testing:
- // 1. Compactor ran a compaction and completed.
- // 2. Compactor called this method
- // 3. Thrift timeout occurred and the Compactor retried due to RetryableThriftCall
- // 4. Upon retry this method returned UnknownCompactionIdException because no entry in RUNNING
-
- // See Method below where tserver could poll coordinator to see if compaction is completed.
-
- // "compactor" #38 prio=5 os_prio=0 cpu=157.59ms elapsed=197.99s tid=0x000055ea28438800
- // nid=0x4dae runnable [0x00007fb0c5f2e000]
- // java.lang.Thread.State: RUNNABLE
- // at sun.nio.ch.EPoll.wait(java.base@11.0.10/Native Method)
- // at sun.nio.ch.EPollSelectorImpl.doSelect(java.base@11.0.10/EPollSelectorImpl.java:120)
- // at sun.nio.ch.SelectorImpl.lockAndDoSelect(java.base@11.0.10/SelectorImpl.java:124)
- // - locked <0x00000000f449acc0> (a sun.nio.ch.Util$2)
- // - locked <0x00000000f449aa60> (a sun.nio.ch.EPollSelectorImpl)
- // at sun.nio.ch.SelectorImpl.select(java.base@11.0.10/SelectorImpl.java:136)
- // at
- // org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:336)
- // at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:157)
- // at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
- // at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
- // at java.io.FilterInputStream.read(java.base@11.0.10/FilterInputStream.java:133)
- // at java.io.BufferedInputStream.fill(java.base@11.0.10/BufferedInputStream.java:252)
- // at java.io.BufferedInputStream.read1(java.base@11.0.10/BufferedInputStream.java:292)
- // at java.io.BufferedInputStream.read(java.base@11.0.10/BufferedInputStream.java:351)
- // - locked <0x00000000f466c028> (a java.io.BufferedInputStream)
- // at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127)
- // at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
- // at org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:132)
- // at org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:100)
- // at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
- // at
- // org.apache.accumulo.core.clientImpl.ThriftTransportPool$CachedTTransport.readAll(ThriftTransportPool.java:546)
- // at org.apache.thrift.protocol.TCompactProtocol.readByte(TCompactProtocol.java:637)
- // at org.apache.thrift.protocol.TCompactProtocol.readMessageBegin(TCompactProtocol.java:505)
- // at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:77)
- // at
- // org.apache.accumulo.core.compaction.thrift.CompactionCoordinator$Client.recv_compactionCompleted(CompactionCoordinator.java:118)
- // at
- // org.apache.accumulo.core.compaction.thrift.CompactionCoordinator$Client.compactionCompleted(CompactionCoordinator.java:104)
- // at org.apache.accumulo.compactor.Compactor$3.execute(Compactor.java:296)
- // at org.apache.accumulo.compactor.Compactor$3.execute(Compactor.java:291)
- // at
- // org.apache.accumulo.server.compaction.RetryableThriftCall.run(RetryableThriftCall.java:102)
- // at org.apache.accumulo.compactor.Compactor.updateCompactionCompleted(Compactor.java:304)
- // at org.apache.accumulo.compactor.Compactor.run(Compactor.java:509)
-
- // "CompactionCoordinator-ClientPool-Worker-1" #47 daemon prio=5 os_prio=0 cpu=70.37ms
- // elapsed=243.97s tid=0x00005590266cb800 nid=0x4db3 waiting on condition [0x00007f4c8cb6c000]
- // java.lang.Thread.State: TIMED_WAITING (sleeping)
- // at java.lang.Thread.sleep(java.base@11.0.10/Native Method)
- // at org.apache.accumulo.fate.util.UtilWaitThread.sleep(UtilWaitThread.java:33)
- // at
- // org.apache.accumulo.server.compaction.RetryableThriftCall.run(RetryableThriftCall.java:113)
- // at
- // org.apache.accumulo.coordinator.CompactionCoordinator.compactionCompleted(CompactionCoordinator.java:462)
-
completedThriftCall.run();
} catch (RetriesExceededException e) {
// TODO: What happens if tserver is no longer hosting tablet? I wonder if we should not notify
@@ -548,13 +525,14 @@ public class CompactionCoordinator extends AbstractServer
*
*
* @param externalCompactionId
- * @return CompactionStats or null if not completed
- * @throws TException
+ * @return CompactionStats
+ * @throws UnknownCompactionIdException
+ * if compaction is not running
*/
public CompactionStats isCompactionCompleted(String externalCompactionId) throws TException {
var ecid = ExternalCompactionId.of(externalCompactionId);
RunningCompaction rc = RUNNING.get(ecid);
- if (null != rc && null != rc.getStats()) {
+ if (null != rc && rc.isCompleted()) {
RUNNING.remove(ecid, rc);
return rc.getStats();
} else if (rc == null) {
@@ -591,6 +569,9 @@ public class CompactionCoordinator extends AbstractServer
if (null != rc) {
rc.addUpdate(timestamp, message, state);
} else {
+ // TODO: If the Coordinator was restarted, we could use these status messages
+ // to re-populate the RUNNING set. This would require the job, compactor address
+ // and TServerInstance
throw new UnknownCompactionIdException();
}
}
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/RunningCompaction.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/RunningCompaction.java
index cc36f50..ea35349 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/RunningCompaction.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/RunningCompaction.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.coordinator;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.accumulo.core.compaction.thrift.CompactionState;
import org.apache.accumulo.core.metadata.TServerInstance;
@@ -31,7 +32,8 @@ public class RunningCompaction {
private final TExternalCompactionJob job;
private final String compactorAddress;
private final TServerInstance tserver;
- private Map<Long,CompactionUpdate> updates = new TreeMap<>();
+ private final Map<Long,CompactionUpdate> updates = new TreeMap<>();
+ private final AtomicBoolean completed = new AtomicBoolean(Boolean.FALSE);
private CompactionStats stats = null;
RunningCompaction(TExternalCompactionJob job, String compactorAddress, TServerInstance tserver) {
@@ -69,4 +71,12 @@ public class RunningCompaction {
return tserver;
}
+ public boolean isCompleted() {
+ return completed.get();
+ }
+
+ public void setCompleted() {
+ completed.compareAndSet(false, true);
+ }
+
}
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java
index bfda224..fad132b 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java
@@ -18,6 +18,9 @@
*/
package org.apache.accumulo.compactor;
+import java.io.Closeable;
+import java.io.IOException;
+
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.TableId;
@@ -33,14 +36,22 @@ import org.apache.accumulo.server.compaction.Compactor.CompactionEnv;
import org.apache.accumulo.server.iterators.SystemIteratorEnvironment;
import org.apache.accumulo.server.iterators.TabletIteratorEnvironment;
-public class CompactionEnvironment implements CompactionEnv {
+public class CompactionEnvironment implements Closeable, CompactionEnv {
private final ServerContext context;
private final CompactionJobHolder jobHolder;
+ private final SharedRateLimiterFactory limiter;
CompactionEnvironment(ServerContext context, CompactionJobHolder jobHolder) {
this.context = context;
this.jobHolder = jobHolder;
+ this.limiter = SharedRateLimiterFactory.getInstance(this.context.getConfiguration());
+ }
+
+ @Override
+ public void close() throws IOException {
+ limiter.remove("read_rate_limiter");
+ limiter.remove("write_rate_limiter");
}
@Override
@@ -55,14 +66,12 @@ public class CompactionEnvironment implements CompactionEnv {
@Override
public RateLimiter getReadLimiter() {
- return SharedRateLimiterFactory.getInstance(context.getConfiguration())
- .create("read_rate_limiter", () -> jobHolder.getJob().getReadRate());
+ return limiter.create("read_rate_limiter", () -> jobHolder.getJob().getReadRate());
}
@Override
public RateLimiter getWriteLimiter() {
- return SharedRateLimiterFactory.getInstance(context.getConfiguration())
- .create("write_rate_limiter", () -> jobHolder.getJob().getWriteRate());
+ return limiter.create("write_rate_limiter", () -> jobHolder.getJob().getWriteRate());
}
@Override
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index f1d2454..7922c4f 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -65,7 +65,6 @@ import org.apache.accumulo.server.GarbageCollectionLogger;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.ServerOpts;
import org.apache.accumulo.server.compaction.CompactionInfo;
-import org.apache.accumulo.server.compaction.Compactor.CompactionEnv;
import org.apache.accumulo.server.compaction.ExternalCompactionUtil;
import org.apache.accumulo.server.compaction.RetryableThriftCall;
import org.apache.accumulo.server.compaction.RetryableThriftCall.RetriesExceededException;
@@ -287,14 +286,14 @@ public class Compactor extends AbstractServer
*/
protected void updateCompactionCompleted(TExternalCompactionJob job, CompactionStats stats)
throws RetriesExceededException {
- RetryableThriftCall<Void> thriftCall = new RetryableThriftCall<>(1000,
- RetryableThriftCall.MAX_WAIT_TIME, 25, new RetryableThriftFunction<Void>() {
+ RetryableThriftCall<String> thriftCall = new RetryableThriftCall<>(1000,
+ RetryableThriftCall.MAX_WAIT_TIME, 25, new RetryableThriftFunction<String>() {
@Override
- public Void execute() throws TException {
+ public String execute() throws TException {
try {
coordinatorClient.compareAndSet(null, getCoordinatorClient());
coordinatorClient.get().compactionCompleted(job.getExternalCompactionId(), stats);
- return null;
+ return "";
} catch (TException e) {
ThriftUtil.returnClient(coordinatorClient.getAndSet(null));
throw e;
@@ -379,7 +378,6 @@ public class Compactor extends AbstractServer
final TableId tableId = TableId.of(new String(job.getExtent().getTable(), UTF_8));
final TableConfiguration tConfig = getContext().getTableConfiguration(tableId);
final TabletFile outputFile = new TabletFile(new Path(job.getOutputFile()));
- final CompactionEnv cenv = new CompactionEnvironment(getContext(), jobHolder);
final Map<StoredTabletFile,DataFileValue> files = new TreeMap<>();
job.getFiles().forEach(f -> {
@@ -392,20 +390,22 @@ public class Compactor extends AbstractServer
job.getIteratorSettings().getIterators()
.forEach(tis -> iters.add(SystemIteratorUtil.toIteratorSetting(tis)));
- org.apache.accumulo.server.compaction.Compactor compactor =
- new org.apache.accumulo.server.compaction.Compactor(getContext(),
- KeyExtent.fromThrift(job.getExtent()), files, outputFile,
- job.isPropagateDeletes(), cenv, iters, tConfig);
-
- LOG.info("Starting compactor");
- started.countDown();
-
- org.apache.accumulo.server.compaction.CompactionStats stat = compactor.call();
- CompactionStats cs = new CompactionStats();
- cs.setEntriesRead(stat.getEntriesRead());
- cs.setEntriesWritten(stat.getEntriesWritten());
- cs.setFileSize(stat.getFileSize());
- jobHolder.setStats(cs);
+ try (CompactionEnvironment cenv = new CompactionEnvironment(getContext(), jobHolder)) {
+ org.apache.accumulo.server.compaction.Compactor compactor =
+ new org.apache.accumulo.server.compaction.Compactor(getContext(),
+ KeyExtent.fromThrift(job.getExtent()), files, outputFile,
+ job.isPropagateDeletes(), cenv, iters, tConfig);
+
+ LOG.info("Starting compactor");
+ started.countDown();
+
+ org.apache.accumulo.server.compaction.CompactionStats stat = compactor.call();
+ CompactionStats cs = new CompactionStats();
+ cs.setEntriesRead(stat.getEntriesRead());
+ cs.setEntriesWritten(stat.getEntriesWritten());
+ cs.setFileSize(stat.getFileSize());
+ jobHolder.setStats(cs);
+ }
LOG.info("Compaction completed successfully {} ", job.getExternalCompactionId());
// Update state when completed
updateCompactionState(job, CompactionState.SUCCEEDED,
@@ -504,28 +504,40 @@ public class Compactor extends AbstractServer
}
}
}
- try {
- compactionThread.join();
- this.updateCompactionCompleted(job, jobHolder.getStats());
- } catch (InterruptedException e) {
- LOG.error(
- "Compactor thread was interrupted waiting for compaction to finish, cancelling job",
- e);
+ compactionThread.join();
+
+ if (compactionThread.isInterrupted()) {
+ LOG.warn("Compaction thread was interrupted, sending CANCELLED state");
+ try {
+ updateCompactionState(job, CompactionState.CANCELLED, "Compaction cancelled");
+ // CBUG Might need to call updateCompactionCompleted or the TServer will not
+ // get notified that the compaction was cancelled
+ } catch (RetriesExceededException e) {
+ LOG.error("Error updating coordinator with compaction cancellation.", e);
+ }
+ } else if (err.get() != null) {
try {
- cancel(job.getExternalCompactionId());
- } catch (TException e1) {
- LOG.error("Error cancelling compaction.", e1);
+ updateCompactionState(job, CompactionState.FAILED,
+ "Compaction failed due to: " + err.get().getMessage());
+ // CBUG Might need to call updateCompactionCompleted or the TServer will not
+ // get notified that the compaction failed
+ } catch (RetriesExceededException e) {
+ LOG.error("Error updating coordinator with compaction failure.", e);
}
- } catch (RetriesExceededException e) {
- LOG.error(
- "Error updating coordinator with compaction completion, cancelling compaction.", e);
+ } else {
try {
- cancel(job.getExternalCompactionId());
- } catch (TException e1) {
- LOG.error("Error cancelling compaction.", e1);
+ this.updateCompactionCompleted(job, jobHolder.getStats());
+ } catch (RetriesExceededException e) {
+ LOG.error(
+ "Error updating coordinator with compaction completion, cancelling compaction.",
+ e);
+ try {
+ cancel(job.getExternalCompactionId());
+ } catch (TException e1) {
+ LOG.error("Error cancelling compaction.", e1);
+ }
}
}
-
} catch (InterruptedException e1) {
LOG.error(
"Compactor thread was interrupted waiting for compaction to start, cancelling job",
@@ -537,24 +549,6 @@ public class Compactor extends AbstractServer
}
}
- if (compactionThread.isInterrupted()) {
- LOG.warn("Compaction thread was interrupted, sending CANCELLED state");
- try {
- updateCompactionState(job, CompactionState.CANCELLED, "Compaction cancelled");
- } catch (RetriesExceededException e) {
- LOG.error("Error updating coordinator with compaction cancellation.", e);
- }
- }
-
- Throwable thrown = err.get();
- if (thrown != null) {
- try {
- updateCompactionState(job, CompactionState.FAILED,
- "Compaction failed due to: " + thrown.getMessage());
- } catch (RetriesExceededException e) {
- LOG.error("Error updating coordinator with compaction failure.", e);
- }
- }
}
} catch (Exception e) {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java
index f640e37..d8cebd1 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java
@@ -1687,8 +1687,7 @@ class ThriftClientHandler extends ClientServiceHandler implements TabletClientSe
return extCompaction.toThrift();
}
- // CBUG thrift may not support null return types https://thrift.apache.org/docs/features.html
- return null;
+ return new TExternalCompactionJob();
}
@Override
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java
index e2b04d9..3c02127 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java
@@ -47,6 +47,8 @@ public class ExternalCompactionJob {
private CompactionKind kind;
private List<IteratorSetting> iters;
+ public ExternalCompactionJob() {}
+
public ExternalCompactionJob(Set<StoredTabletFile> jobFiles, boolean propogateDeletes,
TabletFile compactTmpName, KeyExtent extent, ExternalCompactionId externalCompactionId,
long priority, CompactionKind kind, List<IteratorSetting> iters) {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
index ecd7083..b9439a7 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
@@ -771,9 +771,11 @@ public class CompactableImpl implements Compactable {
TabletLogger.compacted(getExtent(), cInfo.job, metaFile);
} catch (Exception e) {
metaFile = null;
+ log.error("Error committing external compaction {}", extCompactionId, e);
throw new RuntimeException(e);
} finally {
completeCompaction(cInfo.job, cInfo.jobFiles, metaFile);
+ log.debug("Completed commit of external compaction{}", extCompactionId);
}
} else {
log.debug("Ignoring request to commit external compaction that is unknown {}",