You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by zm...@apache.org on 2015/08/26 23:00:12 UTC

[22/51] [partial] aurora git commit: Move packages from com.twitter.common to org.apache.aurora.common

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSet.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSet.java b/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSet.java
new file mode 100644
index 0000000..93f6610
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSet.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.net.pool;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.base.Command;
+
+/**
+ * A host set that can be monitored for changes.
+ *
+ * @param <T> The type that is used to identify members of the host set.
+ */
+public interface DynamicHostSet<T> {
+
+  /**
+   * Subscribes a monitor to change notices for this host set for as long as this jvm process is
+   * alive.  The call blocks until the initial host set has been gathered and delivered to the
+   * monitor; after that the monitor is notified whenever the membership set or the parameters of
+   * existing members change.
+   *
+   * @param monitor the host set monitor to call back when the host set changes
+   * @throws MonitorException if there is a problem monitoring the host set
+   * @deprecated Deprecated in favor of {@link #watch(HostChangeMonitor)}
+   */
+  @Deprecated
+  void monitor(HostChangeMonitor<T> monitor) throws MonitorException;
+
+  /**
+   * Subscribes a monitor to change notices for this host set for as long as this jvm process is
+   * alive.  The call blocks until the initial host set has been gathered and delivered to the
+   * monitor; after that the monitor is notified whenever the membership set or the parameters of
+   * existing members change.
+   *
+   * @param monitor the host set monitor to call back when the host set changes
+   * @return A command which, when executed, will stop monitoring the host set.
+   * @throws MonitorException if there is a problem monitoring the host set
+   */
+  Command watch(HostChangeMonitor<T> monitor) throws MonitorException;
+
+  /**
+   * Callback implemented by objects that want to be notified whenever the host set changes.
+   *
+   * @param <T> The type that is used to identify members of the host set.
+   */
+  interface HostChangeMonitor<T> {
+
+    /**
+     * Invoked when the available set of services changes (a service dies or a new instance comes
+     * on-line) and also when an existing service advertises a status or health change.
+     *
+     * @param hostSet the current set of available ServiceInstances
+     */
+    void onChange(ImmutableSet<T> hostSet);
+  }
+
+  /**
+   * Signals that there was a problem monitoring a host set.
+   */
+  class MonitorException extends Exception {
+
+    /**
+     * @param msg a description of the monitoring problem
+     */
+    public MonitorException(String msg) {
+      super(msg);
+    }
+
+    /**
+     * @param msg a description of the monitoring problem
+     * @param cause the underlying failure
+     */
+    public MonitorException(String msg, Throwable cause) {
+      super(msg, cause);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSetUtil.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSetUtil.java b/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSetUtil.java
new file mode 100644
index 0000000..4f75893
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSetUtil.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.net.pool;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.base.Command;
+
+/**
+ * Utility methods for dealing with dynamic sets of hosts.
+ */
+public final class DynamicHostSetUtil {
+
+  private DynamicHostSetUtil() {
+    // Static helpers only; never instantiated.
+  }
+
+  /**
+   * Captures the current members of a set of dynamic hosts (e.g. a ServerSet) as an immutable
+   * copy of the underlying actual endpoints.
+   *
+   * <p>A temporary monitor is registered through {@link DynamicHostSet#watch}, every host set
+   * delivered to it is accumulated, and the watch is cancelled again before returning.
+   *
+   * @param hostSet The hostSet to snapshot.
+   * @param <T> The type that is used to identify members of the host set.
+   * @return An immutable set of the hosts that were delivered to the temporary monitor.
+   * @throws DynamicHostSet.MonitorException if there was a problem obtaining the snapshot.
+   */
+  public static <T> ImmutableSet<T> getSnapshot(DynamicHostSet<T> hostSet)
+      throws DynamicHostSet.MonitorException {
+
+    final ImmutableSet.Builder<T> members = ImmutableSet.builder();
+    DynamicHostSet.HostChangeMonitor<T> collector = new DynamicHostSet.HostChangeMonitor<T>() {
+      @Override public void onChange(ImmutableSet<T> current) {
+        members.addAll(current);
+      }
+    };
+
+    // Registering the watch is what triggers delivery; cancel it straight away afterwards.
+    Command stopWatching = hostSet.watch(collector);
+    stopWatching.execute();
+    return members.build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicPool.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicPool.java b/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicPool.java
new file mode 100644
index 0000000..2fd6046
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicPool.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.net.pool;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.aurora.common.base.Closure;
+import org.apache.aurora.common.net.loadbalancing.LoadBalancer;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.zookeeper.ServerSet;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * An ObjectPool that maintains a set of connections for a set of service endpoints defined by a
+ * {@link ServerSet}.
+ *
+ * @param <H> The type that contains metadata information about hosts, such as liveness and address.
+ * @param <T> The raw connection type that is being pooled.
+ * @param <E> The type that identifies the endpoint of the pool, such as an address.
+ * @author John Sirois
+ */
+public class DynamicPool<H, T, E> implements ObjectPool<Connection<T, E>> {
+
+  private final MetaPool<T, E> pool;
+
+  /**
+   * Creates a new ServerSetConnectionPool and blocks on an initial read and constructions of pools
+   * for the given {@code serverSet}.
+   *
+   * @param hostSet the dynamic set of available servers to pool connections for
+   * @param endpointPoolFactory a factory that can generate a connection pool for an endpoint
+   * @param loadBalancer Load balancer to manage request flow.
+   * @param onBackendsChosen A callback to notify of chosen backends.
+   * @param restoreInterval the interval after connection errors start occurring for a target to
+   *     begin checking to see if it has come back to a healthy state
+   * @param endpointExtractor Function that transforms a service instance into an endpoint instance.
+   * @param livenessChecker Filter that will determine whether a host indicates itself as available.
+   * @throws DynamicHostSet.MonitorException if there is a problem monitoring the host set
+   */
+  public DynamicPool(DynamicHostSet<H> hostSet,
+      Function<E, ObjectPool<Connection<T, E>>> endpointPoolFactory,
+      LoadBalancer<E> loadBalancer,
+      Closure<Collection<E>> onBackendsChosen,
+      Amount<Long, Time> restoreInterval,
+      Function<H, E> endpointExtractor,
+      Predicate<H> livenessChecker)
+      throws DynamicHostSet.MonitorException {
+    Preconditions.checkNotNull(hostSet);
+    Preconditions.checkNotNull(endpointPoolFactory);
+
+    pool = new MetaPool<T, E>(loadBalancer, onBackendsChosen, restoreInterval);
+
+    // TODO(John Sirois): consider an explicit start/stop
+    hostSet.monitor(new PoolMonitor<H, Connection<T, E>>(endpointPoolFactory, endpointExtractor,
+        livenessChecker) {
+      @Override protected void onPoolRebuilt(Set<ObjectPool<Connection<T, E>>> deadPools,
+          Map<E, ObjectPool<Connection<T, E>>> livePools) {
+        poolRebuilt(deadPools, livePools);
+      }
+    });
+  }
+
+  @VisibleForTesting
+  void poolRebuilt(Set<ObjectPool<Connection<T, E>>> deadPools,
+      Map<E, ObjectPool<Connection<T, E>>> livePools) {
+
+    pool.setBackends(livePools);
+
+    for (ObjectPool<Connection<T, E>> deadTargetPool : deadPools) {
+      deadTargetPool.close();
+    }
+  }
+
+  @Override
+  public Connection<T, E> get() throws ResourceExhaustedException, TimeoutException {
+    return pool.get();
+  }
+
+  @Override
+  public Connection<T, E> get(Amount<Long, Time> timeout)
+      throws ResourceExhaustedException, TimeoutException {
+    return pool.get(timeout);
+  }
+
+  @Override
+  public void release(Connection<T, E> connection) {
+    pool.release(connection);
+  }
+
+  @Override
+  public void remove(Connection<T, E> connection) {
+    pool.remove(connection);
+  }
+
+  @Override
+  public void close() {
+    pool.close();
+  }
+
+  private abstract class PoolMonitor<H, S extends Connection<?, ?>>
+      implements DynamicHostSet.HostChangeMonitor<H> {
+
+    private final Function<E, ObjectPool<S>> endpointPoolFactory;
+    private final Function<H, E> endpointExtractor;
+    private final Predicate<H> livenessTest;
+
+    public PoolMonitor(Function<E, ObjectPool<S>> endpointPoolFactory,
+        Function<H, E> endpointExtractor,
+        Predicate<H> livenessTest) {
+      this.endpointPoolFactory = endpointPoolFactory;
+      this.endpointExtractor = endpointExtractor;
+      this.livenessTest = livenessTest;
+    }
+
+    private final Map<E, ObjectPool<S>> endpointPools = Maps.newHashMap();
+
+    @Override
+    public synchronized void onChange(ImmutableSet<H> serverSet) {
+      // TODO(John Sirois): change onChange to pass the delta data since its already computed by
+      // ServerSet
+
+      Map<E, H> newEndpoints =
+          Maps.uniqueIndex(Iterables.filter(serverSet, livenessTest), endpointExtractor);
+
+      Set<E> deadEndpoints = ImmutableSet.copyOf(
+          Sets.difference(endpointPools.keySet(), newEndpoints.keySet()));
+      Set<ObjectPool<S>> deadPools = Sets.newHashSet();
+      for (E endpoint : deadEndpoints) {
+        ObjectPool<S> deadPool = endpointPools.remove(endpoint);
+        deadPools.add(deadPool);
+      }
+
+      Set<E> addedEndpoints = ImmutableSet.copyOf(
+          Sets.difference(newEndpoints.keySet(), endpointPools.keySet()));
+      for (E endpoint : addedEndpoints) {
+        ObjectPool<S> endpointPool = endpointPoolFactory.apply(endpoint);
+        endpointPools.put(endpoint, endpointPool);
+      }
+
+      onPoolRebuilt(deadPools, ImmutableMap.copyOf(endpointPools));
+    }
+
+    protected abstract void onPoolRebuilt(Set<ObjectPool<S>> deadPools,
+        Map<E, ObjectPool<S>> livePools);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/pool/MetaPool.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/net/pool/MetaPool.java b/commons/src/main/java/org/apache/aurora/common/net/pool/MetaPool.java
new file mode 100644
index 0000000..df1bd96
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/net/pool/MetaPool.java
@@ -0,0 +1,339 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.net.pool;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.apache.aurora.common.base.Closure;
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.net.loadbalancing.LoadBalancer;
+import org.apache.aurora.common.net.loadbalancing.LoadBalancingStrategy.ConnectionResult;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+
+/**
+ * A connection pool that picks connections from a set of backend pools.  Backend pools are selected
+ * from randomly initially but then as they are used they are ranked according to how many
+ * connections they have available and whether or not the last used connection had an error or not.
+ * In this way, backends that are responsive should get selected in preference to those that are
+ * not.
+ *
+ * <p>Non-responsive backends are monitored after a configurable period in a background thread and
+ * if a connection can be obtained they start to float back up in the rankings.  In this way,
+ * backends that are initially non-responsive but later become responsive should end up getting
+ * selected.
+ *
+ * <p> TODO(John Sirois): take a ShutdownRegistry and register a close command
+ *
+ * @author John Sirois
+ */
+public class MetaPool<T, E> implements ObjectPool<Connection<T, E>> {
+
+  private static final Logger LOG = Logger.getLogger(MetaPool.class.getName());
+
+  // Executed from close() to stop the background dead-backend restorer.
+  private final Command stopBackendRestorer;
+
+  private Map<E, ObjectPool<Connection<T, E>>> backends = null;
+
+  // Locks to guard mutation of the backends set.
+  private final Lock backendsReadLock;
+  private final Lock backendsWriteLock;
+
+  private final Closure<Collection<E>> onBackendsChosen;
+
+  private final LoadBalancer<E> loadBalancer;
+
+  /**
+   * Creates a connection pool with no backends.  Backends may be added post-creation by calling
+   * {@link #setBackends(java.util.Map)}
+   *
+   * @param loadBalancer the load balancer to distribute requests among backends.
+   * @param onBackendsChosen a callback to notify whenever the {@code loadBalancer} chooses a new
+   *     set of backends to restrict its call distribution to
+   * @param restoreInterval the interval after a backend goes dead to begin checking the backend to
+   *     see if it has come back to a healthy state
+   */
+  public MetaPool(LoadBalancer<E> loadBalancer,
+      Closure<Collection<E>> onBackendsChosen, Amount<Long, Time> restoreInterval) {
+    this(ImmutableMap.<E, ObjectPool<Connection<T, E>>>of(), loadBalancer,
+        onBackendsChosen, restoreInterval);
+  }
+
+  /**
+   * Creates a connection pool that balances connections across multiple backend pools.
+   *
+   * @param backends the connection pools for the backends
+   * @param onBackendsChosen a callback to notify whenever the {@code loadBalancer} chooses a new
+   *     set of backends to restrict its call distribution to
+   * @param loadBalancer the load balancer to distribute requests among backends.
+   * @param restoreInterval the interval after a backend goes dead to begin checking the backend to
+   *     see if it has come back to a healthy state; must be greater than zero
+   */
+  public MetaPool(
+      ImmutableMap<E, ObjectPool<Connection<T, E>>> backends,
+      LoadBalancer<E> loadBalancer,
+      Closure<Collection<E>> onBackendsChosen, Amount<Long, Time> restoreInterval) {
+
+    this.loadBalancer = Preconditions.checkNotNull(loadBalancer);
+    this.onBackendsChosen = Preconditions.checkNotNull(onBackendsChosen);
+
+    ReadWriteLock backendsLock = new ReentrantReadWriteLock(true);
+    backendsReadLock = backendsLock.readLock();
+    backendsWriteLock = backendsLock.writeLock();
+
+    setBackends(backends);
+
+    Preconditions.checkNotNull(restoreInterval);
+    Preconditions.checkArgument(restoreInterval.getValue() > 0);
+    stopBackendRestorer = startDeadBackendRestorer(restoreInterval);
+  }
+
+  /**
+   * Assigns the backend pools that this pool should draw from and offers their endpoints to the
+   * load balancer.
+   *
+   * @param pools New pools to use.
+   */
+  public void setBackends(Map<E, ObjectPool<Connection<T, E>>> pools) {
+    backendsWriteLock.lock();
+    try {
+      backends = Preconditions.checkNotNull(pools);
+      loadBalancer.offerBackends(pools.keySet(), onBackendsChosen);
+    } finally {
+      backendsWriteLock.unlock();
+    }
+  }
+
+  /**
+   * Schedules {@link #restoreDeadBackends} on a single daemon thread, first after
+   * {@code restoreInterval} and then with that same delay between runs.
+   *
+   * @param restoreInterval the delay before the first run and between subsequent runs
+   * @return a command that disables further restoration and shuts the scheduler down
+   */
+  private Command startDeadBackendRestorer(final Amount<Long, Time> restoreInterval) {
+
+    // Checked at the start of each run so an already-queued run does nothing after shutdown.
+    final AtomicBoolean shouldRestore = new AtomicBoolean(true);
+    Runnable restoreDeadBackends = new Runnable() {
+      @Override public void run() {
+        if (shouldRestore.get()) {
+          restoreDeadBackends(restoreInterval);
+        }
+      }
+    };
+    final ScheduledExecutorService scheduledExecutorService =
+        Executors.newScheduledThreadPool(1,
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat("MTCP-DeadBackendRestorer[%s]")
+                .build());
+    long restoreDelay = restoreInterval.getValue();
+    scheduledExecutorService.scheduleWithFixedDelay(restoreDeadBackends, restoreDelay,
+        restoreDelay, restoreInterval.getUnit().getTimeUnit());
+
+    return new Command() {
+      @Override public void execute() {
+        shouldRestore.set(false);
+        scheduledExecutorService.shutdownNow();
+        LOG.info("Backend restorer shut down");
+      }
+    };
+  }
+
+  /**
+   * Leases and immediately releases one connection from every currently tracked backend, waiting
+   * up to {@code restoreInterval} for each lease.  Failures to lease are logged and skipped.
+   *
+   * @param restoreInterval the timeout applied to each lease attempt
+   */
+  private void restoreDeadBackends(Amount<Long, Time> restoreInterval) {
+    for (E backend : snapshotBackends()) {
+      ObjectPool<Connection<T, E>> pool;
+      backendsReadLock.lock();
+      try {
+        pool = backends.get(backend);
+      } finally {
+        backendsReadLock.unlock();
+      }
+
+      // We can lose a race if the backends change - and that's fine, we'll restore the new set of
+      // backends in the next scheduled restoration run.
+      if (pool != null) {
+        try {
+          release(get(backend, pool, restoreInterval));
+        } catch (TimeoutException e) {
+          LOG.warning("Backend restorer failed to revive backend: " + backend + " -> " + e);
+        } catch (ResourceExhaustedException e) {
+          LOG.warning("Backend restorer failed to revive backend: " + backend + " -> " + e);
+        }
+      }
+    }
+  }
+
+  /**
+   * @return a copy, taken under the read lock, of the endpoints of the current backends
+   */
+  private Iterable<E> snapshotBackends() {
+    backendsReadLock.lock();
+    try {
+      return ImmutableList.copyOf(backends.keySet());
+    } finally {
+      backendsReadLock.unlock();
+    }
+  }
+
+  @Override
+  public Connection<T, E> get() throws ResourceExhaustedException, TimeoutException {
+    return get(ObjectPool.NO_TIMEOUT);
+  }
+
+  /**
+   * Asks the load balancer for the next backend and leases a connection from that backend's pool.
+   *
+   * @param timeout the maximum amount of time to wait; a value of zero means no timeout
+   * @return a connection that must later be passed to {@link #release} or {@link #remove}
+   * @throws NullPointerException if the load balancer yields no backend, or yields a backend that
+   *     is not among the tracked backends
+   */
+  @Override
+  public Connection<T, E> get(Amount<Long, Time> timeout)
+      throws ResourceExhaustedException, TimeoutException {
+
+    E backend;
+    ObjectPool<Connection<T, E>> pool;
+
+    backendsReadLock.lock();
+    try {
+      backend = loadBalancer.nextBackend();
+      Preconditions.checkNotNull(backend, "Load balancer gave a null backend.");
+
+      // Verify the looked-up pool (not the already-checked backend) so that an untracked backend
+      // is reported with a meaningful message instead of failing later on a null pool.
+      pool = backends.get(backend);
+      Preconditions.checkNotNull(pool,
+          "Given backend %s not found in tracked backends: %s", backend, backends);
+    } finally {
+      backendsReadLock.unlock();
+    }
+
+    return get(backend, pool, timeout);
+  }
+
+  /**
+   * Pairs a leased connection with the backend pool it was leased from so that it can be handed
+   * back to that same pool on release.  All {@link Connection} methods delegate to the wrapped
+   * connection.
+   */
+  private static class ManagedConnection<T, E> implements Connection<T, E> {
+    private final Connection<T, E> connection;
+    private final ObjectPool<Connection<T, E>> pool;
+
+    private ManagedConnection(Connection<T, E> connection, ObjectPool<Connection<T, E>> pool) {
+      this.connection = connection;
+      this.pool = pool;
+    }
+
+    @Override
+    public void close() {
+      connection.close();
+    }
+
+    @Override
+    public T get() {
+      return connection.get();
+    }
+
+    @Override
+    public boolean isValid() {
+      return connection.isValid();
+    }
+
+    @Override
+    public E getEndpoint() {
+      return connection.getEndpoint();
+    }
+
+    @Override public String toString() {
+      return "ManagedConnection[" + connection.toString() + "]";
+    }
+
+    /**
+     * Hands the wrapped connection back to its originating pool.
+     *
+     * @param remove {@code true} to remove the connection from the pool, {@code false} to release
+     *     it back for reuse
+     */
+    void release(boolean remove) {
+      if (remove) {
+        pool.remove(connection);
+      } else {
+        pool.release(connection);
+      }
+    }
+  }
+
+  /**
+   * Leases a connection from {@code pool} and reports the outcome for {@code backend} to the load
+   * balancer.
+   *
+   * @param backend the endpoint the pool belongs to
+   * @param pool the pool to lease from
+   * @param timeout the maximum amount of time to wait; a value of zero means no timeout
+   * @return the leased connection wrapped together with its originating pool
+   * @throws ResourceExhaustedException if the pool could not supply a connection
+   * @throws TimeoutException if the lease timed out
+   */
+  private Connection<T, E> get(E backend, ObjectPool<Connection<T, E>> pool,
+      Amount<Long, Time> timeout) throws ResourceExhaustedException, TimeoutException {
+
+    long startNanos = System.nanoTime();
+    try {
+      Connection<T, E> connection = (timeout.getValue() == 0) ? pool.get() : pool.get(timeout);
+
+      // BEWARE: We have leased a connection from the underlying pool here and must return it to the
+      // caller so they can later release it.  If we fail to do so, the connection will leak.
+      // Catching intermediate exceptions ourselves and pro-actively returning the connection to the
+      // pool before re-throwing is not a viable option since the return would have to succeed,
+      // forcing us to ignore the timeout passed in.
+
+      // NOTE: LoadBalancer gracefully ignores backends it does not know about so even if we acquire
+      // a (backend, pool) pair atomically that has since been removed, we can safely let the lb
+      // know about backend events and it will just ignore us.
+
+      try {
+        loadBalancer.connected(backend, System.nanoTime() - startNanos);
+      } catch (RuntimeException e) {
+        LOG.log(Level.WARNING, "Encountered an exception updating load balancer stats after "
+                               + "leasing a connection - continuing", e);
+      }
+      return new ManagedConnection<T, E>(connection, pool);
+    } catch (TimeoutException e) {
+      loadBalancer.connectFailed(backend, ConnectionResult.TIMEOUT);
+      throw e;
+    } catch (ResourceExhaustedException e) {
+      loadBalancer.connectFailed(backend, ConnectionResult.FAILED);
+      throw e;
+    }
+  }
+
+  @Override
+  public void release(Connection<T, E> connection) {
+    release(connection, false);
+  }
+
+  /**
+   * Equivalent to releasing a Connection with isValid() == false.
+   * @see ObjectPool#remove(Object)
+   */
+  @Override
+  public void remove(Connection<T, E> connection) {
+    release(connection, true);
+  }
+
+  /**
+   * Returns a connection to the backend pool it was leased from and notifies the load balancer.
+   *
+   * @param connection a connection previously obtained from this pool
+   * @param remove {@code true} to remove the connection from its pool rather than release it
+   * @throws IllegalArgumentException if the connection was not handed out by this pool
+   */
+  private void release(Connection<T, E> connection, boolean remove) {
+    backendsWriteLock.lock();
+    try {
+
+      if (!(connection instanceof ManagedConnection)) {
+        throw new IllegalArgumentException("Connection not controlled by this connection pool: "
+                                           + connection);
+      }
+      ManagedConnection<T, E> managed = (ManagedConnection<T, E>) connection;
+      managed.release(remove);
+
+      loadBalancer.released(connection.getEndpoint());
+    } finally {
+      backendsWriteLock.unlock();
+    }
+  }
+
+  /**
+   * Stops the background backend restorer and then closes every current backend pool.
+   */
+  @Override
+  public void close() {
+    stopBackendRestorer.execute();
+
+    backendsWriteLock.lock();
+    try {
+      for (ObjectPool<Connection<T, E>> backend : backends.values()) {
+        backend.close();
+      }
+    } finally {
+      backendsWriteLock.unlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/pool/ObjectPool.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/net/pool/ObjectPool.java b/commons/src/main/java/org/apache/aurora/common/net/pool/ObjectPool.java
new file mode 100644
index 0000000..a665903
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/net/pool/ObjectPool.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.net.pool;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A generic object pool that provides object of a given type for exclusive use by the caller.
+ * Object pools generally pool expensive resources and so offer a {@link #close} method that should
+ * be used to free these resources when the pool is no longer needed.
+ *
+ * @author John Sirois
+ */
+public interface ObjectPool<T> {
+
+  /**
+   * A convenience constant representing no timeout.
+   */
+  Amount<Long, Time> NO_TIMEOUT = Amount.of(0L, Time.MILLISECONDS);
+
+  /**
+   * Obtains a resource, potentially blocking for as long as it takes to either create a new one
+   * or wait for one to be {@link #release(Object) released}.  Callers must
+   * {@link #release(Object) release} the resource when they are done with it.
+   *
+   * @return a resource for exclusive use by the caller
+   * @throws ResourceExhaustedException if no resource could be obtained because this pool was
+   *     exhausted
+   * @throws TimeoutException if we timed out while trying to fetch a resource
+   */
+  T get() throws ResourceExhaustedException, TimeoutException;
+
+  /**
+   * Obtains a resource, timing out if none are available and it takes longer than specified to
+   * create a new one or wait for one to be {@link #release(Object) released}.  Callers must
+   * {@link #release(Object) release} the resource when they are done with it.
+   *
+   * @param timeout the maximum amount of time to wait
+   * @return a resource for exclusive use by the caller
+   * @throws TimeoutException if the specified timeout was reached before a resource became
+   *     available
+   * @throws ResourceExhaustedException if no resource could be obtained because this pool was
+   *     exhausted
+   */
+  T get(Amount<Long, Time> timeout) throws ResourceExhaustedException, TimeoutException;
+
+  /**
+   * Returns a resource obtained from this pool to the pool of available resources.  It is an
+   * error to release a resource not obtained from this pool.
+   *
+   * @param resource Resource to release.
+   */
+  void release(T resource);
+
+  /**
+   * Removes a resource obtained from this pool from its available resources.  It is an error to
+   * remove a resource not obtained from this pool.
+   *
+   * @param resource Resource to remove.
+   */
+  void remove(T resource);
+
+  /**
+   * Disallows further gets from this pool, "closes" all idle objects and any outstanding objects
+   * when they are released.
+   */
+  void close();
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/pool/ResourceExhaustedException.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/net/pool/ResourceExhaustedException.java b/commons/src/main/java/org/apache/aurora/common/net/pool/ResourceExhaustedException.java
new file mode 100644
index 0000000..fd48ddb
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/net/pool/ResourceExhaustedException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.net.pool;
+
+/**
+ * @author John Sirois
+ */
+public class ResourceExhaustedException extends Exception {
+
+  /**
+   * Creates an exception carrying only a detail message.
+   *
+   * @param detail the detail message
+   */
+  public ResourceExhaustedException(String detail) {
+    super(detail);
+  }
+
+  /**
+   * Creates an exception carrying a detail message and the failure that triggered it.
+   *
+   * @param detail the detail message
+   * @param rootCause the underlying cause
+   */
+  public ResourceExhaustedException(String detail, Throwable rootCause) {
+    super(detail, rootCause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/objectsize/ObjectSizeCalculator.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/objectsize/ObjectSizeCalculator.java b/commons/src/main/java/org/apache/aurora/common/objectsize/ObjectSizeCalculator.java
new file mode 100644
index 0000000..95c8868
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/objectsize/ObjectSizeCalculator.java
@@ -0,0 +1,427 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.objectsize;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryPoolMXBean;
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Sets;
+
+/**
+ * Contains utility methods for calculating the memory usage of objects. It
+ * only works on the HotSpot JVM, and infers the actual memory layout (32 bit
+ * vs. 64 bit word size, compressed object pointers vs. uncompressed) from
+ * best available indicators. It can reliably detect a 32 bit vs. 64 bit JVM.
+ * It can only make an educated guess at whether compressed OOPs are used,
+ * though; specifically, it knows what the JVM's default choice of OOP
+ * compression would be based on HotSpot version and maximum heap sizes, but if
+ * the choice is explicitly overridden with the <tt>-XX:{+|-}UseCompressedOops</tt>
+ * command line switch, it cannot detect this fact and will report incorrect
+ * sizes, as it will presume the default JVM behavior.  Instances keep traversal
+ * state in fields; {@link #calculateObjectSize} is synchronized and resets it.
+ *
+ * @author Attila Szegedi
+ */
+public class ObjectSizeCalculator {
+
+  /**
+   * Describes constant memory overheads for various constructs in a JVM implementation.
+   */
+  public interface MemoryLayoutSpecification {
+
+    /**
+     * Returns the fixed overhead of an array of any type or length in this JVM.
+     *
+     * @return the fixed overhead of an array.
+     */
+    int getArrayHeaderSize();
+
+    /**
+     * Returns the fixed overhead for any {@link Object} subclass in this JVM.
+     *
+     * @return the fixed overhead of any object.
+     */
+    int getObjectHeaderSize();
+
+    /**
+     * Returns the quantum field size for a field owned by an object in this JVM.
+     *
+     * @return the quantum field size for an object.
+     */
+    int getObjectPadding();
+
+    /**
+     * Returns the fixed size of an object reference in this JVM.
+     *
+     * @return the size of all object references.
+     */
+    int getReferenceSize();
+
+    /**
+     * Returns the quantum field size for a field owned by one of an object's ancestor superclasses
+     * in this JVM.
+     *
+     * @return the quantum field size for a superclass field.
+     */
+    int getSuperclassFieldPadding();
+  }
+
+  private static class CurrentLayout {
+    private static final MemoryLayoutSpecification SPEC =
+        getEffectiveMemoryLayoutSpecification();
+  }
+
+  /**
+   * Given an object, returns the total allocated size, in bytes, of the object
+   * and all other objects reachable from it.  Attempts to detect the current JVM memory layout,
+   * but may fail with {@link UnsupportedOperationException}.  NOTE(review): the layout is resolved
+   * in CurrentLayout's static initializer, so a failure may surface wrapped in an error - confirm.
+   * @param obj the object; can be null. Passing in a {@link java.lang.Class} object doesn't do
+   *          anything special, it measures the size of all objects
+   *          reachable through it (which will include its class loader, and by
+   *          extension, all other Class objects loaded by
+   *          the same loader, and all the parent class loaders). It doesn't provide the
+   *          size of the static fields in the JVM class that the Class object
+   *          represents.
+   * @return the total allocated size of the object and all other objects it
+   *         retains.
+   * @throws UnsupportedOperationException if the current vm memory layout cannot be detected.
+   */
+  public static long getObjectSize(Object obj) throws UnsupportedOperationException {
+    return obj == null ? 0 : new ObjectSizeCalculator(CurrentLayout.SPEC).calculateObjectSize(obj);
+  }
+
+  // Fixed object header size for arrays.
+  private final int arrayHeaderSize;
+  // Fixed object header size for non-array objects.
+  private final int objectHeaderSize;
+  // Padding for the object size - if the object size is not an exact multiple
+  // of this, it is padded to the next multiple.
+  private final int objectPadding;
+  // Size of reference (pointer) fields.
+  private final int referenceSize;
+  // Padding for the fields of superclass before fields of subclasses are
+  // added.
+  private final int superclassFieldPadding;
+
+  private final LoadingCache<Class<?>, ClassSizeInfo> classSizeInfos =
+      CacheBuilder.newBuilder().build(new CacheLoader<Class<?>, ClassSizeInfo>() {
+        public ClassSizeInfo load(Class<?> clazz) {
+          return new ClassSizeInfo(clazz);
+        }
+      });
+
+  // Traversal state for one calculation; calculateObjectSize() resets all three on exit.
+  private final Set<Object> alreadyVisited = Sets.newIdentityHashSet();
+  private final Deque<Object> pending = new ArrayDeque<Object>(16 * 1024);
+  private long size;
+
+  /**
+   * Creates an object size calculator that can calculate object sizes for a given
+   * {@code memoryLayoutSpecification}.
+   *
+   * @param memoryLayoutSpecification a description of the JVM memory layout.
+   */
+  public ObjectSizeCalculator(MemoryLayoutSpecification memoryLayoutSpecification) {
+    Preconditions.checkNotNull(memoryLayoutSpecification);
+    arrayHeaderSize = memoryLayoutSpecification.getArrayHeaderSize();
+    objectHeaderSize = memoryLayoutSpecification.getObjectHeaderSize();
+    objectPadding = memoryLayoutSpecification.getObjectPadding();
+    referenceSize = memoryLayoutSpecification.getReferenceSize();
+    superclassFieldPadding = memoryLayoutSpecification.getSuperclassFieldPadding();
+  }
+
+  /**
+   * Given an object, returns the total allocated size, in bytes, of the object
+   * and all other objects reachable from it.
+   *
+   * @param obj the object; can be null. Passing in a {@link java.lang.Class} object doesn't do
+   *          anything special, it measures the size of all objects
+   *          reachable through it (which will include its class loader, and by
+   *          extension, all other Class objects loaded by
+   *          the same loader, and all the parent class loaders). It doesn't provide the
+   *          size of the static fields in the JVM class that the Class object
+   *          represents.
+   * @return the total allocated size of the object and all other objects it
+   *         retains.
+   */
+  public synchronized long calculateObjectSize(Object obj) {
+    // Breadth-first traversal instead of naive depth-first with recursive
+    // implementation, so we don't blow the stack traversing long linked lists.
+    try {
+      for (;;) {
+        visit(obj);
+        if (pending.isEmpty()) {
+          return size;
+        }
+        obj = pending.removeFirst();
+      }
+    } finally {
+      alreadyVisited.clear();
+      pending.clear();
+      size = 0;
+    }
+  }
+
+  private void visit(Object obj) {
+    if (alreadyVisited.contains(obj)) {
+      return;
+    }
+    final Class<?> clazz = obj.getClass();
+    if (clazz == ArrayElementsVisitor.class) {  // deferred array scan, not a measured object
+      ((ArrayElementsVisitor) obj).visit(this);
+    } else {
+      alreadyVisited.add(obj);
+      if (clazz.isArray()) {
+        visitArray(obj);
+      } else {
+        classSizeInfos.getUnchecked(clazz).visit(obj, this);
+      }
+    }
+  }
+
+  private void visitArray(Object array) {
+    final Class<?> componentType = array.getClass().getComponentType();
+    final int length = Array.getLength(array);
+    if (componentType.isPrimitive()) {
+      increaseByArraySize(length, getPrimitiveFieldSize(componentType));
+    } else {
+      increaseByArraySize(length, referenceSize);
+      // If we didn't use an ArrayElementsVisitor, we would be enqueueing every
+      // element of the array here instead. For large arrays, it would
+      // tremendously enlarge the queue. In essence, we're compressing it into
+      // a small command object instead. This is different than immediately
+      // visiting the elements, as their visiting is scheduled for the end of
+      // the current queue.
+      switch (length) {
+        case 0: {
+          break;
+        }
+        case 1: {
+          enqueue(Array.get(array, 0));
+          break;
+        }
+        default: {
+          enqueue(new ArrayElementsVisitor((Object[]) array));
+        }
+      }
+    }
+  }
+
+  private void increaseByArraySize(int length, long elementSize) {  // header + elements, padded
+    increaseSize(roundTo(arrayHeaderSize + length * elementSize, objectPadding));
+  }
+
+  private static class ArrayElementsVisitor {  // queue entry standing in for an array's elements
+    private final Object[] array;
+
+    ArrayElementsVisitor(Object[] array) {
+      this.array = array;
+    }
+
+    public void visit(ObjectSizeCalculator calc) {
+      for (Object elem : array) {
+        if (elem != null) {
+          calc.visit(elem);
+        }
+      }
+    }
+  }
+
+  void enqueue(Object obj) {  // null is ignored
+    if (obj != null) {
+      pending.addLast(obj);
+    }
+  }
+
+  void increaseSize(long objectSize) {
+    size += objectSize;
+  }
+
+  @VisibleForTesting
+  static long roundTo(long x, int multiple) {  // smallest multiple >= x, for x >= 0
+    return ((x + multiple - 1) / multiple) * multiple;
+  }
+
+  private class ClassSizeInfo {
+    // Padded fields + header size
+    private final long objectSize;
+    // Only the fields size - used to calculate the subclasses' memory
+    // footprint.
+    private final long fieldsSize;
+    private final Field[] referenceFields;
+
+    public ClassSizeInfo(Class<?> clazz) {
+      long fieldsSize = 0;
+      final List<Field> referenceFields = new LinkedList<Field>();
+      for (Field f : clazz.getDeclaredFields()) {
+        if (Modifier.isStatic(f.getModifiers())) {
+          continue;
+        }
+        final Class<?> type = f.getType();
+        if (type.isPrimitive()) {
+          fieldsSize += getPrimitiveFieldSize(type);
+        } else {
+          f.setAccessible(true);
+          referenceFields.add(f);
+          fieldsSize += referenceSize;
+        }
+      }
+      final Class<?> superClass = clazz.getSuperclass();
+      if (superClass != null) {
+        final ClassSizeInfo superClassInfo = classSizeInfos.getUnchecked(superClass);
+        fieldsSize += roundTo(superClassInfo.fieldsSize, superclassFieldPadding);
+        referenceFields.addAll(Arrays.asList(superClassInfo.referenceFields));
+      }
+      this.fieldsSize = fieldsSize;
+      this.objectSize = roundTo(objectHeaderSize + fieldsSize, objectPadding);
+      this.referenceFields = referenceFields.toArray(
+          new Field[referenceFields.size()]);
+    }
+
+    void visit(Object obj, ObjectSizeCalculator calc) {
+      calc.increaseSize(objectSize);
+      enqueueReferencedObjects(obj, calc);
+    }
+
+    public void enqueueReferencedObjects(Object obj, ObjectSizeCalculator calc) {
+      for (Field f : referenceFields) {
+        try {
+          calc.enqueue(f.get(obj));
+        } catch (IllegalAccessException e) {
+          final AssertionError ae = new AssertionError(
+              "Unexpected denial of access to " + f);
+          ae.initCause(e);
+          throw ae;
+        }
+      }
+    }
+  }
+
+  private static long getPrimitiveFieldSize(Class<?> type) {  // bytes per primitive value
+    if (type == boolean.class || type == byte.class) {
+      return 1;
+    }
+    if (type == char.class || type == short.class) {
+      return 2;
+    }
+    if (type == int.class || type == float.class) {
+      return 4;
+    }
+    if (type == long.class || type == double.class) {
+      return 8;
+    }
+    throw new AssertionError("Encountered unexpected primitive type " +
+        type.getName());
+  }
+
+  @VisibleForTesting
+  static MemoryLayoutSpecification getEffectiveMemoryLayoutSpecification() {
+    final String vmName = System.getProperty("java.vm.name");
+    if (vmName == null || !(vmName.startsWith("Java HotSpot(TM) ")
+        || vmName.startsWith("OpenJDK") || vmName.startsWith("TwitterJDK"))) {
+      throw new UnsupportedOperationException(
+          "ObjectSizeCalculator only supported on HotSpot VM");
+    }
+
+    final String dataModel = System.getProperty("sun.arch.data.model");
+    if ("32".equals(dataModel)) {
+      // Running with 32-bit data model
+      return new MemoryLayoutSpecification() {
+        @Override public int getArrayHeaderSize() {
+          return 12;
+        }
+        @Override public int getObjectHeaderSize() {
+          return 8;
+        }
+        @Override public int getObjectPadding() {
+          return 8;
+        }
+        @Override public int getReferenceSize() {
+          return 4;
+        }
+        @Override public int getSuperclassFieldPadding() {
+          return 4;
+        }
+      };
+    } else if (!"64".equals(dataModel)) {
+      throw new UnsupportedOperationException("Unrecognized value '" +
+          dataModel + "' of sun.arch.data.model system property");
+    }
+    // NOTE(review): assumes java.vm.version is non-null and contains '.'; otherwise this throws.
+    final String strVmVersion = System.getProperty("java.vm.version");
+    final int vmVersion = Integer.parseInt(strVmVersion.substring(0,
+        strVmVersion.indexOf('.')));
+    if (vmVersion >= 17) {
+      long maxMemory = 0;  // NOTE(review): getMax() is summed as-is; confirm it is never negative
+      for (MemoryPoolMXBean mp : ManagementFactory.getMemoryPoolMXBeans()) {
+        maxMemory += mp.getUsage().getMax();
+      }
+      if (maxMemory < 30L * 1024 * 1024 * 1024) {
+        // HotSpot 17.0 and above use compressed OOPs below 30GB of RAM total
+        // for all memory pools (yes, including code cache).
+        return new MemoryLayoutSpecification() {
+          @Override public int getArrayHeaderSize() {
+            return 16;
+          }
+          @Override public int getObjectHeaderSize() {
+            return 12;
+          }
+          @Override public int getObjectPadding() {
+            return 8;
+          }
+          @Override public int getReferenceSize() {
+            return 4;
+          }
+          @Override public int getSuperclassFieldPadding() {
+            return 4;
+          }
+        };
+      }
+    }
+
+    // In other cases, it's a 64-bit uncompressed OOPs object model
+    return new MemoryLayoutSpecification() {
+      @Override public int getArrayHeaderSize() {
+        return 24;
+      }
+      @Override public int getObjectHeaderSize() {
+        return 16;
+      }
+      @Override public int getObjectPadding() {
+        return 8;
+      }
+      @Override public int getReferenceSize() {
+        return 8;
+      }
+      @Override public int getSuperclassFieldPadding() {
+        return 8;
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/quantity/Amount.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/quantity/Amount.java b/commons/src/main/java/org/apache/aurora/common/quantity/Amount.java
new file mode 100644
index 0000000..11be7f5
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/quantity/Amount.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.quantity;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.aurora.common.collections.Pair;
+
+/**
+ * Represents a value in a unit system and facilitates unambiguous communication of amounts.
+ * Instances are created via static factory {@code of(...)} methods.
+ *
+ * @param <T> the type of number the amount value is expressed in
+ * @param <U> the type of unit that this amount quantifies
+ *
+ * @author John Sirois
+ */
+public abstract class Amount<T extends Number & Comparable<T>, U extends Unit<U>>
+    implements Comparable<Amount<T, U>> {
+
+  /**
+   * Thrown when a checked operation on an amount would overflow.
+   */
+
+  public static class TypeOverflowException extends RuntimeException {
+    public TypeOverflowException() {
+      super();
+    }
+  }
+
+  private final Pair<T, U> amount;
+  private final T maxValue;  // compared against by asChecked()
+
+  private Amount(T value, U unit, T maxValue) {
+    Preconditions.checkNotNull(value);
+    Preconditions.checkNotNull(unit);
+    this.maxValue = maxValue;
+    this.amount = Pair.of(value, unit);
+  }
+
+  public T getValue() {
+    return amount.getFirst();
+  }
+
+  public U getUnit() {
+    return amount.getSecond();
+  }
+
+  public T as(U unit) {  // value expressed in unit; no overflow check
+    return asUnit(unit);
+  }
+
+  /**
+   * Like {@link #as}, but throws TypeOverflowException if the converted value equals maxValue.
+   */
+  public T asChecked(U unit) {
+    T retVal = asUnit(unit);
+    if (retVal.equals(maxValue)) {  // NOTE(review): float/double overflow gives Infinity? confirm
+      throw new TypeOverflowException();
+    }
+    return retVal;
+  }
+
+  private T asUnit(Unit<?> unit) {
+    return sameUnits(unit) ? getValue() : scale(getUnit().multiplier() / unit.multiplier());
+  }
+
+  @Override
+  public int hashCode() {
+    return amount.hashCode();  // NOTE(review): equals() matches across units; verify hash agrees
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof Amount)) {
+      return false;
+    }
+
+    Amount<?, ?> other = (Amount<?, ?>) obj;
+    return amount.equals(other.amount) || isSameAmount(other);
+  }
+
+  private boolean isSameAmount(Amount<?, ?> other) {
+    // Equals allows Object - so we have no compile time check that other has the right value type;
+    // ie: make sure they don't have Integer when we have Long.
+    Number value = other.getValue();
+    if (!getValue().getClass().isInstance(value)) {
+      return false;
+    }
+
+    Unit<?> unit = other.getUnit();
+    if (!getUnit().getClass().isInstance(unit)) {
+      return false;
+    }
+
+    @SuppressWarnings("unchecked")
+    U otherUnit = (U) other.getUnit();
+    return isSameAmount(other, otherUnit);
+  }
+
+  private boolean isSameAmount(Amount<?, ?> other, U otherUnit) {
+    // Compare in the more precise unit (the one with the lower multiplier).
+    if (otherUnit.multiplier() > getUnit().multiplier()) {
+      return getValue().equals(other.asUnit(getUnit()));
+    } else {
+      return as(otherUnit).equals(other.getValue());
+    }
+  }
+
+  @Override
+  public String toString() {
+    return amount.toString();
+  }
+
+  @Override
+  public int compareTo(Amount<T, U> other) {
+    // Compare in the more precise unit (the one with the lower multiplier).
+    if (other.getUnit().multiplier() > getUnit().multiplier()) {
+      return getValue().compareTo(other.as(getUnit()));
+    } else {
+      return as(other.getUnit()).compareTo(other.getValue());
+    }
+  }
+
+  private boolean sameUnits(Unit<? extends Unit<?>> unit) {
+    return getUnit().equals(unit);
+  }
+
+  protected abstract T scale(double multiplier);  // value times multiplier, as a T
+
+  /**
+   * Creates an amount that uses a {@code double} value.
+   *
+   * @param number the number of units the returned amount should quantify
+   * @param unit the unit the returned amount is expressed in terms of
+   * @param <U> the type of unit that the returned amount quantifies
+   * @return an amount quantifying the given {@code number} of {@code unit}s
+   */
+  public static <U extends Unit<U>> Amount<Double, U> of(double number, U unit) {
+    return new Amount<Double, U>(number, unit, Double.MAX_VALUE) {
+      @Override protected Double scale(double multiplier) {
+        return getValue() * multiplier;
+      }
+    };
+  }
+
+  /**
+   * Creates an amount that uses a {@code float} value.
+   *
+   * @param number the number of units the returned amount should quantify
+   * @param unit the unit the returned amount is expressed in terms of
+   * @param <U> the type of unit that the returned amount quantifies
+   * @return an amount quantifying the given {@code number} of {@code unit}s
+   */
+  public static <U extends Unit<U>> Amount<Float, U> of(float number, U unit) {
+    return new Amount<Float, U>(number, unit, Float.MAX_VALUE) {
+      @Override protected Float scale(double multiplier) {
+        return (float) (getValue() * multiplier);
+      }
+    };
+  }
+
+  /**
+   * Creates an amount that uses a {@code long} value.
+   *
+   * @param number the number of units the returned amount should quantify
+   * @param unit the unit the returned amount is expressed in terms of
+   * @param <U> the type of unit that the returned amount quantifies
+   * @return an amount quantifying the given {@code number} of {@code unit}s
+   */
+  public static <U extends Unit<U>> Amount<Long, U> of(long number, U unit) {
+    return new Amount<Long, U>(number, unit, Long.MAX_VALUE) {
+      @Override protected Long scale(double multiplier) {
+        return (long) (getValue() * multiplier);
+      }
+    };
+  }
+
+  /**
+   * Creates an amount that uses an {@code int} value.
+   *
+   * @param number the number of units the returned amount should quantify
+   * @param unit the unit the returned amount is expressed in terms of
+   * @param <U> the type of unit that the returned amount quantifies
+   * @return an amount quantifying the given {@code number} of {@code unit}s
+   */
+  public static <U extends Unit<U>> Amount<Integer, U> of(int number, U unit) {
+    return new Amount<Integer, U>(number, unit, Integer.MAX_VALUE) {
+      @Override protected Integer scale(double multiplier) {
+        return (int) (getValue() * multiplier);
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/quantity/Data.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/quantity/Data.java b/commons/src/main/java/org/apache/aurora/common/quantity/Data.java
new file mode 100644
index 0000000..80d077b
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/quantity/Data.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.quantity;
+
+/**
+ * Provides a unit to allow conversions and unambiguous passing around of data {@link Amount}s.
+ * The kilo/mega/giga/... hierarchy is built on base 2 so that the hierarchy increases by a factor
+ * of 1024 instead of 1000 as typical in metric units.  Additionally, units are divided into 2
+ * hierarchies, one based on bits and the other on bytes.  Thus {@link #Kb} represents kilobits; so
+ * 1 Kb = 1024 bits, and {@link #KB} represents kilobytes so 1 KB = 1024 bytes or 8192 bits.
+ *
+ * @author John Sirois
+ */
+public enum Data implements Unit<Data> {
+  BITS(1),
+  Kb(1024, BITS),
+  Mb(1024, Kb),
+  Gb(1024, Mb),
+  BYTES(8, BITS),
+  KB(1024, BYTES),
+  MB(1024, KB),
+  GB(1024, MB),
+  TB(1024, GB),
+  PB(1024, TB);
+
+  private final double multiplier;  // size of this unit in bits (BITS is 1)
+
+  private Data(double multiplier) {
+    this.multiplier = multiplier;
+  }
+
+  private Data(double multiplier, Data base) {  // multiplier is relative to base
+    this(multiplier * base.multiplier);
+  }
+
+  @Override
+  public double multiplier() {
+    return multiplier;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/quantity/Time.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/quantity/Time.java b/commons/src/main/java/org/apache/aurora/common/quantity/Time.java
new file mode 100644
index 0000000..ebf77eb
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/quantity/Time.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.quantity;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Provides a unit to allow conversions and unambiguous passing around of time {@link Amount}s.
+ *
+ * @author John Sirois
+ */
+public enum Time implements Unit<Time> {
+  NANOSECONDS(1, TimeUnit.NANOSECONDS, "ns"),
+  MICROSECONDS(1000, NANOSECONDS, TimeUnit.MICROSECONDS, "us"),
+  MILLISECONDS(1000, MICROSECONDS, TimeUnit.MILLISECONDS, "ms"),
+  SECONDS(1000, MILLISECONDS, TimeUnit.SECONDS, "secs"),
+  MINUTES(60, SECONDS, TimeUnit.MINUTES, "mins"),
+  HOURS(60, MINUTES, TimeUnit.HOURS, "hrs"),
+  DAYS(24, HOURS, TimeUnit.DAYS, "days");
+
+  private final double multiplier;  // length of this unit in nanoseconds (NANOSECONDS is 1)
+  private final TimeUnit timeUnit;
+  private final String display;  // short label returned by toString(), e.g. "ms"
+
+  private Time(double multiplier, TimeUnit timeUnit, String display) {
+    this.multiplier = multiplier;
+    this.timeUnit = timeUnit;
+    this.display = display;
+  }
+
+  private Time(double multiplier, Time base, TimeUnit timeUnit, String display) {
+    this(multiplier * base.multiplier, timeUnit, display);  // multiplier is relative to base
+  }
+
+  @Override
+  public double multiplier() {
+    return multiplier;
+  }
+
+  /**
+   * Returns the equivalent {@code TimeUnit}.
+   */
+  public TimeUnit getTimeUnit() {
+    return timeUnit;
+  }
+
+  @Override
+  public String toString() {
+    return display;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/quantity/Unit.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/quantity/Unit.java b/commons/src/main/java/org/apache/aurora/common/quantity/Unit.java
new file mode 100644
index 0000000..dd9b9ec
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/quantity/Unit.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.quantity;
+
+/**
+ * Represents a unit hierarchy for a given unit of measure, e.g. time.  Instances represent
+ * specific units from the hierarchy, e.g. seconds.
+ *
+ * @param <U> the type of the concrete unit implementation
+ *
+ * @author John Sirois
+ */
+public interface Unit<U extends Unit<U>> {
+
+  /**
+   * Returns the weight of this unit relative to other units in the same hierarchy.  Typically the
+   * smallest unit returns 1, but this is not required; each unit need only report its multiplier
+   * relative to one base unit shared by the whole hierarchy.
+   * @return this unit's multiplier relative to the hierarchy's common base unit
+   */
+  double multiplier();
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/ApproximateHistogram.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/stats/ApproximateHistogram.java b/commons/src/main/java/org/apache/aurora/common/stats/ApproximateHistogram.java
new file mode 100644
index 0000000..cfbf04e
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/stats/ApproximateHistogram.java
@@ -0,0 +1,563 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+import java.util.Arrays;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Data;
+
+/**
+ * <p>
+ * Implements Histogram structure for computing approximate quantiles.
+ * The implementation is based on the following paper:
+ *
+ * <pre>
+ * [MP80]  Munro & Paterson, "Selection and Sorting with Limited Storage",
+ *         Theoretical Computer Science, Vol 12, p 315-323, 1980.
+ * </pre>
+ * </p>
+ * <p>
+ * You can read a detailed description of the same algorithm here:
+ *
+ * <pre>
+ * [MRL98] Manku, Rajagopalan & Lindsay, "Approximate Medians and other
+ *         Quantiles in One Pass and with Limited Memory", Proc. 1998 ACM
+ *         SIGMOD, Vol 27, No 2, p 426-435, June 1998.
+ * </pre>
+ * </p>
+ * <p>
+ * There's a good explanation of the algorithm in the Sawzall source code
+ * See: http://szl.googlecode.com/svn-history/r36/trunk/src/emitters/szlquantile.cc
+ * </p>
+ * Here's a schema of the tree:
+ * <pre>
+ *      [4]     level 3, weight=rootWeight=8
+ *       |
+ *      [3]     level 2, weight=4
+ *       |
+ *      [2]     level 1, weight=2
+ *     /   \
+ *   [0]   [1]  level 0, weight=1
+ * </pre>
+ * <p>
+ * {@code [i]} represents {@code buffer[i]}.
+ * The depth of the tree is limited to a maximum value.
+ * Every buffer has the same size.
+ * </p>
+ * <p>
+ * We add elements in {@code [0]} or {@code [1]}.
+ * When {@code [0]} and {@code [1]} are full, we collapse them, which generates a temporary buffer
+ * of weight 2. If {@code [2]} is empty, we put the collapsed buffer into {@code [2]}; otherwise
+ * we collapse {@code [2]} with the temporary buffer and put it in {@code [3]} if it's empty, and
+ * so on...
+ * </p>
+ */
+public final class ApproximateHistogram implements Histogram {
+  @VisibleForTesting
+  public static final Precision DEFAULT_PRECISION = new Precision(0.02, 100 * 1000);
+  @VisibleForTesting
+  public static final Amount<Long, Data> DEFAULT_MAX_MEMORY = Amount.of(12L, Data.KB);
+  @VisibleForTesting static final long ELEM_SIZE = 8; // sizeof long
+
+  // See above
+  @VisibleForTesting long[][] buffer;
+  @VisibleForTesting long count = 0L;
+  @VisibleForTesting int leafCount = 0; // number of elements in the bottom two leaves
+  @VisibleForTesting int currentTop = 1;
+  @VisibleForTesting int[] indices; // member for optimization reason
+  private boolean leavesSorted = true;
+  private int rootWeight = 1;
+  private long[][] bufferPool; // pool of 2 buffers (used for merging)
+  private int bufferSize;
+  private int maxDepth;
+
+  /**
+   * Private init method that is called only by constructors.
+   * All allocations are done in this method.
+   *
+   * @param bufSize size of each buffer
+   * @param depth maximum depth of the tree of buffers
+   */
+  @VisibleForTesting
+  void init(int bufSize, int depth) {
+    bufferSize = bufSize;
+    maxDepth = depth;
+    bufferPool = new long[2][bufferSize];
+    indices = new int[depth + 1];
+    buffer = new long[depth + 1][bufferSize];
+    // only allocate the first 2 buffers, lazily allocate the others.
+    allocate(0);
+    allocate(1);
+    Arrays.fill(buffer, 2, buffer.length, null);
+    clear();
+  }
+
+  @VisibleForTesting
+  ApproximateHistogram(int bufSize, int depth) {
+    init(bufSize, depth);
+  }
+
+  /**
+   * Constructor with a precision constraint; it will allocate as much memory as required to match
+   * this precision constraint.
+   * @param precision the requested precision
+   */
+  public ApproximateHistogram(Precision precision) {
+    Preconditions.checkNotNull(precision);
+    int depth = computeDepth(precision.getEpsilon(), precision.getN());
+    int bufSize = computeBufferSize(depth, precision.getN());
+    init(bufSize, depth);
+  }
+
+  /**
+   * Constructor with a memory constraint; it will find the best possible precision that satisfies
+   * the memory constraint.
+   * @param maxMemory the maximum amount of memory that the instance will take
+   */
+  public ApproximateHistogram(Amount<Long, Data> maxMemory, int expectedSize) {
+    Preconditions.checkNotNull(maxMemory);
+    Preconditions.checkArgument(1024 <= maxMemory.as(Data.BYTES),
+        "at least 1KB is required for an Histogram");
+
+    double epsilon = DEFAULT_PRECISION.getEpsilon();
+    int n = expectedSize;
+    int depth = computeDepth(epsilon, n);
+    int bufSize = computeBufferSize(depth, n);
+    long maxBytes = maxMemory.as(Data.BYTES);
+
+    // Increase precision if maxMemory allows it, otherwise reduce precision (in 5% steps).
+    boolean tooMuchMem = memoryUsage(bufSize, depth) > maxBytes;
+    double multiplier = tooMuchMem ? 1.05 : 0.95;
+    while((maxBytes < memoryUsage(bufSize, depth)) == tooMuchMem) {
+      epsilon *= multiplier;
+      if (epsilon < 0.00001) {
+        // for very high memory constraint increase N as well
+        n *= 10;
+        epsilon = DEFAULT_PRECISION.getEpsilon();
+      }
+      depth = computeDepth(epsilon, n);
+      bufSize = computeBufferSize(depth, n);
+    }
+    if (!tooMuchMem) {
+      // It's ok to consume less memory than the constraint,
+      // but we must never consume more!
+      depth = computeDepth(epsilon / multiplier, n);
+      bufSize = computeBufferSize(depth, n);
+    }
+
+    init(bufSize, depth);
+  }
+
+  /**
+   * Constructor with memory constraint.
+   * @see #ApproximateHistogram(Amount, int)
+   */
+  public ApproximateHistogram(Amount<Long, Data> maxMemory) {
+    this(maxMemory, DEFAULT_PRECISION.getN());
+  }
+
+  /**
+   * Default Constructor.
+   * @see #ApproximateHistogram(Amount)
+   */
+  public ApproximateHistogram() {
+    this(DEFAULT_MAX_MEMORY);
+  }
+
+  @Override
+  public synchronized void add(long x) {
+    // if the leaves of the tree are full, "collapse" recursively the tree
+    if (leafCount == 2 * bufferSize) {
+      Arrays.sort(buffer[0]);
+      Arrays.sort(buffer[1]);
+      recCollapse(buffer[0], 1);
+      leafCount = 0;
+    }
+
+    // Now we're sure there is space for adding x
+    if (leafCount < bufferSize) {
+      buffer[0][leafCount] = x;
+    } else {
+      buffer[1][leafCount - bufferSize] = x;
+    }
+    leafCount++;
+    count++;
+    leavesSorted = (leafCount == 1);
+  }
+
+  @Override
+  public synchronized long getQuantile(double q) {
+    Preconditions.checkArgument(0.0 <= q && q <= 1.0,
+        "quantile must be in the range 0.0 to 1.0 inclusive");
+    if (count == 0) {
+      return 0L;
+    }
+
+    // the two leaves are the only buffers that can be partially filled
+    int buf0Size = Math.min(bufferSize, leafCount);
+    int buf1Size = Math.max(0, leafCount - buf0Size);
+    long sum = 0;
+    long target = (long) Math.ceil(count * (1.0 - q));
+    int i;
+
+    if (! leavesSorted) {
+      Arrays.sort(buffer[0], 0, buf0Size);
+      Arrays.sort(buffer[1], 0, buf1Size);
+      leavesSorted = true;
+    }
+    Arrays.fill(indices, bufferSize - 1);
+    indices[0] = buf0Size - 1;
+    indices[1] = buf1Size - 1;
+
+    do {
+      i = biggest(indices);
+      indices[i]--;
+      sum += weight(i);
+    } while (sum < target);
+    return buffer[i][indices[i] + 1];
+  }
+
+  @Override
+  public synchronized long[] getQuantiles(double[] quantiles) {
+    return Histograms.extractQuantiles(this, quantiles);
+  }
+
+  @Override
+  public synchronized void clear() {
+    count = 0L;
+    leafCount = 0;
+    currentTop = 1;
+    rootWeight = 1;
+    leavesSorted = true;
+  }
+
+  /**
+   * MergedHistogram is a wrapper on top of multiple histograms; it gives a view of all the
+   * underlying histograms as if they were just one.
+   * Note: Should only be used for querying the underlying histograms.
+   */
+  private static class MergedHistogram implements Histogram {
+    private final ApproximateHistogram[] histograms;
+
+    private MergedHistogram(ApproximateHistogram[] histograms) {
+      this.histograms = histograms;
+    }
+
+    @Override
+    public void add(long x) {
+      /* Ignore, Shouldn't be used */
+      assert(false);
+    }
+
+    @Override
+    public void clear() {
+      /* Ignore, Shouldn't be used */
+      assert(false);
+    }
+
+    @Override
+    public long getQuantile(double quantile) {
+      Preconditions.checkArgument(0.0 <= quantile && quantile <= 1.0,
+          "quantile must be in the range 0.0 to 1.0 inclusive");
+
+      long count = initIndices();
+      if (count == 0) {
+        return 0L;
+      }
+
+      long sum = 0;
+      long target = (long) Math.ceil(count * (1.0 - quantile));
+      int iHist = -1;
+      int iBiggest = -1;
+      do {
+        long biggest = Long.MIN_VALUE;
+        for (int i = 0; i < histograms.length; i++) {
+          ApproximateHistogram hist = histograms[i];
+          int indexBiggest = hist.biggest(hist.indices);
+          if (indexBiggest >= 0) {
+            long value = hist.buffer[indexBiggest][hist.indices[indexBiggest]];
+            if (iBiggest == -1 || biggest <= value) {
+              iBiggest = indexBiggest;
+              biggest = value;
+              iHist = i;
+            }
+          }
+        }
+        histograms[iHist].indices[iBiggest]--;
+        sum += histograms[iHist].weight(iBiggest);
+      } while (sum < target);
+
+      ApproximateHistogram hist = histograms[iHist];
+      int i = hist.indices[iBiggest];
+      return hist.buffer[iBiggest][i + 1];
+    }
+
+    @Override
+    public synchronized long[] getQuantiles(double[] quantiles) {
+      return Histograms.extractQuantiles(this, quantiles);
+    }
+
+    /**
+     * Initialize the indices array for each Histogram and return the global count.
+     */
+    private long initIndices() {
+      long count = 0L;
+      for (int i = 0; i < histograms.length; i++) {
+        ApproximateHistogram h = histograms[i];
+        int[] indices = h.indices;
+        count += h.count;
+        int buf0Size = Math.min(h.bufferSize, h.leafCount);
+        int buf1Size = Math.max(0, h.leafCount - buf0Size);
+
+        if (! h.leavesSorted) {
+          Arrays.sort(h.buffer[0], 0, buf0Size);
+          Arrays.sort(h.buffer[1], 0, buf1Size);
+          h.leavesSorted = true;
+        }
+        Arrays.fill(indices, h.bufferSize - 1);
+        indices[0] = buf0Size - 1;
+        indices[1] = buf1Size - 1;
+      }
+      return count;
+    }
+  }
+
+  /**
+   * Return a MergedHistogram
+   * @param histograms array of histograms to be merged together
+   * @return a new Histogram
+   */
+  public static Histogram merge(ApproximateHistogram[] histograms) {
+    return new MergedHistogram(histograms);
+  }
+
+  /**
+   * We compute the "smallest possible b" satisfying two inequalities:
+   *    1)   (b - 2) * (2 ^ (b - 2)) + 0.5 <= epsilon * N
+   *    2)   k * (2 ^ (b - 1)) >= N
+   *
+   * For an explanation of these inequalities, please read the Munro-Paterson or
+   * the Manku-Rajagopalan-Lindsay papers.
+   */
+  @VisibleForTesting static int computeDepth(double epsilon, long n) {
+    int b = 2;
+    while ((b - 2) * (1L << (b - 2)) + 0.5 <= epsilon * n) {
+      b += 1;
+    }
+    return b;
+  }
+
+  @VisibleForTesting static int computeBufferSize(int depth, long n) {
+    return (int) (n / (1L << (depth - 1)));
+  }
+
+  /**
+   * Return an estimation of the memory used by an instance.
+   * The size is due to:
+   * - a fixed cost (76 bytes) for the class + fields
+   * - bufferPool: 16 + 2 * (16 + bufferSize * ELEM_SIZE)
+   * - indices: 16 + sizeof(Integer) * (depth + 1)
+   * - buffer: 16 + (depth + 1) * (16 + bufferSize * ELEM_SIZE)
+   *
+   * Note: This method is covered by a unit test, which will break if you add new fields.
+   * @param bufferSize the size of a buffer
+   * @param depth the depth of the tree of buffers (depth + 1 buffers)
+   */
+  @VisibleForTesting
+  static long memoryUsage(int bufferSize, int depth) {
+    return 176 + (24 * depth) + (bufferSize * ELEM_SIZE * (depth + 3));
+  }
+
+  /**
+   * Return the level of the biggest element (using the indices array 'ids'
+   * to track which elements have been already returned). Every buffer has
+   * already been sorted at this point.
+   * @return the level of the biggest element or -1 if no element has been found
+   */
+  @VisibleForTesting
+  int biggest(final int[] ids) {
+    long biggest = Long.MIN_VALUE;
+    final int id0 = ids[0], id1 = ids[1];
+    int iBiggest = -1;
+
+    if (0 < leafCount && 0 <= id0) {
+      biggest = buffer[0][id0];
+      iBiggest = 0;
+    }
+    if (bufferSize < leafCount && 0 <= id1) {
+      long x = buffer[1][id1];
+      if (x > biggest) {
+        biggest = x;
+        iBiggest = 1;
+      }
+    }
+    for (int i = 2; i < currentTop + 1; i++) {
+      if (!isBufferEmpty(i) && 0 <= ids[i]) {
+        long x = buffer[i][ids[i]];
+        if (x > biggest) {
+          biggest = x;
+          iBiggest = i;
+        }
+      }
+    }
+    return iBiggest;
+  }
+
+
+  /**
+   * Based on the number of elements inserted we can easily know if a buffer
+   * is empty or not
+   */
+  @VisibleForTesting
+  boolean isBufferEmpty(int level) {
+    if (level == currentTop) {
+      return false; // root buffer (if present) is always full
+    } else {
+      long levelWeight = 1 << (level - 1);
+      return (((count - leafCount) / bufferSize) & levelWeight) == 0;
+    }
+  }
+
+  /**
+   * Return the weight of the level, i.e. 2^(level-1), except for the two tree
+   * leaves (weight=1) and for the root (weight=rootWeight).
+   */
+  private int weight(int level) {
+    if (level == 0) {
+      return 1;
+    } else if (level == maxDepth) {
+      return rootWeight;
+    } else {
+      return 1 << (level - 1);
+    }
+  }
+
+  private void allocate(int i) {
+    if (buffer[i] == null) {
+      buffer[i] = new long[bufferSize];
+    }
+  }
+
+  /**
+   * Recursively collapse the buffers of the tree.
+   * Upper buffers will be allocated on first access in this method.
+   */
+  private void recCollapse(long[] buf, int level) {
+    // if we reach the root, we can't add more buffers
+    if (level == maxDepth) {
+      // weight() returns the weight of the root; in that case we need the
+      // weight of the merge result
+      int mergeWeight = 1 << (level - 1);
+      int idx = level % 2;
+      long[] merged = bufferPool[idx];
+      long[] tmp = buffer[level];
+      collapse(buf, mergeWeight, buffer[level], rootWeight, merged);
+      buffer[level] = merged;
+      bufferPool[idx] = tmp;
+      rootWeight += mergeWeight;
+    } else {
+      allocate(level + 1); // lazy allocation (if needed)
+      if (level == currentTop) {
+        // if we reach the top, add a new buffer
+        collapse1(buf, buffer[level], buffer[level + 1]);
+        currentTop += 1;
+        rootWeight *= 2;
+      } else if (isBufferEmpty(level + 1)) {
+        // if the upper buffer is empty, use it
+        collapse1(buf, buffer[level], buffer[level + 1]);
+      } else {
+        // if the upper buffer isn't empty, collapse with it
+        long[] merged = bufferPool[level % 2];
+        collapse1(buf, buffer[level], merged);
+        recCollapse(merged, level + 1);
+      }
+    }
+  }
+
+  /**
+   * collapse two sorted Arrays of different weight
+   * ex: [2,5,7] weight 2 and [3,8,9] weight 3
+   *     weight x array + concat = [2,2,5,5,7,7,3,3,3,8,8,8,9,9,9]
+   *     sort = [2,2,3,3,3,5,5,7,7,8,8,8,9,9,9]
+   *     select every nth elems = [3,7,9]  (n = sum weight / 2)
+   */
+  @VisibleForTesting
+  static void collapse(
+    long[] left,
+    int leftWeight,
+    long[] right,
+    int rightWeight,
+    long[] output) {
+
+    int totalWeight = leftWeight + rightWeight;
+    int halfTotalWeight = (totalWeight / 2) - 1;
+    int i = 0, j = 0, k = 0, cnt = 0;
+
+    int weight;
+    long smallest;
+
+    while (i < left.length || j < right.length) {
+      if (i < left.length && (j == right.length || left[i] < right[j])) {
+        smallest = left[i];
+        weight = leftWeight;
+        i++;
+      } else {
+        smallest = right[j];
+        weight = rightWeight;
+        j++;
+      }
+
+      int cur = (cnt + halfTotalWeight) / totalWeight;
+      cnt += weight;
+      int next = (cnt + halfTotalWeight) / totalWeight;
+
+      for(; cur < next; cur++) {
+        output[k] = smallest;
+        k++;
+      }
+    }
+  }
+
+  /**
+   * Optimized version of collapse for collapsing two arrays of the same weight
+   * (which is what we want most of the time)
+   */
+  private static void collapse1(
+    long[] left,
+    long[] right,
+    long[] output) {
+
+    int i = 0, j = 0, k = 0, cnt = 0;
+    long smallest;
+
+    while (i < left.length || j < right.length) {
+      if (i < left.length && (j == right.length || left[i] < right[j])) {
+        smallest = left[i];
+        i++;
+      } else {
+        smallest = right[j];
+        j++;
+      }
+      if (cnt % 2 == 1) {
+        output[k] = smallest;
+        k++;
+      }
+      cnt++;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/CounterMap.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/stats/CounterMap.java b/commons/src/main/java/org/apache/aurora/common/stats/CounterMap.java
new file mode 100644
index 0000000..024e67b
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/stats/CounterMap.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Logger;
+
+/**
+ * A map from a key type to integers.  This simplifies the process of storing counters for multiple
+ * values of the same type.
+ */
+public class CounterMap <K> implements Iterable<Map.Entry<K, Integer>>, Cloneable {
+  private final Map<K, Integer> map = Maps.newHashMap();
+
+  private static Logger log = Logger.getLogger(CounterMap.class.getName());
+
+  /**
+   * Increments the counter value associated with {@code key}, and returns the new value.
+   *
+   * @param key The key to increment
+   * @return The incremented value.
+   */
+  public int incrementAndGet(K key) {
+    return incrementAndGet(key, 1);
+  }
+
+  /**
+   * Increments the value associated with {@code key} by {@code count}, returning the new value.
+   *
+   * @param key The key to increment
+   * @return The incremented value.
+   */
+  public int incrementAndGet(K key, int count) {
+    Integer value = map.get(key);
+    if (value == null) {
+      value = 0;
+    }
+    int newValue = count + value;
+    map.put(key, newValue);
+    return newValue;
+  }
+
+  /**
+   * Gets the value associated with a key.
+   *
+   * @param key The key to look up.
+   * @return The counter value stored for {@code key}, or 0 if no mapping exists.
+   */
+  public int get(K key) {
+    if (!map.containsKey(key)) {
+      return 0;
+    }
+
+    return map.get(key);
+  }
+
+  /**
+   * Assigns a value to a key.
+   *
+   * @param key The key to assign a value to.
+   * @param newValue The value to assign.
+   */
+  public void set(K key, int newValue) {
+    Preconditions.checkNotNull(key);
+    map.put(key, newValue);
+  }
+
+  /**
+   * Resets the value for {@code key}.  This will remove the key from the counter.
+   *
+   * @param key The key to reset.
+   */
+  public void reset(K key) {
+    map.remove(key);
+  }
+
+  /**
+   * Gets the number of entries stored in the map.
+   *
+   * @return The size of the map.
+   */
+  public int size() {
+    return map.size();
+  }
+
+  /**
+   * Gets an iterator over the entries (key and counter value) stored in the map.
+   *
+   * @return Iterator over the map entries.
+   */
+  public Iterator<Map.Entry<K, Integer>> iterator() {
+    return map.entrySet().iterator();
+  }
+
+  public Collection<Integer> values() {
+    return map.values();
+  }
+
+  public Set<K> keySet() {
+    return map.keySet();
+  }
+
+  public String toString() {
+    StringBuilder strVal = new StringBuilder();
+    for (Map.Entry<K, Integer> entry : this) {
+      strVal.append(entry.getKey().toString()).append(": ").append(entry.getValue()).append('\n');
+    }
+    return strVal.toString();
+  }
+
+  public Map<K, Integer> toMap() {
+    return map;
+  }
+
+  @Override
+  public CounterMap<K> clone() {
+    CounterMap<K> newInstance = new CounterMap<K>();
+    newInstance.map.putAll(map);
+    return newInstance;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/CounterMapWithTopKey.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/stats/CounterMapWithTopKey.java b/commons/src/main/java/org/apache/aurora/common/stats/CounterMapWithTopKey.java
new file mode 100644
index 0000000..1e90e85
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/stats/CounterMapWithTopKey.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+import java.util.Map;
+
+/**
+ * Same as {@link CounterMap}, but also keeps track of the item with the highest count.
+ */
+public class CounterMapWithTopKey<K> extends CounterMap<K> {
+
+  private K mostCommonKey = null;
+
+  /**
+   * Updates the most common key, if needed.
+   *
+   * @param key The key to check.
+   * @param count The count for the key.
+   * @return The count.
+   */
+  private int updateMostCommon(K key, int count) {
+    if (count > get(mostCommonKey)) {
+      mostCommonKey = key;
+    }
+    return count;
+  }
+
+  /**
+   * Increments the counter value associated with {@code key}, and returns the new value.
+   *
+   * @param key The key to increment
+   * @return The incremented value.
+   */
+  @Override
+  public int incrementAndGet(K key) {
+    return updateMostCommon(key, super.incrementAndGet(key));
+  }
+
+  /**
+   * Assigns a value to a key.
+   *
+   * @param key The key to assign a value to.
+   * @param newValue The value to assign.
+   */
+  @Override
+  public void set(K key, int newValue) {
+    super.set(key, updateMostCommon(key, newValue));
+  }
+
+  /**
+   * Resets the value for {@code key}.  This will remove the key from the counter.
+   * The most common key is updated by scanning the entire map.
+   *
+   * @param key The key to reset.
+   */
+  @Override
+  public void reset(K key) {
+    super.reset(key);
+    for (Map.Entry<K, Integer> entry : this) {
+      updateMostCommon(entry.getKey(), entry.getValue());
+    }
+  }
+
+  /**
+   *
+   * @return The key with the highest count in the map. If multiple keys have this count, return
+   * an arbitrary one.
+   */
+  public K getMostCommonKey() {
+    return mostCommonKey;
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder(super.toString()).append(String.format("Most common key: %s\n",
+        mostCommonKey.toString())).toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/Elapsed.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/stats/Elapsed.java b/commons/src/main/java/org/apache/aurora/common/stats/Elapsed.java
new file mode 100644
index 0000000..859ca7e
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/stats/Elapsed.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
+
+import org.apache.aurora.common.base.MorePreconditions;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+
+/**
+ * A stat that exports the amount of time since it was last reset.
+ *
+ * @author William Farner
+ */
+public class Elapsed {
+
+  private final Ticker ticker;
+  private final AtomicLong lastEventNs = new AtomicLong();
+
+  /**
+   * Calls {@link #Elapsed(String, Time)} using a default granularity of nanoseconds.
+   *
+   * @param name Name of the stat to export.
+   */
+  public Elapsed(String name) {
+    this(name, Time.NANOSECONDS);
+  }
+
+  /**
+   * Equivalent to calling {@link #Elapsed(String, Time, Ticker)} passing {@code name},
+   * {@code granularity} and {@link com.google.common.base.Ticker#systemTicker()}.
+   * <br/>
+   * @param name Name of the stat to export.
+   * @param granularity Time unit granularity to export.
+   */
+  public Elapsed(String name, Time granularity) {
+    this(name, granularity, Ticker.systemTicker());
+  }
+
+   /**
+    * Creates and exports a new stat that maintains the difference between the tick time
+    * and the time it was last reset.  Upon export, the counter will act as though it were just
+    * reset.
+    * <br/>
+    * @param name Name of stat to export
+    * @param granularity Time unit granularity to export.
+    * @param ticker Ticker implementation
+    */
+  public Elapsed(String name, final Time granularity, final Ticker ticker) {
+    MorePreconditions.checkNotBlank(name);
+    Preconditions.checkNotNull(granularity);
+    this.ticker = Preconditions.checkNotNull(ticker);
+
+    reset();
+
+    Stats.export(new StatImpl<Long>(name) {
+      @Override public Long read() {
+        return Amount.of(ticker.read() - lastEventNs.get(), Time.NANOSECONDS).as(granularity);
+      }
+    });
+  }
+
+  public void reset() {
+    lastEventNs.set(ticker.read());
+  }
+}