You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by dl...@apache.org on 2021/03/02 22:16:31 UTC

[accumulo] branch 1451-external-compactions-feature updated: re #1451: wip on coordinator

This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push:
     new 2357912  re #1451: wip on coordinator
2357912 is described below

commit 235791246bf9c888898f6a15ddd24f94197144c9
Author: Dave Marion <dl...@apache.org>
AuthorDate: Tue Mar 2 22:16:04 2021 +0000

    re #1451: wip on coordinator
---
 .../java/org/apache/accumulo/core/Constants.java   |   3 +
 .../org/apache/accumulo/core/conf/Property.java    |  16 +-
 pom.xml                                            |   1 +
 .../accumulo/server/manager/LiveTServerSet.java    |   4 +
 server/compaction-coordinator/.gitignore           |  28 ++
 server/compaction-coordinator/pom.xml              |  34 ++
 .../coordinator/CompactionCoordinator.java         | 518 +++++++++++++++++++++
 7 files changed, 603 insertions(+), 1 deletion(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java b/core/src/main/java/org/apache/accumulo/core/Constants.java
index 96b8b43..70353f2 100644
--- a/core/src/main/java/org/apache/accumulo/core/Constants.java
+++ b/core/src/main/java/org/apache/accumulo/core/Constants.java
@@ -59,6 +59,10 @@ public class Constants {

  public static final String ZTSERVERS = "/tservers";

+  // Root ZooKeeper path for compaction coordinators; ZCOORDINATOR_LOCK is the lock node beneath it
+  public static final String ZCOORDINATOR = "/coordinators";
+  public static final String ZCOORDINATOR_LOCK = ZCOORDINATOR + "/lock";
+
  public static final String ZDEAD = "/dead";
  public static final String ZDEADTSERVERS = ZDEAD + "/tservers";
 
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 04015e3..f05794f 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -1024,7 +1024,22 @@ public enum Property {
  REPLICATION_RPC_TIMEOUT("replication.rpc.timeout", "2m", PropertyType.TIMEDURATION,
      "Amount of time for a single replication RPC call to last before failing"
          + " the attempt. See replication.work.attempts."),
+  // CompactionCoordinator properties
+  COORDINATOR_PREFIX("coordinator.", null, PropertyType.PREFIX,
+      "Properties in this category affect the behavior of the accumulo compaction coordinator server."),
+  COORDINATOR_PORTSEARCH("coordinator.port.search", "false", PropertyType.BOOLEAN,
+      "if the ports above are in use, search higher ports until one is available"),
+  COORDINATOR_CLIENTPORT("coordinator.port.client", "9101", PropertyType.PORT,
+      "The port used for handling client connections on the compaction coordinator server"),
+  COORDINATOR_MINTHREADS("coordinator.server.threads.minimum", "1", PropertyType.COUNT,
+      "The minimum number of threads to use to handle incoming requests."),
+  COORDINATOR_MINTHREADS_TIMEOUT("coordinator.server.threads.timeout", "0s", PropertyType.TIMEDURATION,
+      "The time after which incoming request threads terminate with no work available.  Zero (0) will keep the threads alive indefinitely."),
+  COORDINATOR_THREADCHECK("coordinator.server.threadcheck.time", "1s", PropertyType.TIMEDURATION,
+      "The time between adjustments of the server thread pool."),
+  COORDINATOR_MAX_MESSAGE_SIZE("coordinator.server.message.size.max", "10M", PropertyType.BYTES,
+      "The maximum size of a message that can be sent to the compaction coordinator."),
  // deprecated properties grouped at the end to reference property that replaces them
   @Deprecated(since = "1.6.0")
   @ReplacedBy(property = INSTANCE_VOLUMES)
   INSTANCE_DFS_URI("instance.dfs.uri", "", PropertyType.URI,
diff --git a/pom.xml b/pom.xml
index 0eb0c5c..d2bd521 100644
--- a/pom.xml
+++ b/pom.xml
@@ -87,6 +87,7 @@
     <module>iterator-test-harness</module>
     <module>minicluster</module>
     <module>server/base</module>
+    <module>server/compaction-coordinator</module>
     <module>server/gc</module>
     <module>server/manager</module>
     <module>server/master</module>
diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
index 8e85250..b318835 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
@@ -77,6 +77,11 @@ public class LiveTServerSet implements Watcher {
    public TServerConnection(HostAndPort addr) {
      address = addr;
    }
+
+    /** Returns the address this connection was created for. */
+    public HostAndPort getAddress() {
+      return address;
+    }
 
     private String lockString(ZooLock mlock) {
       return mlock.getLockID().serialize(context.getZooKeeperRoot() + Constants.ZMANAGER_LOCK);
diff --git a/server/compaction-coordinator/.gitignore b/server/compaction-coordinator/.gitignore
new file mode 100644
index 0000000..e77a822
--- /dev/null
+++ b/server/compaction-coordinator/.gitignore
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Maven ignores
+/target/
+
+# IDE ignores
+/.settings/
+/.project
+/.classpath
+/.pydevproject
+/.idea
+/*.iml
+/nbproject/
+/nbactions.xml
+/nb-configuration.xml
diff --git a/server/compaction-coordinator/pom.xml b/server/compaction-coordinator/pom.xml
new file mode 100644
index 0000000..9f25e5d
--- /dev/null
+++ b/server/compaction-coordinator/pom.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.accumulo</groupId>
+    <artifactId>accumulo-project</artifactId>
+    <version>2.1.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+  <artifactId>accumulo-compaction-coordinator</artifactId>
+  <name>Apache Accumulo Compaction Coordinator</name>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-server-base</artifactId>
+    </dependency>
+  </dependencies>
+</project>
\ No newline at end of file
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
new file mode 100644
index 0000000..73670c0
--- /dev/null
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -0,0 +1,679 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.coordinator;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.net.UnknownHostException;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.WeakHashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.clientImpl.ThriftTransportPool;
+import org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Iface;
+import org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Processor;
+import org.apache.accumulo.core.compaction.thrift.CompactionState;
+import org.apache.accumulo.core.compaction.thrift.Status;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.dataImpl.thrift.CompactionStats;
+import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
+import org.apache.accumulo.core.metadata.TServerInstance;
+import org.apache.accumulo.core.rpc.ThriftUtil;
+import org.apache.accumulo.core.tabletserver.thrift.CompactionJob;
+import org.apache.accumulo.core.tabletserver.thrift.CompactionQueueSummary;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.Halt;
+import org.apache.accumulo.core.util.HostAndPort;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.fate.zookeeper.ZooLock;
+import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
+import org.apache.accumulo.server.AbstractServer;
+import org.apache.accumulo.server.GarbageCollectionLogger;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.ServerOpts;
+import org.apache.accumulo.server.manager.LiveTServerSet;
+import org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection;
+import org.apache.accumulo.server.rpc.ServerAddress;
+import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper;
+import org.apache.accumulo.server.rpc.TServerUtils;
+import org.apache.accumulo.server.rpc.ThriftServerType;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Server that polls the live tablet servers for their external compaction queues and priorities,
+ * and hands the queued compaction jobs out to Compactors that ask for them.
+ */
+public class CompactionCoordinator extends AbstractServer
+    implements Iface, LiveTServerSet.Listener {
+
+  /**
+   * An external compaction queue name paired with a priority. Equality and hashing are value
+   * based, so instances can be used as map keys and set elements.
+   */
+  private static class QueueAndPriority implements Comparable<QueueAndPriority> {
+
+    // Instance cache only; correctness never depends on it because equality is value based.
+    // WeakHashMap is not thread-safe, so it is only accessed from the synchronized get().
+    private static final WeakHashMap<Pair<String,Long>,QueueAndPriority> CACHE =
+        new WeakHashMap<>();
+
+    /**
+     * Return an instance for the given queue and priority, reusing a cached one when available.
+     *
+     * @param queue
+     *          external compaction queue name
+     * @param priority
+     *          compaction priority
+     * @return a non-null instance equal to any other instance with the same queue and priority
+     */
+    public static synchronized QueueAndPriority get(String queue, Long priority) {
+      // computeIfAbsent returns the mapped value; putIfAbsent would return null on first insert
+      return CACHE.computeIfAbsent(new Pair<>(queue, priority),
+          k -> new QueueAndPriority(queue, priority));
+    }
+
+    private final String queue;
+    private final Long priority;
+
+    private QueueAndPriority(String queue, Long priority) {
+      this.queue = Objects.requireNonNull(queue, "queue");
+      this.priority = Objects.requireNonNull(priority, "priority");
+    }
+
+    public String getQueue() {
+      return queue;
+    }
+
+    public Long getPriority() {
+      return priority;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(queue, priority);
+    }
+
+    @Override
+    public String toString() {
+      return "queue: " + queue + ", priority: " + priority;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      }
+      if (!(obj instanceof QueueAndPriority)) {
+        return false;
+      }
+      QueueAndPriority other = (QueueAndPriority) obj;
+      return queue.equals(other.queue) && priority.equals(other.priority);
+    }
+
+    /**
+     * Orders by queue name, then by descending priority so that, within a queue, the highest
+     * priority sorts first.
+     */
+    @Override
+    public int compareTo(QueueAndPriority other) {
+      int result = queue.compareTo(other.queue);
+      if (result != 0) {
+        return result;
+      }
+      return Long.compare(other.priority, priority);
+    }
+  }
+
+  /**
+   * Halts this process if the coordinator's ZooKeeper lock is lost or can no longer be monitored.
+   */
+  private static class CoordinatorLockWatcher implements ZooLock.AccumuloLockWatcher {
+
+    @Override
+    public void lostLock(LockLossReason reason) {
+      Halt.halt("Coordinator lock in zookeeper lost (reason = " + reason + "), exiting!", -1);
+    }
+
+    @Override
+    public void unableToMonitorLockNode(final Exception e) {
+      // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility
+      Halt.halt(-1, () -> LOG.error("FATAL: No longer able to monitor Coordinator lock node", e));
+    }
+
+    @Override
+    public synchronized void acquiredLock() {
+      // This is overridden by the LockWatcherWrapper in ZooLock.tryLock()
+    }
+
+    @Override
+    public synchronized void failedToAcquireLock(Exception e) {
+      // This is overridden by the LockWatcherWrapper in ZooLock.tryLock()
+    }
+  }
+
+  /**
+   * Utility for returning the address in the form host:port
+   *
+   * @param address
+   *          address to format, may be null
+   * @return {@code host:port}, or null if {@code address} is null
+   */
+  private static String getHostPortString(HostAndPort address) {
+    if (address == null) {
+      return null;
+    }
+    return address.getHost() + ":" + address.getPort();
+  }
+
+  /** A single status message reported by a Compactor for a running compaction. */
+  private static class CompactionUpdate {
+    private final Long timestamp;
+    private final String message;
+    private final CompactionState state;
+
+    public CompactionUpdate(Long timestamp, String message, CompactionState state) {
+      this.timestamp = timestamp;
+      this.message = message;
+      this.state = state;
+    }
+
+    public Long getTimestamp() {
+      return timestamp;
+    }
+
+    public String getMessage() {
+      return message;
+    }
+
+    public CompactionState getState() {
+      return state;
+    }
+  }
+
+  /**
+   * Bookkeeping for a compaction job that was reserved on a tablet server and handed to a
+   * Compactor.
+   */
+  private static class RunningCompaction {
+    private final CompactionJob job;
+    private final String compactor;
+    private final TServerInstance tserver;
+    // status updates, keyed and ordered by the timestamp reported by the Compactor
+    private Map<Long,CompactionUpdate> updates = new TreeMap<>();
+    private CompactionStats stats = null;
+
+    public RunningCompaction(CompactionJob job, String compactor, TServerInstance tserver) {
+      this.job = job;
+      this.compactor = compactor;
+      this.tserver = tserver;
+    }
+
+    public Map<Long,CompactionUpdate> getUpdates() {
+      return updates;
+    }
+
+    public void addUpdate(Long timestamp, String message, CompactionState state) {
+      this.updates.put(timestamp, new CompactionUpdate(timestamp, message, state));
+    }
+
+    public void setUpdates(Map<Long,CompactionUpdate> updates) {
+      this.updates = updates;
+    }
+
+    public CompactionStats getStats() {
+      return stats;
+    }
+
+    public void setStats(CompactionStats stats) {
+      this.stats = stats;
+    }
+
+    public CompactionJob getJob() {
+      return job;
+    }
+
+    public String getCompactor() {
+      return compactor;
+    }
+
+    public TServerInstance getTserver() {
+      return tserver;
+    }
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(CompactionCoordinator.class);
+  private static final long TIME_BETWEEN_CHECKS = 5000;
+  /* Time in milliseconds between polls of the tablet servers for their queue summaries */
+  private static final long TIME_BETWEEN_TSERVER_CHECKS = 60000;
+
+  /*
+   * Map of external queue name -> priority -> tservers. Each priority map is created in descending
+   * order so firstEntry() is the highest priority. Guarded by synchronizing on QUEUES.
+   */
+  private static final Map<String,TreeMap<Long,LinkedHashSet<TServerInstance>>> QUEUES =
+      new HashMap<>();
+  /*
+   * index of tserver to queue and priority, exists to provide O(1) lookup into QUEUES. Guarded by
+   * synchronizing on QUEUES.
+   */
+  private static final Map<TServerInstance,HashSet<QueueAndPriority>> INDEX = new HashMap<>();
+  /* Map of compactionId to RunningCompactions */
+  private static final Map<Long,RunningCompaction> RUNNING = new ConcurrentHashMap<>();
+
+  private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger();
+  private final AccumuloConfiguration aconf;
+
+  private ZooLock coordinatorLock;
+  private LiveTServerSet tserverSet;
+
+  protected CompactionCoordinator(ServerOpts opts, String[] args) {
+    super("compaction-coordinator", opts, args);
+    ServerContext context = super.getContext();
+    context.setupCrypto();
+
+    aconf = getConfiguration();
+    ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(
+        () -> gcLogger.logGCInfo(getConfiguration()), 0, TIME_BETWEEN_CHECKS,
+        TimeUnit.MILLISECONDS);
+    LOG.info("Version {}", Constants.VERSION);
+    LOG.info("Instance {}", context.getInstanceID());
+  }
+
+  /**
+   * Try to acquire the coordinator lock in ZooKeeper, passing this coordinator's client address
+   * (host:port) as the lock data.
+   *
+   * @param clientAddress
+   *          address of this CompactionCoordinator's client service
+   * @return true if lock was acquired, else false
+   * @throws KeeperException
+   *           propagated from ZooKeeper while acquiring the lock
+   * @throws InterruptedException
+   *           if interrupted while acquiring the lock
+   */
+  private boolean getCoordinatorLock(HostAndPort clientAddress)
+      throws KeeperException, InterruptedException {
+    LOG.info("trying to get coordinator lock");
+
+    final String coordinatorClientAddress = getHostPortString(clientAddress);
+    final String lockPath = getContext().getZooKeeperRoot() + Constants.ZCOORDINATOR_LOCK;
+    final UUID zooLockUUID = UUID.randomUUID();
+
+    CoordinatorLockWatcher coordinatorLockWatcher = new CoordinatorLockWatcher();
+    coordinatorLock = new ZooLock(getContext().getSiteConfiguration(), lockPath, zooLockUUID);
+    // encode explicitly rather than with the platform default charset
+    return coordinatorLock.tryLock(coordinatorLockWatcher,
+        coordinatorClientAddress.getBytes(UTF_8));
+  }
+
+  /**
+   * Start this CompactionCoordinator thrift service to handle incoming client requests
+   *
+   * @return address of this CompactionCoordinator client service
+   * @throws UnknownHostException
+   *           propagated from starting the thrift server
+   */
+  private ServerAddress startCompactorClientService() throws UnknownHostException {
+    // The wrappers presumably return java.lang.reflect.Proxy instances that implement only the
+    // service interfaces, so they are held as Iface rather than cast to CompactionCoordinator
+    Iface rpcProxy = TraceUtil.wrapService(this);
+    final Processor<Iface> processor;
+    if (getContext().getThriftServerType() == ThriftServerType.SASL) {
+      Iface tcredProxy = TCredentialsUpdatingWrapper.service(rpcProxy, CompactionCoordinator.class,
+          getConfiguration());
+      processor = new Processor<>(tcredProxy);
+    } else {
+      processor = new Processor<>(rpcProxy);
+    }
+    // NOTE(review): COORDINATOR_MAX_MESSAGE_SIZE declares a default, so get() presumably never
+    // returns null and GENERAL_MAX_MESSAGE_SIZE is never selected; confirm this is intended
+    Property maxMessageSizeProperty = (aconf.get(Property.COORDINATOR_MAX_MESSAGE_SIZE) != null
+        ? Property.COORDINATOR_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE);
+    ServerAddress sp = TServerUtils.startServer(getMetricsSystem(), getContext(), getHostname(),
+        Property.COORDINATOR_CLIENTPORT, processor, this.getClass().getSimpleName(),
+        "Thrift Client Server", Property.COORDINATOR_PORTSEARCH, Property.COORDINATOR_MINTHREADS,
+        Property.COORDINATOR_MINTHREADS_TIMEOUT, Property.COORDINATOR_THREADCHECK,
+        maxMessageSizeProperty);
+    LOG.info("address = {}", sp.address);
+    return sp;
+  }
+
+  /**
+   * Start the client service, take the coordinator lock, then poll the live tablet servers for
+   * their external compaction queue summaries forever.
+   */
+  @Override
+  public void run() {
+
+    ServerAddress coordinatorAddress = null;
+    try {
+      coordinatorAddress = startCompactorClientService();
+    } catch (UnknownHostException e1) {
+      throw new RuntimeException("Failed to start the coordinator service", e1);
+    }
+    final HostAndPort clientAddress = coordinatorAddress.address;
+
+    try {
+      if (!getCoordinatorLock(clientAddress)) {
+        throw new RuntimeException("Unable to get Coordinator lock.");
+      }
+    } catch (KeeperException | InterruptedException e) {
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
+      throw new IllegalStateException("Exception getting Coordinator lock", e);
+    }
+
+    tserverSet = new LiveTServerSet(getContext(), this);
+
+    // TODO: On initial startup contact all running tservers to get information about the
+    // compactions that are current running in external queues to populate the RUNNING map. This
+    // is to handle the case where the coordinator dies or is restarted at runtime
+
+    tserverSet.startListeningForTabletServerChanges();
+
+    while (true) {
+      for (TServerInstance tsi : tserverSet.getCurrentServers()) {
+        updateSummaries(tsi);
+      }
+      UtilWaitThread.sleep(TIME_BETWEEN_TSERVER_CHECKS);
+    }
+  }
+
+  /**
+   * Ask one tablet server for its external compaction queue summaries and record the server in
+   * QUEUES and INDEX under every queue and priority it reports.
+   *
+   * @param tsi
+   *          tablet server to query
+   */
+  private void updateSummaries(TServerInstance tsi) {
+    List<CompactionQueueSummary> summaries;
+    TabletClientService.Client client = null;
+    try {
+      // RPC is made outside of the QUEUES lock so a slow tserver does not block getCompactionJob
+      client = getTabletServerConnection(tsi);
+      summaries = client.getCompactionQueueInfo();
+    } catch (TException e) {
+      LOG.warn("Error getting compaction summaries from tablet server: {}", tsi.getHostAndPort(),
+          e);
+      return;
+    } finally {
+      returnClient(client);
+    }
+
+    synchronized (QUEUES) {
+      for (CompactionQueueSummary summary : summaries) {
+        QueueAndPriority qp = QueueAndPriority.get(summary.getQueue(), summary.getPriority());
+        // computeIfAbsent (unlike putIfAbsent) returns the mapped value, never null; priority
+        // maps are descending so that getCompactionJob's firstEntry() is the highest priority
+        QUEUES.computeIfAbsent(qp.getQueue(), q -> new TreeMap<>(Comparator.reverseOrder()))
+            .computeIfAbsent(qp.getPriority(), p -> new LinkedHashSet<>()).add(tsi);
+        INDEX.computeIfAbsent(tsi, t -> new HashSet<>()).add(qp);
+      }
+    }
+  }
+
+  /**
+   * Callback from LiveTServerSet with the current set of tablet servers, including the ones that
+   * were deleted and added
+   *
+   * @param current
+   *          current set of live tservers
+   * @param deleted
+   *          set of tservers that were removed from current since last update
+   * @param added
+   *          set of tservers that were added to current since last update
+   */
+  @Override
+  public void update(LiveTServerSet current, Set<TServerInstance> deleted,
+      Set<TServerInstance> added) {
+    // run() periodically adds the current tservers to the internal data structures. Deleted
+    // tservers are removed here so that getCompactionJob does not select them.
+    synchronized (QUEUES) {
+      for (TServerInstance tsi : deleted) {
+        // null when the tserver never reported a queue or its entries were already consumed
+        HashSet<QueueAndPriority> qps = INDEX.remove(tsi);
+        if (null == qps) {
+          continue;
+        }
+        for (QueueAndPriority qp : qps) {
+          TreeMap<Long,LinkedHashSet<TServerInstance>> priorities = QUEUES.get(qp.getQueue());
+          if (null == priorities) {
+            continue;
+          }
+          LinkedHashSet<TServerInstance> tservers = priorities.get(qp.getPriority());
+          if (null != tservers) {
+            tservers.remove(tsi);
+            if (tservers.isEmpty()) {
+              priorities.remove(qp.getPriority());
+            }
+          }
+          if (priorities.isEmpty()) {
+            QUEUES.remove(qp.getQueue());
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Return the next compaction job for the queue to a Compactor
+   *
+   * @param queueName
+   *          queue
+   * @param compactor
+   *          compactor address
+   * @return compaction job, or null when no tablet server currently has work for the queue
+   * @throws TException
+   *           if the job could not be reserved on the selected tablet server
+   */
+  @Override
+  public CompactionJob getCompactionJob(String queueName, String compactor) throws TException {
+    TServerInstance tserver = null;
+    Long priority = null;
+    synchronized (QUEUES) {
+      TreeMap<Long,LinkedHashSet<TServerInstance>> priorities = QUEUES.get(queueName);
+      if (null == priorities) {
+        // TODO: or should we throw an error for no tserver for this queue?
+        // NOTE(review): a null struct result is likely surfaced to the Thrift client as
+        // TApplicationException (MISSING_RESULT); confirm Compactors expect that.
+        return null;
+      }
+      while (!priorities.isEmpty()) {
+        // Priority maps are descending, so the first entry is the highest priority
+        Entry<Long,LinkedHashSet<TServerInstance>> entry = priorities.firstEntry();
+        LinkedHashSet<TServerInstance> tservers = entry.getValue();
+        if (null == tservers || tservers.isEmpty()) {
+          // Clean up the map entry when no tservers for this queue and priority
+          priorities.remove(entry.getKey());
+          continue;
+        }
+        priority = entry.getKey();
+        // Remove the tserver from the set, we are going to run a compaction on this server
+        Iterator<TServerInstance> iter = tservers.iterator();
+        tserver = iter.next();
+        iter.remove();
+        if (tservers.isEmpty()) {
+          // Clean up the map entry when no tservers remaining for this queue and priority
+          priorities.remove(priority);
+        }
+        HashSet<QueueAndPriority> qps = INDEX.get(tserver);
+        if (null != qps) {
+          qps.remove(QueueAndPriority.get(queueName, priority));
+          if (qps.isEmpty()) {
+            // Remove the tserver from the index
+            INDEX.remove(tserver);
+          }
+        }
+        break;
+      }
+      if (priorities.isEmpty()) {
+        QUEUES.remove(queueName);
+      }
+    }
+
+    if (null == tserver) {
+      // the queue existed but had no tservers left at any priority
+      return null;
+    }
+
+    // NOTE(review): the tserver is no longer in QUEUES; if the reservation fails it is presumably
+    // re-added on the next poll in run(), provided it still reports work for this queue.
+    TabletClientService.Client client = null;
+    try {
+      client = getTabletServerConnection(tserver);
+      CompactionJob job = client.reserveCompactionJob(queueName, priority, compactor);
+      RUNNING.put(job.getCompactionId(), new RunningCompaction(job, compactor, tserver));
+      return job;
+    } catch (TException e) {
+      LOG.error("Error reserving compaction job on tablet server {} for queue {}, priority {}",
+          tserver.getHostAndPort(), queueName, priority, e);
+      throw e;
+    } finally {
+      returnClient(client);
+    }
+  }
+
+  /**
+   * Create a TabletClientService client for the given tablet server. Callers should pass the
+   * client to returnClient when finished so its transport is handed back.
+   *
+   * @param tserver
+   *          tablet server to connect to
+   * @return client for the tablet server
+   * @throws TTransportException
+   *           if no connection is known for the tablet server or a transport cannot be obtained
+   */
+  private TabletClientService.Client getTabletServerConnection(TServerInstance tserver)
+      throws TTransportException {
+    TServerConnection connection = tserverSet.getConnection(tserver);
+    if (null == connection) {
+      // presumably the tserver is no longer in the live set
+      throw new TTransportException(
+          "No connection available for tablet server " + tserver.getHostAndPort());
+    }
+    TTransport transport =
+        ThriftTransportPool.getInstance().getTransport(connection.getAddress(), 0, getContext());
+    return ThriftUtil.createClient(new TabletClientService.Client.Factory(), transport);
+  }
+
+  /** Hand the client's transport back via ThriftUtil.returnClient; a null client is ignored. */
+  private static void returnClient(TabletClientService.Client client) {
+    if (null != client) {
+      ThriftUtil.returnClient(client);
+    }
+  }
+
+  @Override
+  public void cancelCompaction(TKeyExtent extent, String queueName, long priority)
+      throws TException {
+    // TODO: not yet implemented
+  }
+
+  @Override
+  public List<Status> getCompactionStatus(TKeyExtent extent, String queueName, long priority)
+      throws TException {
+    // TODO: not yet implemented
+    return null;
+  }
+
+  /**
+   * Compactor calls compactionCompleted passing in the CompactionStats
+   *
+   * @param job
+   *          compaction job
+   * @param stats
+   *          compaction stats
+   */
+  @Override
+  public void compactionCompleted(CompactionJob job, CompactionStats stats) throws TException {
+    RunningCompaction rc = RUNNING.get(job.getCompactionId());
+    if (null == rc) {
+      // Without the RunningCompaction the tserver to notify is unknown (e.g. after a coordinator
+      // restart, see the TODO in run())
+      LOG.warn("Compaction {} completed but is not a known running compaction",
+          job.getCompactionId());
+      return;
+    }
+    rc.setStats(stats);
+    // TODO: What happens if tserver is no longer hosting tablet? I wonder if we should not notify
+    // the tserver that the compaction has finished and instead let the tserver that is hosting the
+    // tablet poll for state updates. That way if the tablet is re-hosted, the tserver can check as
+    // part of the tablet loading process. This would also enable us to remove the running
+    // compaction from RUNNING when the tserver makes the call and gets the stats.
+    TabletClientService.Client client = null;
+    try {
+      client = getTabletServerConnection(rc.getTserver());
+      client.compactionJobFinished(rc.getJob());
+    } finally {
+      returnClient(client);
+    }
+  }
+
+  /**
+   * Compactor calls to update the status of the assigned compaction
+   *
+   * @param job
+   *          compaction job
+   * @param state
+   *          compaction state
+   * @param message
+   *          informational message
+   * @param timestamp
+   *          timestamp of the message
+   */
+  @Override
+  public void updateCompactionStatus(CompactionJob job, CompactionState state, String message,
+      long timestamp) throws TException {
+    RunningCompaction rc = RUNNING.get(job.getCompactionId());
+    if (null != rc) {
+      rc.addUpdate(timestamp, message, state);
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    try (CompactionCoordinator coordinator = new CompactionCoordinator(new ServerOpts(), args)) {
+      coordinator.runServer();
+    }
+  }
+
+}