You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by dl...@apache.org on 2021/03/02 22:16:31 UTC
[accumulo] branch 1451-external-compactions-feature updated: re
#1451: wip on coordinator
This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push:
new 2357912 re #1451: wip on coordinator
2357912 is described below
commit 235791246bf9c888898f6a15ddd24f94197144c9
Author: Dave Marion <dl...@apache.org>
AuthorDate: Tue Mar 2 22:16:04 2021 +0000
re #1451: wip on coordinator
---
.../java/org/apache/accumulo/core/Constants.java | 3 +
.../org/apache/accumulo/core/conf/Property.java | 16 +-
pom.xml | 1 +
.../accumulo/server/manager/LiveTServerSet.java | 4 +
server/compaction-coordinator/.gitignore | 28 ++
server/compaction-coordinator/pom.xml | 34 ++
.../coordinator/CompactionCoordinator.java | 518 +++++++++++++++++++++
7 files changed, 603 insertions(+), 1 deletion(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java b/core/src/main/java/org/apache/accumulo/core/Constants.java
index 96b8b43..70353f2 100644
--- a/core/src/main/java/org/apache/accumulo/core/Constants.java
+++ b/core/src/main/java/org/apache/accumulo/core/Constants.java
@@ -59,6 +59,9 @@ public class Constants {
public static final String ZTSERVERS = "/tservers";
+ public static final String ZCOORDINATOR = "/coordinators";
+ public static final String ZCOORDINATOR_LOCK = "/coordinators/lock";
+
public static final String ZDEAD = "/dead";
public static final String ZDEADTSERVERS = ZDEAD + "/tservers";
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 04015e3..f05794f 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -1024,7 +1024,21 @@ public enum Property {
REPLICATION_RPC_TIMEOUT("replication.rpc.timeout", "2m", PropertyType.TIMEDURATION,
"Amount of time for a single replication RPC call to last before failing"
+ " the attempt. See replication.work.attempts."),
- // deprecated properties grouped at the end to reference property that replaces them
+ // CompactionCoordinator properties
+ COORDINATOR_PREFIX("coordinator.", null, PropertyType.PREFIX,
+ "Properties in this category affect the behavior of the accumulo compaction coordinator server."),
+ COORDINATOR_PORTSEARCH("coordinator.port.search", "false", PropertyType.BOOLEAN,
+ "If the port specified by coordinator.port.client is in use, search higher ports until one is available."),
+ COORDINATOR_CLIENTPORT("coordinator.port.client", "9101", PropertyType.PORT,
+ "The port used for handling client connections on the compaction coordinator server."),
+ COORDINATOR_MINTHREADS("coordinator.server.threads.minimum", "1", PropertyType.COUNT,
+ "The minimum number of threads to use to handle incoming requests."),
+ COORDINATOR_MINTHREADS_TIMEOUT("coordinator.server.threads.timeout", "0s", PropertyType.TIMEDURATION,
+ "The time after which incoming request threads terminate with no work available. Zero (0) will keep the threads alive indefinitely."),
+ COORDINATOR_THREADCHECK("coordinator.server.threadcheck.time", "1s", PropertyType.TIMEDURATION,
+ "The time between adjustments of the server thread pool."),
+ COORDINATOR_MAX_MESSAGE_SIZE("coordinator.server.message.size.max", "10M", PropertyType.BYTES,
+ "The maximum size of a message that can be sent to the compaction coordinator."),
+ // deprecated properties grouped at the end to reference property that replaces them
@Deprecated(since = "1.6.0")
@ReplacedBy(property = INSTANCE_VOLUMES)
INSTANCE_DFS_URI("instance.dfs.uri", "", PropertyType.URI,
diff --git a/pom.xml b/pom.xml
index 0eb0c5c..d2bd521 100644
--- a/pom.xml
+++ b/pom.xml
@@ -87,6 +87,7 @@
<module>iterator-test-harness</module>
<module>minicluster</module>
<module>server/base</module>
+ <module>server/compaction-coordinator</module>
<module>server/gc</module>
<module>server/manager</module>
<module>server/master</module>
diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
index 8e85250..b318835 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
@@ -77,6 +77,10 @@ public class LiveTServerSet implements Watcher {
public TServerConnection(HostAndPort addr) {
address = addr;
}
+
+ /**
+ * @return the address that was supplied when this connection object was constructed
+ */
+ public HostAndPort getAddress() {
+ return address;
+ }
private String lockString(ZooLock mlock) {
return mlock.getLockID().serialize(context.getZooKeeperRoot() + Constants.ZMANAGER_LOCK);
diff --git a/server/compaction-coordinator/.gitignore b/server/compaction-coordinator/.gitignore
new file mode 100644
index 0000000..e77a822
--- /dev/null
+++ b/server/compaction-coordinator/.gitignore
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Maven ignores
+/target/
+
+# IDE ignores
+/.settings/
+/.project
+/.classpath
+/.pydevproject
+/.idea
+/*.iml
+/nbproject/
+/nbactions.xml
+/nb-configuration.xml
diff --git a/server/compaction-coordinator/pom.xml b/server/compaction-coordinator/pom.xml
new file mode 100644
index 0000000..9f25e5d
--- /dev/null
+++ b/server/compaction-coordinator/pom.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-project</artifactId>
+ <version>2.1.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+ <artifactId>accumulo-compaction-coordinator</artifactId>
+ <name>Apache Accumulo Compaction Coordinator</name>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-server-base</artifactId>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
new file mode 100644
index 0000000..73670c0
--- /dev/null
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.coordinator;
+
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.WeakHashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.clientImpl.ThriftTransportPool;
+import org.apache.accumulo.core.compaction.thrift.CompactionState;
+import org.apache.accumulo.core.compaction.thrift.Status;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.dataImpl.thrift.CompactionStats;
+import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
+import org.apache.accumulo.core.metadata.TServerInstance;
+import org.apache.accumulo.core.rpc.ThriftUtil;
+import org.apache.accumulo.core.tabletserver.thrift.CompactionJob;
+import org.apache.accumulo.core.tabletserver.thrift.CompactionQueueSummary;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.Halt;
+import org.apache.accumulo.core.util.HostAndPort;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.fate.zookeeper.ZooLock;
+import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
+import org.apache.accumulo.server.AbstractServer;
+import org.apache.accumulo.server.GarbageCollectionLogger;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.ServerOpts;
+import org.apache.accumulo.server.manager.LiveTServerSet;
+import org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection;
+import org.apache.accumulo.server.rpc.ServerAddress;
+import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper;
+import org.apache.accumulo.server.rpc.TServerUtils;
+import org.apache.accumulo.server.rpc.ThriftServerType;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CompactionCoordinator extends AbstractServer implements
+ org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Iface, LiveTServerSet.Listener {
+
+  /**
+   * Immutable (queue name, priority) pair. Instances are used as the values of the tserver index
+   * so that a tserver's entries can be located in the queue map.
+   *
+   * <p>
+   * Ordering: by queue name ascending, then by priority descending (a numerically larger priority
+   * sorts first). The ordering is consistent with {@link #equals(Object)}.
+   */
+  private static class QueueAndPriority implements Comparable<QueueAndPriority> {
+
+    // Best-effort interning cache. The Pair keys are created inside get() and referenced from
+    // nowhere else, so entries can be cleared by the garbage collector at any time; callers must
+    // rely on equals()/hashCode(), never on instance identity. WeakHashMap is not thread-safe;
+    // the callers in this file invoke get() while holding the QUEUES monitor.
+    private static final WeakHashMap<Pair<String,Long>,QueueAndPriority> CACHE =
+        new WeakHashMap<>();
+
+    /**
+     * Returns a QueueAndPriority for the given queue and priority, reusing a cached instance when
+     * one is available.
+     *
+     * @param queue
+     *          queue name
+     * @param priority
+     *          priority within the queue
+     * @return a non-null instance equal to (queue, priority)
+     */
+    public static QueueAndPriority get(String queue, Long priority) {
+      // computeIfAbsent returns the value that is mapped after the call. putIfAbsent returns the
+      // PREVIOUS mapping, which is null the first time a key is seen, so it cannot be used here.
+      return CACHE.computeIfAbsent(new Pair<>(queue, priority),
+          k -> new QueueAndPriority(queue, priority));
+    }
+
+    private final String queue;
+    private final Long priority;
+
+    private QueueAndPriority(String queue, Long priority) {
+      super();
+      this.queue = queue;
+      this.priority = priority;
+    }
+
+    public String getQueue() {
+      return queue;
+    }
+
+    public Long getPriority() {
+      return priority;
+    }
+
+    @Override
+    public int hashCode() {
+      return queue.hashCode() + priority.hashCode();
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder buf = new StringBuilder();
+      buf.append("queue: ").append(queue);
+      buf.append(", priority: ").append(priority);
+      return buf.toString();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (null == obj)
+        return false;
+      if (obj == this)
+        return true;
+      if (!(obj instanceof QueueAndPriority)) {
+        return false;
+      } else {
+        QueueAndPriority other = (QueueAndPriority) obj;
+        return this.queue.equals(other.queue) && this.priority.equals(other.priority);
+      }
+    }
+
+    @Override
+    public int compareTo(QueueAndPriority other) {
+      int result = this.queue.compareTo(other.queue);
+      if (result == 0) {
+        // reversing order such that if other priority is lower, then this has a higher priority
+        return Long.compare(other.priority, this.priority);
+      } else {
+        return result;
+      }
+    }
+
+  }
+
+  /**
+   * Watcher attached to the coordinator's ZooKeeper lock. Both failure callbacks hand control to
+   * {@code Halt.halt}; the acquisition callbacks are intentionally empty.
+   */
+  private static class CoordinatorLockWatcher implements ZooLock.AccumuloLockWatcher {
+
+    @Override
+    public void lostLock(LockLossReason reason) {
+      final String why = "Coordinator lock in zookeeper lost (reason = " + reason + "), exiting!";
+      Halt.halt(why, -1);
+    }
+
+    @Override
+    public void unableToMonitorLockNode(final Exception e) {
+      // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility
+      Halt.halt(-1, () -> LOG.error("FATAL: No longer able to monitor Coordinator lock node", e));
+    }
+
+    @Override
+    public synchronized void acquiredLock() {
+      // Nothing to do here: this is overridden by the LockWatcherWrapper in ZooLock.tryLock()
+    }
+
+    @Override
+    public synchronized void failedToAcquireLock(Exception e) {
+      // Nothing to do here: this is overridden by the LockWatcherWrapper in ZooLock.tryLock()
+    }
+
+  }
+
+  /**
+   * Formats an address as {@code host:port}.
+   *
+   * @param address
+   *          address to format, may be null
+   * @return the host and port joined by a colon, or null when {@code address} is null
+   */
+  private static String getHostPortString(HostAndPort address) {
+    return address == null ? null : address.getHost() + ":" + address.getPort();
+  }
+
+  /**
+   * Immutable record of a single status report for a compaction: when it was reported, the
+   * reported state, and the accompanying message.
+   */
+  private static class CompactionUpdate {
+
+    private final Long timestamp;
+    private final String message;
+    private final CompactionState state;
+
+    public CompactionUpdate(Long timestamp, String message, CompactionState state) {
+      this.timestamp = timestamp;
+      this.message = message;
+      this.state = state;
+    }
+
+    /** @return timestamp supplied with the update */
+    public Long getTimestamp() {
+      return this.timestamp;
+    }
+
+    /** @return informational message supplied with the update */
+    public String getMessage() {
+      return this.message;
+    }
+
+    /** @return compaction state supplied with the update */
+    public CompactionState getState() {
+      return this.state;
+    }
+  }
+
+  /**
+   * Book-keeping for one compaction that has been handed out: the job, the compactor it was given
+   * to, the tablet server it was reserved on, the status updates received so far and, once
+   * reported, the final stats.
+   *
+   * <p>
+   * NOTE(review): instances are stored in the RUNNING map and mutated through addUpdate/setStats
+   * without any locking in this class -- confirm whether concurrent callers are possible.
+   */
+  private static class RunningCompaction {
+
+    private final CompactionJob job;
+    private final String compactor;
+    private final TServerInstance tserver;
+    // updates keyed by their timestamp; a TreeMap keeps them in ascending timestamp order
+    private Map<Long,CompactionUpdate> updates = new TreeMap<>();
+    // remains null until setStats is called
+    private CompactionStats stats = null;
+
+    public RunningCompaction(CompactionJob job, String compactor, TServerInstance tserver) {
+      this.job = job;
+      this.compactor = compactor;
+      this.tserver = tserver;
+    }
+
+    public CompactionJob getJob() {
+      return this.job;
+    }
+
+    public String getCompactor() {
+      return this.compactor;
+    }
+
+    public TServerInstance getTserver() {
+      return this.tserver;
+    }
+
+    public CompactionStats getStats() {
+      return this.stats;
+    }
+
+    public void setStats(CompactionStats stats) {
+      this.stats = stats;
+    }
+
+    public Map<Long,CompactionUpdate> getUpdates() {
+      return this.updates;
+    }
+
+    public void setUpdates(Map<Long,CompactionUpdate> updates) {
+      this.updates = updates;
+    }
+
+    /** Records an update; an existing update with the same timestamp is replaced. */
+    public void addUpdate(Long timestamp, String message, CompactionState state) {
+      final CompactionUpdate update = new CompactionUpdate(timestamp, message, state);
+      this.updates.put(timestamp, update);
+    }
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(CompactionCoordinator.class);
+  // Delay, in milliseconds, between the GC-logging runs scheduled by the constructor
+  private static final long TIME_BETWEEN_CHECKS = 5000;
+
+  // QUEUES and INDEX are plain (non-concurrent) maps. The methods in this class that read or
+  // modify them do so inside a synchronized (QUEUES) block.
+  /* Map of external queue name -> priority -> tservers */
+  private static final Map<String, TreeMap<Long, LinkedHashSet<TServerInstance>>> QUEUES = new HashMap<>();
+  /* index of tserver to queue and priority, exists to provide O(1) lookup into QUEUES */
+  private static final Map<TServerInstance, HashSet<QueueAndPriority>> INDEX = new HashMap<>();
+  /* Map of compactionId to RunningCompactions */
+  private static final Map<Long, RunningCompaction> RUNNING = new ConcurrentHashMap<>();
+
+  private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger();
+  private final AccumuloConfiguration aconf;
+
+  // Assigned in getCoordinatorLock()
+  private ZooLock coordinatorLock;
+  // Assigned in run() after the coordinator lock has been obtained
+  private LiveTServerSet tserverSet;
+
+  /**
+   * Creates the coordinator: sets up crypto on the server context, caches the configuration,
+   * schedules periodic GC logging and logs the version and instance id.
+   *
+   * @param opts
+   *          server options, passed to the superclass
+   * @param args
+   *          command line arguments, passed to the superclass
+   */
+  protected CompactionCoordinator(ServerOpts opts, String[] args) {
+    super("compaction-coordinator", opts, args);
+    ServerContext context = super.getContext();
+    context.setupCrypto();
+
+    aconf = getConfiguration();
+    // log GC info immediately, then again TIME_BETWEEN_CHECKS ms after each run completes
+    ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(
+        () -> gcLogger.logGCInfo(getConfiguration()), 0, TIME_BETWEEN_CHECKS,
+        TimeUnit.MILLISECONDS);
+    LOG.info("Version " + Constants.VERSION);
+    LOG.info("Instance " + context.getInstanceID());
+  }
+
+  /**
+   * Set up nodes and locks in ZooKeeper for this CompactionCoordinator. The lock data is this
+   * coordinator's client address in the form host:port, encoded as UTF-8.
+   *
+   * @param clientAddress
+   *          address of this CompactionCoordinator's client service, must not be null
+   * @return true if lock was acquired, else false
+   * @throws KeeperException
+   *           propagated from the ZooKeeper lock attempt
+   * @throws InterruptedException
+   *           propagated from the ZooKeeper lock attempt
+   */
+  private boolean getCoordinatorLock(HostAndPort clientAddress)
+      throws KeeperException, InterruptedException {
+    LOG.info("trying to get coordinator lock");
+
+    // fail with a clear message instead of a bare NPE on getBytes() below
+    java.util.Objects.requireNonNull(clientAddress, "clientAddress");
+
+    final String coordinatorClientAddress = getHostPortString(clientAddress);
+    final String lockPath = getContext().getZooKeeperRoot() + Constants.ZCOORDINATOR_LOCK;
+    final UUID zooLockUUID = UUID.randomUUID();
+
+    CoordinatorLockWatcher coordinatorLockWatcher = new CoordinatorLockWatcher();
+    coordinatorLock = new ZooLock(getContext().getSiteConfiguration(), lockPath, zooLockUUID);
+    // Explicit charset: the no-arg getBytes() uses the platform default, which would make the
+    // bytes stored in ZooKeeper depend on the JVM that wrote them.
+    return coordinatorLock.tryLock(coordinatorLockWatcher,
+        coordinatorClientAddress.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+  }
+
+  /**
+   * Start this CompactionCoordinator thrift service to handle incoming client requests
+   *
+   * @return address of this CompactionCoordinator client service
+   * @throws UnknownHostException
+   *           propagated from starting the thrift server
+   */
+  private ServerAddress startCompactorClientService() throws UnknownHostException {
+    // The wrappers are declared with the thrift service interface rather than this concrete
+    // class. NOTE(review): TraceUtil.wrapService and TCredentialsUpdatingWrapper.service are
+    // assumed to return interface-based dynamic proxies; such a proxy implements Iface but is
+    // not a CompactionCoordinator, so assigning it to the concrete type would fail with a
+    // ClassCastException at runtime. Typing it as Iface is correct either way.
+    org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Iface rpcProxy =
+        TraceUtil.wrapService(this);
+    if (getContext().getThriftServerType() == ThriftServerType.SASL) {
+      rpcProxy = TCredentialsUpdatingWrapper.service(rpcProxy, CompactionCoordinator.class,
+          getConfiguration());
+    }
+    final org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Processor<
+        org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Iface> processor =
+            new org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Processor<>(
+                rpcProxy);
+    // Fall back to the general message size limit when no coordinator-specific value is set
+    Property maxMessageSizeProperty = (aconf.get(Property.COORDINATOR_MAX_MESSAGE_SIZE) != null
+        ? Property.COORDINATOR_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE);
+    ServerAddress sp = TServerUtils.startServer(getMetricsSystem(), getContext(), getHostname(),
+        Property.COORDINATOR_CLIENTPORT, processor, this.getClass().getSimpleName(),
+        "Thrift Client Server", Property.COORDINATOR_PORTSEARCH, Property.COORDINATOR_MINTHREADS,
+        Property.COORDINATOR_MINTHREADS_TIMEOUT, Property.COORDINATOR_THREADCHECK,
+        maxMessageSizeProperty);
+    LOG.info("address = {}", sp.address);
+    return sp;
+  }
+
+  /**
+   * Main loop of the coordinator: starts the thrift service, obtains the coordinator lock in
+   * ZooKeeper, starts listening for tablet server changes and then, once a minute, asks every
+   * live tablet server for its compaction queue summaries and records them in QUEUES and INDEX.
+   * This method does not return normally.
+   *
+   * @throws RuntimeException
+   *           if the thrift service cannot be started or the lock is not acquired
+   * @throws IllegalStateException
+   *           if obtaining the lock fails with a ZooKeeper error or is interrupted
+   */
+  @Override
+  public void run() {
+
+    ServerAddress coordinatorAddress = null;
+    try {
+      coordinatorAddress = startCompactorClientService();
+    } catch (UnknownHostException e1) {
+      throw new RuntimeException("Failed to start the coordinator service", e1);
+    }
+    final HostAndPort clientAddress = coordinatorAddress.address;
+
+    try {
+      if (!getCoordinatorLock(clientAddress)) {
+        throw new RuntimeException("Unable to get Coordinator lock.");
+      }
+    } catch (InterruptedException e) {
+      // restore the interrupt flag so that code further up the stack can observe it
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException("Exception getting Coordinator lock", e);
+    } catch (KeeperException e) {
+      throw new IllegalStateException("Exception getting Coordinator lock", e);
+    }
+
+    tserverSet = new LiveTServerSet(getContext(), this);
+
+    // TODO: On initial startup contact all running tservers to get information about the
+    // compactions that are currently running in external queues to populate the RUNNING map.
+    // This is to handle the case where the coordinator dies or is restarted at runtime
+
+    tserverSet.startListeningForTabletServerChanges();
+
+    while (true) {
+      tserverSet.getCurrentServers().forEach(tsi -> {
+        try {
+          // The RPC is deliberately made while holding the lock: update() takes the same lock,
+          // so a tserver removed by update() cannot be re-inserted from a response that was
+          // fetched before the removal.
+          synchronized (QUEUES) {
+            TabletClientService.Client client = getTabletServerConnection(tsi);
+            List<CompactionQueueSummary> summaries = client.getCompactionQueueInfo();
+            summaries.forEach(summary -> {
+              QueueAndPriority qp =
+                  QueueAndPriority.get(summary.getQueue().intern(), summary.getPriority());
+              // computeIfAbsent returns the value that is mapped after the call; putIfAbsent
+              // returns the previous mapping (null on first insert) and so cannot be chained.
+              QUEUES.computeIfAbsent(qp.getQueue(), k -> new TreeMap<>())
+                  .computeIfAbsent(qp.getPriority(), k -> new LinkedHashSet<>()).add(tsi);
+              INDEX.computeIfAbsent(tsi, k -> new HashSet<>()).add(qp);
+            });
+          }
+        } catch (TException e) {
+          LOG.warn("Error getting compaction summaries from tablet server: {}", tsi.getHostAndPort(), e);
+        }
+      });
+      UtilWaitThread.sleep(60000);
+    }
+
+  }
+
+
+  /**
+   * Callback for the LiveTServerSet object to update us with the current set of tablet servers,
+   * including ones that were deleted and added. Only the deleted servers are acted on here: each
+   * one is removed from INDEX and from every queue/priority set in QUEUES that it was indexed
+   * under. Sets and per-queue maps that become empty are removed as well.
+   *
+   * @param current current set of live tservers
+   * @param deleted set of tservers that were removed from current since last update
+   * @param added set of tservers that were added to current since last update
+   */
+  @Override
+  public void update(LiveTServerSet current, Set<TServerInstance> deleted, Set<TServerInstance> added) {
+
+    // run() will iterate over the current and added tservers and add them to the internal
+    // data structures. For tservers that are deleted, we need to remove them from the
+    // internal data structures
+    synchronized (QUEUES) {
+      deleted.forEach(tsi -> {
+        // A deleted tserver may never have been indexed (it reported no queue summaries, or all
+        // of its entries were already handed out), in which case there is nothing to clean up.
+        HashSet<QueueAndPriority> indexed = INDEX.remove(tsi);
+        if (null == indexed) {
+          return;
+        }
+        indexed.forEach(qp -> {
+          TreeMap<Long, LinkedHashSet<TServerInstance>> m = QUEUES.get(qp.getQueue());
+          if (null != m) {
+            LinkedHashSet<TServerInstance> tservers = m.get(qp.getPriority());
+            if (null != tservers) {
+              tservers.remove(tsi);
+              if (tservers.isEmpty()) {
+                // do not leave an empty set behind for getCompactionJob to trip over
+                m.remove(qp.getPriority());
+              }
+            }
+            if (m.isEmpty()) {
+              QUEUES.remove(qp.getQueue());
+            }
+          }
+        });
+      });
+    }
+  }
+
+  /**
+   * Return the next compaction job for the queue to a Compactor. A tablet server is taken from
+   * the highest priority set recorded for the queue, removed from the internal structures, and
+   * asked to reserve a job for the compactor. The reserved job is recorded in RUNNING.
+   *
+   * @param queueName queue
+   * @param compactor compactor address
+   * @return compaction job, or null when no tablet server is currently recorded for the queue
+   * @throws TException
+   *           if the tablet server cannot be contacted or the reservation call fails
+   */
+  @Override
+  public CompactionJob getCompactionJob(String queueName, String compactor) throws TException {
+    final String queue = queueName.intern();
+    TServerInstance tserver = null;
+    Long priority = null;
+    synchronized (QUEUES) {
+      TreeMap<Long, LinkedHashSet<TServerInstance>> m = QUEUES.get(queue);
+      if (null == m) {
+        return null; // TODO: or should we throw an error for no tserver for this queue?
+      }
+      while (tserver == null) {
+        // The map is in ascending key order, so the highest priority is the LAST entry. (A
+        // numerically larger value is the higher priority, as in QueueAndPriority.compareTo.)
+        Entry<Long, LinkedHashSet<TServerInstance>> entry = m.lastEntry();
+        if (null == entry) {
+          // every priority for this queue has been drained
+          return null;
+        }
+        priority = entry.getKey();
+        LinkedHashSet<TServerInstance> tservers = entry.getValue();
+        if (null == tservers || tservers.isEmpty()) {
+          // Clean up the map entry when no tservers for this queue and priority
+          m.remove(priority);
+          continue;
+        }
+        tserver = tservers.iterator().next();
+        // Remove the tserver from the list, we are going to run a compaction on this server
+        tservers.remove(tserver);
+        if (tservers.isEmpty()) {
+          // Clean up the map entry when no tservers remaining for this queue and priority
+          m.remove(priority);
+        }
+        HashSet<QueueAndPriority> qp = INDEX.get(tserver);
+        if (null != qp) {
+          qp.remove(QueueAndPriority.get(queue, priority));
+          if (qp.isEmpty()) {
+            // Remove the tserver from the index
+            INDEX.remove(tserver);
+          }
+        }
+      }
+    }
+
+    try {
+      TabletClientService.Client client = getTabletServerConnection(tserver);
+      CompactionJob job = client.reserveCompactionJob(queue, priority, compactor);
+      RUNNING.put(job.getCompactionId(), new RunningCompaction(job, compactor, tserver));
+      return job;
+    } catch (TException e) {
+      // Report the failure to the caller instead of swallowing it. The tserver has already been
+      // removed from the queue above; run() re-adds it on its next pass if it still has work.
+      LOG.warn("Error reserving compaction job on tablet server: {}", tserver.getHostAndPort(), e);
+      throw e;
+    }
+  }
+
+  /**
+   * Creates a thrift client for the given tablet server.
+   *
+   * <p>
+   * NOTE(review): the transport is obtained from the ThriftTransportPool and nothing visible in
+   * this class hands it back -- confirm whether callers are expected to return it.
+   *
+   * @param tserver
+   *          tablet server to connect to
+   * @return a client for the tablet server's thrift service
+   * @throws TTransportException
+   *           if no connection is known for the tablet server or the transport cannot be obtained
+   */
+  private TabletClientService.Client getTabletServerConnection(TServerInstance tserver) throws TTransportException {
+    TServerConnection connection = tserverSet.getConnection(tserver);
+    if (null == connection) {
+      // presumably the tserver is no longer in the live set; surface that as a transport error,
+      // which every caller already handles, rather than a NullPointerException
+      throw new TTransportException("Unable to get connection to tablet server: " + tserver);
+    }
+    TTransport transport = ThriftTransportPool.getInstance().getTransport(connection.getAddress(), 0, getContext());
+    return ThriftUtil.createClient(new TabletClientService.Client.Factory(), transport);
+  }
+
+
+  /**
+   * Not implemented yet: the call currently does nothing.
+   *
+   * @param extent tablet extent
+   * @param queueName queue
+   * @param priority priority
+   */
+  @Override
+  public void cancelCompaction(TKeyExtent extent, String queueName, long priority)
+      throws TException {
+    // TODO Auto-generated method stub
+
+  }
+
+  /**
+   * Not implemented yet: the call currently always returns null.
+   *
+   * @param extent tablet extent
+   * @param queueName queue
+   * @param priority priority
+   * @return null (placeholder)
+   */
+  @Override
+  public List<Status> getCompactionStatus(TKeyExtent extent, String queueName, long priority)
+      throws TException {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  /**
+   * Compactor calls compactionCompleted passing in the CompactionStats. The stats are recorded
+   * on the running compaction and the tablet server the job was reserved on is notified.
+   *
+   * @param job compaction job
+   * @param stats compaction stats
+   * @throws TException
+   *           if the compaction id is not known to this coordinator, or if notifying the tablet
+   *           server fails
+   */
+  @Override
+  public void compactionCompleted(CompactionJob job, CompactionStats stats) throws TException {
+    RunningCompaction rc = RUNNING.get(job.getCompactionId());
+    if (null == rc) {
+      // Without the RunningCompaction we do not know which tserver to notify. Fail explicitly
+      // instead of dereferencing null below.
+      throw new TException(
+          "compactionCompleted called with unknown compaction id: " + job.getCompactionId());
+    }
+    rc.setStats(stats);
+    // TODO: What happens if tserver is no longer hosting tablet? I wonder if we should not notify
+    // the tserver that the compaction has finished and instead let the tserver that is hosting the
+    // tablet poll for state updates. That way if the tablet is re-hosted, the tserver can check as
+    // part of the tablet loading process. This would also enable us to remove the running compaction
+    // from RUNNING when the tserver makes the call and gets the stats.
+    TabletClientService.Client client = getTabletServerConnection(rc.getTserver());
+    client.compactionJobFinished(rc.getJob());
+  }
+
+  /**
+   * Called by a Compactor to report progress on its assigned compaction. The update is attached
+   * to the matching entry in RUNNING; when no entry exists for the job's compaction id the call
+   * is a no-op.
+   *
+   * @param job compaction job
+   * @param state compaction state
+   * @param message informational message
+   * @param timestamp timestamp of the message
+   */
+  @Override
+  public void updateCompactionStatus(CompactionJob job, CompactionState state,
+      String message, long timestamp) throws TException {
+    final RunningCompaction running = RUNNING.get(job.getCompactionId());
+    if (running == null) {
+      return;
+    }
+    running.addUpdate(timestamp, message, state);
+  }
+
+  /**
+   * Entry point: constructs a CompactionCoordinator with default server options and runs it,
+   * closing it when runServer returns or throws.
+   *
+   * @param args command line arguments
+   */
+  public static void main(String[] args) throws Exception {
+    final ServerOpts serverOpts = new ServerOpts();
+    try (CompactionCoordinator coordinator = new CompactionCoordinator(serverOpts, args)) {
+      coordinator.runServer();
+    }
+  }
+
+}