You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by dl...@apache.org on 2021/03/09 14:46:21 UTC
[accumulo] branch 1451-external-compactions-feature updated: Moved
inner classes out of Compactor and CompactionCoordinator to make them
smaller in overall size
This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push:
new c43ebb8 Moved inner classes out of Compactor and CompactionCoordinator to make them smaller in overall size
c43ebb8 is described below
commit c43ebb8ceaf78f76db7d9222b97672b713e123b2
Author: Dave Marion <dl...@apache.org>
AuthorDate: Tue Mar 9 14:45:44 2021 +0000
Moved inner classes out of Compactor and CompactionCoordinator to make them smaller in overall size
---
.../server/compaction/ExternalCompactionUtil.java | 19 ++
.../coordinator/CompactionCoordinator.java | 187 +-------------
.../accumulo/coordinator/CompactionUpdate.java} | 35 ++-
.../coordinator/CoordinatorLockWatcher.java | 53 ++++
.../accumulo/coordinator/QueueAndPriority.java | 88 +++++++
.../accumulo/coordinator/RunningCompaction.java | 72 ++++++
.../accumulo/compactor/CompactionEnvironment.java | 95 +++++++
.../accumulo/compactor/CompactionJobHolder.java | 77 ++++++
.../org/apache/accumulo/compactor/Compactor.java | 283 ++++++---------------
9 files changed, 517 insertions(+), 392 deletions(-)
diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java
index 51c743f..e346baa 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java
@@ -24,6 +24,25 @@ import org.apache.accumulo.server.ServerContext;
public class ExternalCompactionUtil {
+ /**
+ * Utility for returning the address of a service in the form host:port
+ *
+ * @param address
+ * HostAndPort of service
+ * @return host and port
+ */
+ public static String getHostPortString(HostAndPort address) {
+ if (address == null) {
+ return null;
+ }
+ return address.getHost() + ":" + address.getPort();
+ }
+
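+ /**
+ * Looks up the address of the CompactionCoordinator from its lock node in ZooKeeper
+ *
+ * @param context
+ * server context used to resolve the coordinator lock path and read it from the ZooKeeper cache
+ * @return host and port of the CompactionCoordinator, parsed from the lock node data
+ */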
public static HostAndPort findCompactionCoordinator(ServerContext context) {
final String lockPath = context.getZooKeeperRoot() + Constants.ZCOORDINATOR_LOCK;
byte[] address = context.getZooCache().get(lockPath);
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index 227714a..db6777f 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -29,7 +29,6 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
-import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@@ -48,17 +47,15 @@ import org.apache.accumulo.core.tabletserver.thrift.TCompactionQueueSummary;
import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.trace.TraceUtil;
-import org.apache.accumulo.core.util.Halt;
import org.apache.accumulo.core.util.HostAndPort;
-import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.fate.util.UtilWaitThread;
import org.apache.accumulo.fate.zookeeper.ZooLock;
-import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
import org.apache.accumulo.server.AbstractServer;
import org.apache.accumulo.server.GarbageCollectionLogger;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.ServerOpts;
+import org.apache.accumulo.server.compaction.ExternalCompactionUtil;
import org.apache.accumulo.server.compaction.RetryableThriftCall;
import org.apache.accumulo.server.compaction.RetryableThriftCall.RetriesExceededException;
import org.apache.accumulo.server.compaction.RetryableThriftFunction;
@@ -79,178 +76,6 @@ public class CompactionCoordinator extends AbstractServer
implements org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Iface,
LiveTServerSet.Listener {
- private static class QueueAndPriority implements Comparable<QueueAndPriority> {
-
- private static WeakHashMap<Pair<String,Long>,QueueAndPriority> CACHE = new WeakHashMap<>();
-
- public static QueueAndPriority get(String queue, Long priority) {
- return CACHE.putIfAbsent(new Pair<>(queue, priority), new QueueAndPriority(queue, priority));
- }
-
- private final String queue;
- private final Long priority;
-
- private QueueAndPriority(String queue, Long priority) {
- super();
- this.queue = queue;
- this.priority = priority;
- }
-
- public String getQueue() {
- return queue;
- }
-
- public Long getPriority() {
- return priority;
- }
-
- @Override
- public int hashCode() {
- return queue.hashCode() + priority.hashCode();
- }
-
- @Override
- public String toString() {
- StringBuilder buf = new StringBuilder();
- buf.append("queue: ").append(queue);
- buf.append(", priority: ").append(priority);
- return buf.toString();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (null == obj)
- return false;
- if (obj == this)
- return true;
- if (!(obj instanceof QueueAndPriority)) {
- return false;
- } else {
- QueueAndPriority other = (QueueAndPriority) obj;
- return this.queue.equals(other.queue) && this.priority.equals(other.priority);
- }
- }
-
- @Override
- public int compareTo(QueueAndPriority other) {
- int result = this.queue.compareTo(other.queue);
- if (result == 0) {
- // reversing order such that if other priority is lower, then this has a higher priority
- return Long.compare(other.priority, this.priority);
- } else {
- return result;
- }
- }
-
- }
-
- private static class CoordinatorLockWatcher implements ZooLock.AccumuloLockWatcher {
-
- @Override
- public void lostLock(LockLossReason reason) {
- Halt.halt("Coordinator lock in zookeeper lost (reason = " + reason + "), exiting!", -1);
- }
-
- @Override
- public void unableToMonitorLockNode(final Exception e) {
- // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility
- Halt.halt(-1, () -> LOG.error("FATAL: No longer able to monitor Coordinator lock node", e));
-
- }
-
- @Override
- public synchronized void acquiredLock() {
- // This is overridden by the LockWatcherWrapper in ZooLock.tryLock()
- }
-
- @Override
- public synchronized void failedToAcquireLock(Exception e) {
- // This is overridden by the LockWatcherWrapper in ZooLock.tryLock()
- }
-
- }
-
- /**
- * Utility for returning the address in the form host:port
- *
- * @return host and port for Compactor client connections
- */
- private static String getHostPortString(HostAndPort address) {
- if (address == null) {
- return null;
- }
- return address.getHost() + ":" + address.getPort();
- }
-
- private static class CompactionUpdate {
- private final Long timestamp;
- private final String message;
- private final CompactionState state;
-
- public CompactionUpdate(Long timestamp, String message, CompactionState state) {
- super();
- this.timestamp = timestamp;
- this.message = message;
- this.state = state;
- }
-
- public Long getTimestamp() {
- return timestamp;
- }
-
- public String getMessage() {
- return message;
- }
-
- public CompactionState getState() {
- return state;
- }
- }
-
- private static class RunningCompaction {
- private final TExternalCompactionJob job;
- private final String compactorAddress;
- private final TServerInstance tserver;
- private Map<Long,CompactionUpdate> updates = new TreeMap<>();
- private CompactionStats stats = null;
-
- public RunningCompaction(TExternalCompactionJob job, String compactorAddress,
- TServerInstance tserver) {
- super();
- this.job = job;
- this.compactorAddress = compactorAddress;
- this.tserver = tserver;
- }
-
- public Map<Long,CompactionUpdate> getUpdates() {
- return updates;
- }
-
- public void addUpdate(Long timestamp, String message, CompactionState state) {
- this.updates.put(timestamp, new CompactionUpdate(timestamp, message, state));
- }
-
- public CompactionStats getStats() {
- return stats;
- }
-
- public void setStats(CompactionStats stats) {
- this.stats = stats;
- }
-
- public TExternalCompactionJob getJob() {
- return job;
- }
-
- public String getCompactorAddress() {
- return compactorAddress;
- }
-
- public TServerInstance getTserver() {
- return tserver;
- }
- }
-
private static final Logger LOG = LoggerFactory.getLogger(CompactionCoordinator.class);
private static final long TIME_BETWEEN_CHECKS = 5000;
@@ -290,11 +115,11 @@ public class CompactionCoordinator extends AbstractServer
* @throws KeeperException
* @throws InterruptedException
*/
- private boolean getCoordinatorLock(HostAndPort clientAddress)
+ protected boolean getCoordinatorLock(HostAndPort clientAddress)
throws KeeperException, InterruptedException {
LOG.info("trying to get coordinator lock");
- final String coordinatorClientAddress = getHostPortString(clientAddress);
+ final String coordinatorClientAddress = ExternalCompactionUtil.getHostPortString(clientAddress);
final String lockPath = getContext().getZooKeeperRoot() + Constants.ZCOORDINATOR_LOCK;
final UUID zooLockUUID = UUID.randomUUID();
@@ -309,7 +134,7 @@ public class CompactionCoordinator extends AbstractServer
* @return address of this CompactionCoordinator client service
* @throws UnknownHostException
*/
- private ServerAddress startCoordinatorClientService() throws UnknownHostException {
+ protected ServerAddress startCoordinatorClientService() throws UnknownHostException {
CompactionCoordinator rpcProxy = TraceUtil.wrapService(this);
final org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Processor<
CompactionCoordinator> processor;
@@ -488,7 +313,7 @@ public class CompactionCoordinator extends AbstractServer
}
}
- private TabletClientService.Client getTabletServerConnection(TServerInstance tserver)
+ protected TabletClientService.Client getTabletServerConnection(TServerInstance tserver)
throws TTransportException {
TServerConnection connection = tserverSet.getConnection(tserver);
TTransport transport =
@@ -496,7 +321,7 @@ public class CompactionCoordinator extends AbstractServer
return ThriftUtil.createClient(new TabletClientService.Client.Factory(), transport);
}
- private Compactor.Client getCompactorConnection(HostAndPort compactorAddress)
+ protected Compactor.Client getCompactorConnection(HostAndPort compactorAddress)
throws TTransportException {
TTransport transport =
ThriftTransportPool.getInstance().getTransport(compactorAddress, 0, getContext());
diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionUpdate.java
similarity index 57%
copy from server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java
copy to server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionUpdate.java
index 51c743f..becc428 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionUpdate.java
@@ -16,18 +16,33 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.accumulo.server.compaction;
+package org.apache.accumulo.coordinator;
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.util.HostAndPort;
-import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.core.compaction.thrift.CompactionState;
-public class ExternalCompactionUtil {
+public class CompactionUpdate {
- public static HostAndPort findCompactionCoordinator(ServerContext context) {
- final String lockPath = context.getZooKeeperRoot() + Constants.ZCOORDINATOR_LOCK;
- byte[] address = context.getZooCache().get(lockPath);
- String coordinatorAddress = new String(address);
- return HostAndPort.fromString(coordinatorAddress);
+ private final Long timestamp;
+ private final String message;
+ private final CompactionState state;
+
+ CompactionUpdate(Long timestamp, String message, CompactionState state) {
+ super();
+ this.timestamp = timestamp;
+ this.message = message;
+ this.state = state;
+ }
+
+ public Long getTimestamp() {
+ return timestamp;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public CompactionState getState() {
+ return state;
}
+
}
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorLockWatcher.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorLockWatcher.java
new file mode 100644
index 0000000..30ebb2e
--- /dev/null
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorLockWatcher.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.coordinator;
+
+import org.apache.accumulo.core.util.Halt;
+import org.apache.accumulo.fate.zookeeper.ZooLock;
+import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CoordinatorLockWatcher implements ZooLock.AccumuloLockWatcher {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CoordinatorLockWatcher.class);
+
+ @Override
+ public void lostLock(LockLossReason reason) {
+ Halt.halt("Coordinator lock in zookeeper lost (reason = " + reason + "), exiting!", -1);
+ }
+
+ @Override
+ public void unableToMonitorLockNode(final Exception e) {
+ // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility
+ Halt.halt(-1, () -> LOG.error("FATAL: No longer able to monitor Coordinator lock node", e));
+
+ }
+
+ @Override
+ public synchronized void acquiredLock() {
+ // This is overridden by the LockWatcherWrapper in ZooLock.tryLock()
+ }
+
+ @Override
+ public synchronized void failedToAcquireLock(Exception e) {
+ // This is overridden by the LockWatcherWrapper in ZooLock.tryLock()
+ }
+
+}
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueAndPriority.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueAndPriority.java
new file mode 100644
index 0000000..641fa97
--- /dev/null
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueAndPriority.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.coordinator;
+
+import java.util.WeakHashMap;
+
+import org.apache.accumulo.core.util.Pair;
+
+public class QueueAndPriority implements Comparable<QueueAndPriority> {
+
+ private static WeakHashMap<Pair<String,Long>,QueueAndPriority> CACHE = new WeakHashMap<>();
+
+ /**
+ * Returns the QueueAndPriority for the supplied queue and priority, creating and caching a new
+ * instance when no mapping exists yet.
+ *
+ * computeIfAbsent is used instead of putIfAbsent because putIfAbsent returns the previous
+ * mapping, which is null the first time a key is seen, so the first caller for any
+ * queue/priority pair would otherwise receive null.
+ *
+ * @param queue
+ * queue name
+ * @param priority
+ * priority value
+ * @return the cached instance, or a newly created one; never null
+ */
+ public static QueueAndPriority get(String queue, Long priority) {
+ return CACHE.computeIfAbsent(new Pair<>(queue, priority),
+ k -> new QueueAndPriority(queue, priority));
+ }
+
+ private final String queue;
+ private final Long priority;
+
+ private QueueAndPriority(String queue, Long priority) {
+ super();
+ this.queue = queue;
+ this.priority = priority;
+ }
+
+ public String getQueue() {
+ return queue;
+ }
+
+ public Long getPriority() {
+ return priority;
+ }
+
+ @Override
+ public int hashCode() {
+ return queue.hashCode() + priority.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder buf = new StringBuilder();
+ buf.append("queue: ").append(queue);
+ buf.append(", priority: ").append(priority);
+ return buf.toString();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (null == obj)
+ return false;
+ if (obj == this)
+ return true;
+ if (!(obj instanceof QueueAndPriority)) {
+ return false;
+ } else {
+ QueueAndPriority other = (QueueAndPriority) obj;
+ return this.queue.equals(other.queue) && this.priority.equals(other.priority);
+ }
+ }
+
+ @Override
+ public int compareTo(QueueAndPriority other) {
+ int result = this.queue.compareTo(other.queue);
+ if (result == 0) {
+ // reversing order such that if other priority is lower, then this has a higher priority
+ return Long.compare(other.priority, this.priority);
+ } else {
+ return result;
+ }
+ }
+
+}
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/RunningCompaction.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/RunningCompaction.java
new file mode 100644
index 0000000..cc36f50
--- /dev/null
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/RunningCompaction.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.coordinator;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.compaction.thrift.CompactionState;
+import org.apache.accumulo.core.metadata.TServerInstance;
+import org.apache.accumulo.core.tabletserver.thrift.CompactionStats;
+import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
+
+public class RunningCompaction {
+
+ private final TExternalCompactionJob job;
+ private final String compactorAddress;
+ private final TServerInstance tserver;
+ private Map<Long,CompactionUpdate> updates = new TreeMap<>();
+ private CompactionStats stats = null;
+
+ RunningCompaction(TExternalCompactionJob job, String compactorAddress, TServerInstance tserver) {
+ super();
+ this.job = job;
+ this.compactorAddress = compactorAddress;
+ this.tserver = tserver;
+ }
+
+ public Map<Long,CompactionUpdate> getUpdates() {
+ return updates;
+ }
+
+ public void addUpdate(Long timestamp, String message, CompactionState state) {
+ this.updates.put(timestamp, new CompactionUpdate(timestamp, message, state));
+ }
+
+ public CompactionStats getStats() {
+ return stats;
+ }
+
+ public void setStats(CompactionStats stats) {
+ this.stats = stats;
+ }
+
+ public TExternalCompactionJob getJob() {
+ return job;
+ }
+
+ public String getCompactorAddress() {
+ return compactorAddress;
+ }
+
+ public TServerInstance getTserver() {
+ return tserver;
+ }
+
+}
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java
new file mode 100644
index 0000000..bfda224
--- /dev/null
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.compactor;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.spi.compaction.CompactionKind;
+import org.apache.accumulo.core.tabletserver.thrift.CompactionReason;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
+import org.apache.accumulo.core.util.ratelimit.SharedRateLimiterFactory;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.compaction.Compactor.CompactionEnv;
+import org.apache.accumulo.server.iterators.SystemIteratorEnvironment;
+import org.apache.accumulo.server.iterators.TabletIteratorEnvironment;
+
+public class CompactionEnvironment implements CompactionEnv {
+
+ private final ServerContext context;
+ private final CompactionJobHolder jobHolder;
+
+ CompactionEnvironment(ServerContext context, CompactionJobHolder jobHolder) {
+ this.context = context;
+ this.jobHolder = jobHolder;
+ }
+
+ @Override
+ public boolean isCompactionEnabled() {
+ return !jobHolder.isCancelled();
+ }
+
+ @Override
+ public IteratorScope getIteratorScope() {
+ return IteratorScope.majc;
+ }
+
+ @Override
+ public RateLimiter getReadLimiter() {
+ return SharedRateLimiterFactory.getInstance(context.getConfiguration())
+ .create("read_rate_limiter", () -> jobHolder.getJob().getReadRate());
+ }
+
+ @Override
+ public RateLimiter getWriteLimiter() {
+ return SharedRateLimiterFactory.getInstance(context.getConfiguration())
+ .create("write_rate_limiter", () -> jobHolder.getJob().getWriteRate());
+ }
+
+ @Override
+ public SystemIteratorEnvironment createIteratorEnv(ServerContext context,
+ AccumuloConfiguration acuTableConf, TableId tableId) {
+ return new TabletIteratorEnvironment(context, IteratorScope.majc,
+ !jobHolder.getJob().isPropagateDeletes(), acuTableConf, tableId,
+ CompactionKind.valueOf(jobHolder.getJob().getKind().name()));
+ }
+
+ @Override
+ public SortedKeyValueIterator<Key,Value> getMinCIterator() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompactionReason getReason() {
+ switch (jobHolder.getJob().getKind()) {
+ case USER:
+ return CompactionReason.USER;
+ case CHOP:
+ return CompactionReason.CHOP;
+ case SELECTOR:
+ case SYSTEM:
+ default:
+ return CompactionReason.SYSTEM;
+ }
+ }
+
+}
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionJobHolder.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionJobHolder.java
new file mode 100644
index 0000000..f6ab00c
--- /dev/null
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionJobHolder.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.compactor;
+
+import java.util.Objects;
+
+import org.apache.accumulo.core.tabletserver.thrift.CompactionStats;
+import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
+
+public class CompactionJobHolder {
+
+ private TExternalCompactionJob job;
+ private Thread compactionThread;
+ private volatile Boolean cancelled = Boolean.FALSE;
+ private CompactionStats stats = null;
+
+ CompactionJobHolder() {}
+
+ public void reset() {
+ job = null;
+ compactionThread = null;
+ cancelled = Boolean.FALSE;
+ stats = null;
+ }
+
+ public TExternalCompactionJob getJob() {
+ return job;
+ }
+
+ public Thread getThread() {
+ return compactionThread;
+ }
+
+ public CompactionStats getStats() {
+ return stats;
+ }
+
+ public void setStats(CompactionStats stats) {
+ this.stats = stats;
+ }
+
+ public void cancel() {
+ cancelled = Boolean.TRUE;
+ }
+
+ public boolean isCancelled() {
+ return cancelled;
+ }
+
+ public boolean isSet() {
+ return (null != this.job);
+ }
+
+ public void set(TExternalCompactionJob job, Thread compactionThread) {
+ Objects.requireNonNull(job, "CompactionJob is null");
+ Objects.requireNonNull(compactionThread, "Compaction thread is null");
+ this.job = job;
+ this.compactionThread = compactionThread;
+ }
+
+}
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index d2e94e3..992c383 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -26,7 +26,6 @@ import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
@@ -41,26 +40,18 @@ import org.apache.accumulo.core.compaction.thrift.CompactionState;
import org.apache.accumulo.core.compaction.thrift.UnknownCompactionIdException;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.TableId;
-import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
-import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
-import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.TabletFile;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.rpc.ThriftUtil;
-import org.apache.accumulo.core.spi.compaction.CompactionKind;
-import org.apache.accumulo.core.tabletserver.thrift.CompactionReason;
import org.apache.accumulo.core.tabletserver.thrift.CompactionStats;
import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.Halt;
import org.apache.accumulo.core.util.HostAndPort;
-import org.apache.accumulo.core.util.ratelimit.RateLimiter;
-import org.apache.accumulo.core.util.ratelimit.SharedRateLimiterFactory;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.fate.util.UtilWaitThread;
@@ -80,8 +71,6 @@ import org.apache.accumulo.server.compaction.RetryableThriftCall;
import org.apache.accumulo.server.compaction.RetryableThriftCall.RetriesExceededException;
import org.apache.accumulo.server.compaction.RetryableThriftFunction;
import org.apache.accumulo.server.conf.TableConfiguration;
-import org.apache.accumulo.server.iterators.SystemIteratorEnvironment;
-import org.apache.accumulo.server.iterators.TabletIteratorEnvironment;
import org.apache.accumulo.server.rpc.ServerAddress;
import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper;
import org.apache.accumulo.server.rpc.TServerUtils;
@@ -107,73 +96,6 @@ public class Compactor extends AbstractServer
}
}
- /**
- * Object used to hold information about the current compaction
- */
- private static class CompactionJobHolder {
- private TExternalCompactionJob job;
- private Thread compactionThread;
- private volatile Boolean cancelled = Boolean.FALSE;
- private CompactionStats stats = null;
-
- public CompactionJobHolder() {}
-
- public void reset() {
- job = null;
- compactionThread = null;
- cancelled = Boolean.FALSE;
- stats = null;
- }
-
- public TExternalCompactionJob getJob() {
- return job;
- }
-
- public Thread getThread() {
- return compactionThread;
- }
-
- public CompactionStats getStats() {
- return stats;
- }
-
- public void setStats(CompactionStats stats) {
- this.stats = stats;
- }
-
- public void cancel() {
- cancelled = Boolean.TRUE;
- }
-
- public boolean isCancelled() {
- return cancelled;
- }
-
- public boolean isSet() {
- return (null != this.job);
- }
-
- public void set(TExternalCompactionJob job, Thread compactionThread) {
- Objects.requireNonNull(job, "CompactionJob is null");
- Objects.requireNonNull(compactionThread, "Compaction thread is null");
- this.job = job;
- this.compactionThread = compactionThread;
- }
-
- }
-
- /**
- * Utility for returning the address in the form host:port
- *
- * @return host and port for Compactor client connections
- */
- private static String getHostPortString(HostAndPort address) {
- if (address == null) {
- return null;
- }
- return address.getHost() + ":" + address.getPort();
- }
-
public static final String COMPACTOR_SERVICE = "COMPACTOR_SVC";
private static final Logger LOG = LoggerFactory.getLogger(Compactor.class);
@@ -186,6 +108,7 @@ public class Compactor extends AbstractServer
private final String queueName;
private final AtomicReference<CompactionCoordinator.Client> coordinatorClient =
new AtomicReference<>();
+
private ZooLock compactorLock;
private ServerAddress compactorAddress = null;
@@ -212,10 +135,10 @@ public class Compactor extends AbstractServer
* @throws KeeperException
* @throws InterruptedException
*/
- private void announceExistence(HostAndPort clientAddress)
+ protected void announceExistence(HostAndPort clientAddress)
throws KeeperException, InterruptedException {
- String hostPort = getHostPortString(clientAddress);
+ String hostPort = ExternalCompactionUtil.getHostPortString(clientAddress);
ZooReaderWriter zoo = getContext().getZooReaderWriter();
String compactorQueuePath =
@@ -276,7 +199,7 @@ public class Compactor extends AbstractServer
* @return address of this compactor client service
* @throws UnknownHostException
*/
- private ServerAddress startCompactorClientService() throws UnknownHostException {
+ protected ServerAddress startCompactorClientService() throws UnknownHostException {
Compactor rpcProxy = TraceUtil.wrapService(this);
final org.apache.accumulo.core.compaction.thrift.Compactor.Processor<Compactor> processor;
if (getContext().getThriftServerType() == ThriftServerType.SASL) {
@@ -298,11 +221,12 @@ public class Compactor extends AbstractServer
}
/**
- * Called by a thrift client to cancel the currently running compaction if it matches the supplied
- * job
+ * Called by a CompactionCoordinator to cancel the currently running compaction
*
- * @param compactionJob
- * job
+ * @param externalCompactionId
+ * compaction id
+ * @throws UnknownCompactionIdException
+ * if the externalCompactionId does not match the currently executing compaction
*/
@Override
public void cancel(String externalCompactionId) throws TException {
@@ -319,18 +243,17 @@ public class Compactor extends AbstractServer
}
/**
- * Send an update to the coordinator for this job
+ * Send an update to the CompactionCoordinator for this job
*
- * @param coordinatorClient
- * address of the CompactionCoordinator
* @param job
* compactionJob
* @param state
* updated state
* @param message
* updated message
+ * @throws RetriesExceededException
*/
- private void updateCompactionState(TExternalCompactionJob job, CompactionState state,
+ protected void updateCompactionState(TExternalCompactionJob job, CompactionState state,
String message) throws RetriesExceededException {
RetryableThriftCall<Void> thriftCall = new RetryableThriftCall<>(1000,
RetryableThriftCall.MAX_WAIT_TIME, 25, new RetryableThriftFunction<Void>() {
@@ -351,7 +274,7 @@ public class Compactor extends AbstractServer
}
/**
- * Update the coordinator with the stats from the job
+ * Update the CompactionCoordinator with the stats from the completed job
*
* @param job
* current compaction job
@@ -359,7 +282,7 @@ public class Compactor extends AbstractServer
* compaction stats
* @throws RetriesExceededException
*/
- private void updateCompactionCompleted(TExternalCompactionJob job, CompactionStats stats)
+ protected void updateCompactionCompleted(TExternalCompactionJob job, CompactionStats stats)
throws RetriesExceededException {
RetryableThriftCall<Void> thriftCall = new RetryableThriftCall<>(1000,
RetryableThriftCall.MAX_WAIT_TIME, 25, new RetryableThriftFunction<Void>() {
@@ -388,7 +311,7 @@ public class Compactor extends AbstractServer
* @return CompactionJob
* @throws RetriesExceededException
*/
- private TExternalCompactionJob getNextJob() throws RetriesExceededException {
+ protected TExternalCompactionJob getNextJob() throws RetriesExceededException {
RetryableThriftCall<TExternalCompactionJob> nextJobThriftCall =
new RetryableThriftCall<>(1000, RetryableThriftCall.MAX_WAIT_TIME, 0,
new RetryableThriftFunction<TExternalCompactionJob>() {
@@ -397,7 +320,7 @@ public class Compactor extends AbstractServer
try {
coordinatorClient.compareAndSet(null, getCoordinatorClient());
return coordinatorClient.get().getCompactionJob(queueName,
- getHostPortString(compactorAddress.getAddress()));
+ ExternalCompactionUtil.getHostPortString(compactorAddress.getAddress()));
} catch (TException e) {
ThriftUtil.returnClient(coordinatorClient.getAndSet(null));
throw e;
@@ -414,7 +337,7 @@ public class Compactor extends AbstractServer
* @throws TTransportException
* when unable to get client
*/
- private CompactionCoordinator.Client getCoordinatorClient() throws TTransportException {
+ protected CompactionCoordinator.Client getCoordinatorClient() throws TTransportException {
HostAndPort coordinatorHost = ExternalCompactionUtil.findCompactionCoordinator(getContext());
if (null == coordinatorHost) {
throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper");
@@ -425,60 +348,72 @@ public class Compactor extends AbstractServer
}
/**
- * Create and return a new CompactionEnv for the current compaction job
- *
+ * Create compaction runnable
+ *
* @param job
- * current compaction job
- * @return new env
+ * compaction job
+ * @param totalInputEntries
+ * object to capture total entries
+ * @param started
+ * started latch
+ * @param stopped
+ * stopped latch
+ * @param err
+ * reference to error
+ * @return Runnable compaction job
*/
- private CompactionEnv getCompactionEnvironment(TExternalCompactionJob job) {
- return new CompactionEnv() {
- @Override
- public boolean isCompactionEnabled() {
- return !jobHolder.isCancelled();
- }
+ protected Runnable createCompactionJob(final TExternalCompactionJob job,
+ final LongAdder totalInputEntries, final CountDownLatch started, final CountDownLatch stopped,
+ final AtomicReference<Throwable> err) {
+ return new Runnable() {
@Override
- public IteratorScope getIteratorScope() {
- return IteratorScope.majc;
- }
-
- @Override
- public RateLimiter getReadLimiter() {
- return SharedRateLimiterFactory.getInstance(getContext().getConfiguration())
- .create("read_rate_limiter", () -> job.getReadRate());
- }
-
- @Override
- public RateLimiter getWriteLimiter() {
- return SharedRateLimiterFactory.getInstance(getContext().getConfiguration())
- .create("write_rate_limiter", () -> job.getWriteRate());
- }
-
- @Override
- public SystemIteratorEnvironment createIteratorEnv(ServerContext context,
- AccumuloConfiguration acuTableConf, TableId tableId) {
- return new TabletIteratorEnvironment(getContext(), IteratorScope.majc,
- !job.isPropagateDeletes(), acuTableConf, tableId,
- CompactionKind.valueOf(job.getKind().name()));
- }
-
- @Override
- public SortedKeyValueIterator<Key,Value> getMinCIterator() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public CompactionReason getReason() {
- switch (job.getKind()) {
- case USER:
- return CompactionReason.USER;
- case CHOP:
- return CompactionReason.CHOP;
- case SELECTOR:
- case SYSTEM:
- default:
- return CompactionReason.SYSTEM;
+ public void run() {
+ try {
+ LOG.info("Starting up compaction runnable for job: {}", job);
+ updateCompactionState(job, CompactionState.STARTED, "Compaction started");
+
+ final TableId tableId = TableId.of(new String(job.getExtent().getTable(), UTF_8));
+ final TableConfiguration tConfig = getContext().getTableConfiguration(tableId);
+ final TabletFile outputFile = new TabletFile(new Path(job.getOutputFile()));
+ final CompactionEnv cenv = new CompactionEnvironment(getContext(), jobHolder);
+
+ final Map<StoredTabletFile,DataFileValue> files = new TreeMap<>();
+ job.getFiles().forEach(f -> {
+ files.put(new StoredTabletFile(f.getMetadataFileEntry()),
+ new DataFileValue(f.getSize(), f.getEntries(), f.getTimestamp()));
+ totalInputEntries.add(f.getEntries());
+ });
+
+ final List<IteratorSetting> iters = new ArrayList<>();
+ job.getIteratorSettings().getIterators()
+ .forEach(tis -> iters.add(SystemIteratorUtil.toIteratorSetting(tis)));
+
+ org.apache.accumulo.server.compaction.Compactor compactor =
+ new org.apache.accumulo.server.compaction.Compactor(getContext(),
+ KeyExtent.fromThrift(job.getExtent()), files, outputFile,
+ job.isPropagateDeletes(), cenv, iters, tConfig);
+
+ LOG.info("Starting compactor");
+ started.countDown();
+
+ org.apache.accumulo.server.compaction.CompactionStats stat = compactor.call();
+ CompactionStats cs = new CompactionStats();
+ cs.setEntriesRead(stat.getEntriesRead());
+ cs.setEntriesWritten(stat.getEntriesWritten());
+ cs.setFileSize(stat.getFileSize());
+ jobHolder.setStats(cs);
+ LOG.info("Compaction completed successfully");
+ // Update state when completed
+ updateCompactionState(job, CompactionState.SUCCEEDED,
+ "Compaction completed successfully");
+ } catch (Exception e) {
+ LOG.error("Compaction failed", e);
+ err.set(e);
+ throw new RuntimeException("Compaction failed", e);
+ } finally {
+ stopped.countDown();
+ // TODO: Any cleanup
}
}
};
@@ -518,67 +453,13 @@ public class Compactor extends AbstractServer
}
LOG.info("Received next compaction job: {}", job);
- final LongAdder totalInputSize = new LongAdder();
final LongAdder totalInputEntries = new LongAdder();
final CountDownLatch started = new CountDownLatch(1);
final CountDownLatch stopped = new CountDownLatch(1);
- Thread compactionThread = Threads.createThread(
- "Compaction job for tablet " + job.getExtent().toString(), new Runnable() {
-
- @Override
- public void run() {
- try {
- LOG.info("Setting up to run compactor");
- updateCompactionState(job, CompactionState.STARTED, "Compaction started");
-
- final TableId tableId = TableId.of(new String(job.getExtent().getTable(), UTF_8));
- final TableConfiguration tConfig = getContext().getTableConfiguration(tableId);
-
- final Map<StoredTabletFile,DataFileValue> files = new TreeMap<>();
- job.getFiles().forEach(f -> {
- files.put(new StoredTabletFile(f.getMetadataFileEntry()),
- new DataFileValue(f.getSize(), f.getEntries(), f.getTimestamp()));
- totalInputSize.add(f.getSize());
- totalInputEntries.add(f.getEntries());
- });
-
- final TabletFile outputFile = new TabletFile(new Path(job.getOutputFile()));
-
- final CompactionEnv cenv = getCompactionEnvironment(job);
-
- final List<IteratorSetting> iters = new ArrayList<>();
- job.getIteratorSettings().getIterators()
- .forEach(tis -> iters.add(SystemIteratorUtil.toIteratorSetting(tis)));
-
- org.apache.accumulo.server.compaction.Compactor compactor =
- new org.apache.accumulo.server.compaction.Compactor(getContext(),
- KeyExtent.fromThrift(job.getExtent()), files, outputFile,
- job.isPropagateDeletes(), cenv, iters, tConfig);
-
- LOG.info("Starting compactor");
- started.countDown();
-
- org.apache.accumulo.server.compaction.CompactionStats stat = compactor.call();
- CompactionStats cs = new CompactionStats();
- cs.setEntriesRead(stat.getEntriesRead());
- cs.setEntriesWritten(stat.getEntriesWritten());
- cs.setFileSize(stat.getFileSize());
- jobHolder.setStats(cs);
- LOG.info("Compaction completed successfully");
- // Update state when completed
- updateCompactionState(job, CompactionState.SUCCEEDED,
- "Compaction completed successfully");
- } catch (Exception e) {
- LOG.error("Compaction failed", e);
- err.set(e);
- throw new RuntimeException("Compaction failed", e);
- } finally {
- stopped.countDown();
- // TODO: Any cleanup
- }
- }
- });
+ Thread compactionThread =
+ Threads.createThread("Compaction job for tablet " + job.getExtent().toString(),
+ this.createCompactionJob(job, totalInputEntries, started, stopped, err));
synchronized (jobHolder) {
jobHolder.set(job, compactionThread);