You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by zm...@apache.org on 2015/08/26 23:00:12 UTC
[22/51] [partial] aurora git commit: Move packages from
com.twitter.common to org.apache.aurora.common
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSet.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSet.java b/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSet.java
new file mode 100644
index 0000000..93f6610
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSet.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.net.pool;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.base.Command;
+
+/**
+ * A host set that can be monitored for changes.
+ *
+ * @param <T> The type that is used to identify members of the host set.
+ */
+public interface DynamicHostSet<T> {
+
+ /**
+ * Registers a monitor to receive change notices for this host set as long as this JVM process
+ * is alive. Blocks until the initial host set can be gathered and delivered to the monitor.
+ * The monitor will be notified if the membership set or parameters of existing members have
+ * changed.
+ *
+ * @param monitor the monitor to call back when the host set changes
+ * @throws MonitorException if there is a problem monitoring the host set
+ * @deprecated Use {@link #watch(HostChangeMonitor)}; it returns a command to stop monitoring.
+ */
+ @Deprecated
+ public void monitor(final HostChangeMonitor<T> monitor) throws MonitorException;
+
+ /**
+ * Registers a monitor to receive change notices for this host set as long as this JVM process
+ * is alive. Blocks until the initial host set can be gathered and delivered to the monitor.
+ * The monitor will be notified if the membership set or parameters of existing members have
+ * changed.
+ *
+ * @param monitor the monitor to call back when the host set changes
+ * @return A command which, when executed, will stop monitoring the host set.
+ * @throws MonitorException if there is a problem monitoring the host set
+ */
+ public Command watch(final HostChangeMonitor<T> monitor) throws MonitorException;
+
+ /**
+ * An interface to an object that is interested in receiving notification whenever the host set
+ * changes.
+ */
+ public static interface HostChangeMonitor<T> {
+
+ /**
+ * Called when either the available set of services changes (when a service dies or a new
+ * instance comes on-line) or when an existing service advertises a status or health change.
+ *
+ * @param hostSet the current set of available hosts
+ */
+ void onChange(ImmutableSet<T> hostSet);
+ }
+
+ public static class MonitorException extends Exception {
+ public MonitorException(String msg) {
+ super(msg);
+ }
+
+ public MonitorException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSetUtil.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSetUtil.java b/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSetUtil.java
new file mode 100644
index 0000000..4f75893
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSetUtil.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.net.pool;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.base.Command;
+
+/**
+ * Utility methods for dealing with dynamic sets of hosts.
+ */
+public final class DynamicHostSetUtil {
+
+ /**
+ * Returns a point-in-time copy of the members of a dynamic host set (e.g. a ServerSet).
+ *
+ * @param hostSet The hostSet to snapshot.
+ * @return An immutable set of the hosts delivered while briefly watching {@code hostSet}.
+ * @throws DynamicHostSet.MonitorException if there was a problem obtaining the snapshot.
+ */
+ public static <T> ImmutableSet<T> getSnapshot(DynamicHostSet<T> hostSet) throws DynamicHostSet.MonitorException {
+ final ImmutableSet.Builder<T> snapshot = ImmutableSet.builder();
+ Command unwatch = hostSet.watch(new DynamicHostSet.HostChangeMonitor<T>() {
+ @Override public void onChange(ImmutableSet<T> hostSet) {
+ snapshot.addAll(hostSet);
+ }
+ });
+ unwatch.execute(); // Stop watching right away; the snapshot is whatever was delivered so far.
+ return snapshot.build();
+ }
+
+ private DynamicHostSetUtil() {
+ // utility
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicPool.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicPool.java b/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicPool.java
new file mode 100644
index 0000000..2fd6046
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicPool.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.net.pool;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.aurora.common.base.Closure;
+import org.apache.aurora.common.net.loadbalancing.LoadBalancer;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.zookeeper.ServerSet;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * An ObjectPool that maintains a set of connections for a set of service endpoints defined by a
+ * {@link ServerSet}.
+ *
+ * @param <H> The type that contains metadata information about hosts, such as liveness and address.
+ * @param <T> The raw connection type that is being pooled.
+ * @param <E> The type that identifies the endpoint of the pool, such as an address.
+ * @author John Sirois
+ */
+public class DynamicPool<H, T, E> implements ObjectPool<Connection<T, E>> {
+
+ private final MetaPool<T, E> pool;
+
+ /**
+ * Creates a new DynamicPool and blocks on an initial read and construction of pools
+ * for the given {@code hostSet}.
+ *
+ * @param hostSet the dynamic set of available servers to pool connections for
+ * @param endpointPoolFactory a factory that can generate a connection pool for an endpoint
+ * @param loadBalancer Load balancer to manage request flow.
+ * @param onBackendsChosen A callback to notify of chosen backends.
+ * @param restoreInterval the interval after connection errors start occurring for a target to
+ * begin checking to see if it has come back to a healthy state
+ * @param endpointExtractor Function that transforms a service instance into an endpoint instance.
+ * @param livenessChecker Filter that will determine whether a host indicates itself as available.
+ * @throws DynamicHostSet.MonitorException if there is a problem monitoring the host set
+ */
+ public DynamicPool(DynamicHostSet<H> hostSet,
+ Function<E, ObjectPool<Connection<T, E>>> endpointPoolFactory,
+ LoadBalancer<E> loadBalancer,
+ Closure<Collection<E>> onBackendsChosen,
+ Amount<Long, Time> restoreInterval,
+ Function<H, E> endpointExtractor,
+ Predicate<H> livenessChecker)
+ throws DynamicHostSet.MonitorException {
+ Preconditions.checkNotNull(hostSet);
+ Preconditions.checkNotNull(endpointPoolFactory);
+
+ pool = new MetaPool<T, E>(loadBalancer, onBackendsChosen, restoreInterval);
+ // NOTE(review): monitor() is deprecated in favor of watch(), which returns a stop command.
+ // TODO(John Sirois): consider an explicit start/stop
+ hostSet.monitor(new PoolMonitor<H, Connection<T, E>>(endpointPoolFactory, endpointExtractor,
+ livenessChecker) {
+ @Override protected void onPoolRebuilt(Set<ObjectPool<Connection<T, E>>> deadPools,
+ Map<E, ObjectPool<Connection<T, E>>> livePools) {
+ poolRebuilt(deadPools, livePools);
+ }
+ });
+ }
+
+ @VisibleForTesting
+ void poolRebuilt(Set<ObjectPool<Connection<T, E>>> deadPools,
+ Map<E, ObjectPool<Connection<T, E>>> livePools) {
+
+ pool.setBackends(livePools);
+
+ for (ObjectPool<Connection<T, E>> deadTargetPool : deadPools) {
+ deadTargetPool.close();
+ }
+ }
+
+ @Override
+ public Connection<T, E> get() throws ResourceExhaustedException, TimeoutException {
+ return pool.get();
+ }
+
+ @Override
+ public Connection<T, E> get(Amount<Long, Time> timeout)
+ throws ResourceExhaustedException, TimeoutException {
+ return pool.get(timeout);
+ }
+
+ @Override
+ public void release(Connection<T, E> connection) {
+ pool.release(connection);
+ }
+
+ @Override
+ public void remove(Connection<T, E> connection) {
+ pool.remove(connection);
+ }
+
+ @Override
+ public void close() {
+ pool.close();
+ }
+
+ private abstract class PoolMonitor<H, S extends Connection<?, ?>>
+ implements DynamicHostSet.HostChangeMonitor<H> {
+
+ private final Function<E, ObjectPool<S>> endpointPoolFactory;
+ private final Function<H, E> endpointExtractor;
+ private final Predicate<H> livenessTest;
+
+ public PoolMonitor(Function<E, ObjectPool<S>> endpointPoolFactory,
+ Function<H, E> endpointExtractor,
+ Predicate<H> livenessTest) {
+ this.endpointPoolFactory = endpointPoolFactory;
+ this.endpointExtractor = endpointExtractor;
+ this.livenessTest = livenessTest;
+ }
+
+ private final Map<E, ObjectPool<S>> endpointPools = Maps.newHashMap();
+
+ @Override
+ public synchronized void onChange(ImmutableSet<H> serverSet) {
+ // TODO(John Sirois): change onChange to pass the delta data since its already computed by
+ // ServerSet
+
+ Map<E, H> newEndpoints =
+ Maps.uniqueIndex(Iterables.filter(serverSet, livenessTest), endpointExtractor);
+
+ Set<E> deadEndpoints = ImmutableSet.copyOf(
+ Sets.difference(endpointPools.keySet(), newEndpoints.keySet()));
+ Set<ObjectPool<S>> deadPools = Sets.newHashSet();
+ for (E endpoint : deadEndpoints) {
+ ObjectPool<S> deadPool = endpointPools.remove(endpoint);
+ deadPools.add(deadPool);
+ }
+
+ Set<E> addedEndpoints = ImmutableSet.copyOf(
+ Sets.difference(newEndpoints.keySet(), endpointPools.keySet()));
+ for (E endpoint : addedEndpoints) {
+ ObjectPool<S> endpointPool = endpointPoolFactory.apply(endpoint);
+ endpointPools.put(endpoint, endpointPool);
+ }
+
+ onPoolRebuilt(deadPools, ImmutableMap.copyOf(endpointPools));
+ }
+
+ protected abstract void onPoolRebuilt(Set<ObjectPool<S>> deadPools,
+ Map<E, ObjectPool<S>> livePools);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/pool/MetaPool.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/net/pool/MetaPool.java b/commons/src/main/java/org/apache/aurora/common/net/pool/MetaPool.java
new file mode 100644
index 0000000..df1bd96
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/net/pool/MetaPool.java
@@ -0,0 +1,339 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.net.pool;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.apache.aurora.common.base.Closure;
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.net.loadbalancing.LoadBalancer;
+import org.apache.aurora.common.net.loadbalancing.LoadBalancingStrategy.ConnectionResult;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+
+/**
+ * A connection pool that picks connections from a set of backend pools. Backend pools are selected
+ * from randomly initially but then as they are used they are ranked according to how many
+ * connections they have available and whether or not the last used connection had an error or not.
+ * In this way, backends that are responsive should get selected in preference to those that are
+ * not.
+ *
+ * <p>Non-responsive backends are monitored after a configurable period in a background thread and
+ * if a connection can be obtained they start to float back up in the rankings. In this way,
+ * backends that are initially non-responsive but later become responsive should end up getting
+ * selected.
+ *
+ * <p> TODO(John Sirois): take a ShutdownRegistry and register a close command
+ *
+ * @author John Sirois
+ */
+public class MetaPool<T, E> implements ObjectPool<Connection<T, E>> {
+
+ private final Command stopBackendRestorer;
+
+ private Map<E, ObjectPool<Connection<T, E>>> backends = null;
+
+ // Locks to guard mutation of the backends set.
+ private final Lock backendsReadLock;
+ private final Lock backendsWriteLock;
+
+ private final Closure<Collection<E>> onBackendsChosen;
+
+ private final LoadBalancer<E> loadBalancer;
+
+ /**
+ * Creates a connection pool with no backends. Backends may be added post-creation by calling
+ * {@link #setBackends(java.util.Map)}
+ *
+ * @param loadBalancer the load balancer to distribute requests among backends.
+ * @param onBackendsChosen a callback to notify whenever the {@code loadBalancer} chooses a new
+ * set of backends to restrict its call distribution to
+ * @param restoreInterval the interval after a backend goes dead to begin checking the backend to
+ * see if it has come back to a healthy state
+ */
+ public MetaPool(LoadBalancer<E> loadBalancer,
+ Closure<Collection<E>> onBackendsChosen, Amount<Long, Time> restoreInterval) {
+ this(ImmutableMap.<E, ObjectPool<Connection<T, E>>>of(), loadBalancer,
+ onBackendsChosen, restoreInterval);
+ }
+
+ /**
+ * Creates a connection pool that balances connections across multiple backend pools.
+ *
+ * @param backends the connection pools for the backends
+ * @param onBackendsChosen a callback to notify whenever the {@code loadBalancer} chooses a new
+ * set of backends to restrict its call distribution to
+ * @param loadBalancer the load balancer to distribute requests among backends.
+ * @param restoreInterval the interval after a backend goes dead to begin checking the backend to
+ * see if it has come back to a healthy state
+ */
+ public MetaPool(
+ ImmutableMap<E, ObjectPool<Connection<T, E>>> backends,
+ LoadBalancer<E> loadBalancer,
+ Closure<Collection<E>> onBackendsChosen, Amount<Long, Time> restoreInterval) {
+
+ this.loadBalancer = Preconditions.checkNotNull(loadBalancer);
+ this.onBackendsChosen = Preconditions.checkNotNull(onBackendsChosen);
+
+ ReadWriteLock backendsLock = new ReentrantReadWriteLock(true);
+ backendsReadLock = backendsLock.readLock();
+ backendsWriteLock = backendsLock.writeLock();
+
+ setBackends(backends);
+
+ Preconditions.checkNotNull(restoreInterval);
+ Preconditions.checkArgument(restoreInterval.getValue() > 0);
+ stopBackendRestorer = startDeadBackendRestorer(restoreInterval);
+ }
+
+ /**
+ * Assigns the backend pools that this pool should draw from.
+ *
+ * @param pools New pools to use.
+ */
+ public void setBackends(Map<E, ObjectPool<Connection<T, E>>> pools) {
+ backendsWriteLock.lock();
+ try {
+ backends = Preconditions.checkNotNull(pools);
+ loadBalancer.offerBackends(pools.keySet(), onBackendsChosen);
+ } finally {
+ backendsWriteLock.unlock();
+ }
+ }
+
+ private Command startDeadBackendRestorer(final Amount<Long, Time> restoreInterval) {
+
+ final AtomicBoolean shouldRestore = new AtomicBoolean(true);
+ Runnable restoreDeadBackends = new Runnable() {
+ @Override public void run() {
+ if (shouldRestore.get()) {
+ restoreDeadBackends(restoreInterval);
+ }
+ }
+ };
+ final ScheduledExecutorService scheduledExecutorService =
+ Executors.newScheduledThreadPool(1,
+ new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("MTCP-DeadBackendRestorer[%s]")
+ .build());
+ long restoreDelay = restoreInterval.getValue();
+ scheduledExecutorService.scheduleWithFixedDelay(restoreDeadBackends, restoreDelay,
+ restoreDelay, restoreInterval.getUnit().getTimeUnit());
+
+ return new Command() {
+ @Override public void execute() {
+ shouldRestore.set(false);
+ scheduledExecutorService.shutdownNow();
+ LOG.info("Backend restorer shut down");
+ }
+ };
+ }
+
+ private static final Logger LOG = Logger.getLogger(MetaPool.class.getName());
+
+ private void restoreDeadBackends(Amount<Long, Time> restoreInterval) {
+ for (E backend : snapshotBackends()) {
+ ObjectPool<Connection<T, E>> pool;
+ backendsReadLock.lock();
+ try {
+ pool = backends.get(backend);
+ } finally {
+ backendsReadLock.unlock();
+ }
+
+ // We can lose a race if the backends change - and that's fine, we'll restore the new set of
+ // backends in the next scheduled restoration run.
+ if (pool != null) {
+ try {
+ release(get(backend, pool, restoreInterval));
+ } catch (TimeoutException e) {
+ LOG.warning("Backend restorer failed to revive backend: " + backend + " -> " + e);
+ } catch (ResourceExhaustedException e) {
+ LOG.warning("Backend restorer failed to revive backend: " + backend + " -> " + e);
+ }
+ }
+ }
+ }
+
+ private Iterable<E> snapshotBackends() {
+ backendsReadLock.lock();
+ try {
+ return ImmutableList.copyOf(backends.keySet());
+ } finally {
+ backendsReadLock.unlock();
+ }
+ }
+
+ @Override
+ public Connection<T, E> get() throws ResourceExhaustedException, TimeoutException {
+ return get(ObjectPool.NO_TIMEOUT);
+ }
+
+ @Override
+ public Connection<T, E> get(Amount<Long, Time> timeout)
+ throws ResourceExhaustedException, TimeoutException {
+ // Resolve (backend, pool) under the read lock so setBackends() cannot swap the map in between.
+ E backend;
+ ObjectPool<Connection<T, E>> pool;
+
+ backendsReadLock.lock();
+ try {
+ backend = loadBalancer.nextBackend();
+ Preconditions.checkNotNull(backend, "Load balancer gave a null backend.");
+ // The map lookup is what can come back null here, so validate the pool, not the backend.
+ pool = backends.get(backend);
+ Preconditions.checkNotNull(pool,
+ "Given backend %s not found in tracked backends: %s", backend, backends);
+ } finally {
+ backendsReadLock.unlock();
+ }
+
+ return get(backend, pool, timeout);
+ }
+
+ private static class ManagedConnection<T, E> implements Connection<T, E> {
+ private final Connection<T, E> connection;
+ private final ObjectPool<Connection<T, E>> pool;
+
+ private ManagedConnection(Connection<T, E> connection, ObjectPool<Connection<T, E>> pool) {
+ this.connection = connection;
+ this.pool = pool;
+ }
+
+ @Override
+ public void close() {
+ connection.close();
+ }
+
+ @Override
+ public T get() {
+ return connection.get();
+ }
+
+ @Override
+ public boolean isValid() {
+ return connection.isValid();
+ }
+
+ @Override
+ public E getEndpoint() {
+ return connection.getEndpoint();
+ }
+
+ @Override public String toString() {
+ return "ManagedConnection[" + connection.toString() + "]";
+ }
+
+ void release(boolean remove) {
+ if (remove) {
+ pool.remove(connection);
+ } else {
+ pool.release(connection);
+ }
+ }
+ }
+
+ private Connection<T, E> get(E backend, ObjectPool<Connection<T, E>> pool,
+ Amount<Long, Time> timeout) throws ResourceExhaustedException, TimeoutException {
+
+ long startNanos = System.nanoTime();
+ try {
+ Connection<T, E> connection = (timeout.getValue() == 0) ? pool.get() : pool.get(timeout);
+
+ // BEWARE: We have leased a connection from the underlying pool here and must return it to the
+ // caller so they can later release it. If we fail to do so, the connection will leak.
+ // Catching intermediate exceptions ourselves and pro-actively returning the connection to the
+ // pool before re-throwing is not a viable option since the return would have to succeed,
+ // forcing us to ignore the timeout passed in.
+
+ // NOTE: LoadBalancer gracefully ignores backends it does not know about so even if we acquire
+ // a (backend, pool) pair atomically that has since been removed, we can safely let the lb
+ // know about backend events and it will just ignore us.
+
+ try {
+ loadBalancer.connected(backend, System.nanoTime() - startNanos);
+ } catch (RuntimeException e) {
+ LOG.log(Level.WARNING, "Encountered an exception updating load balancer stats after "
+ + "leasing a connection - continuing", e);
+ }
+ return new ManagedConnection<T, E>(connection, pool);
+ } catch (TimeoutException e) {
+ loadBalancer.connectFailed(backend, ConnectionResult.TIMEOUT);
+ throw e;
+ } catch (ResourceExhaustedException e) {
+ loadBalancer.connectFailed(backend, ConnectionResult.FAILED);
+ throw e;
+ }
+ }
+
+ @Override
+ public void release(Connection<T, E> connection) {
+ release(connection, false);
+ }
+
+ /**
+ * Equivalent to releasing a Connection with isValid() == false.
+ * @see ObjectPool#remove(Object)
+ */
+ @Override
+ public void remove(Connection<T, E> connection) {
+ release(connection, true);
+ }
+
+ private void release(Connection<T, E> connection, boolean remove) {
+ backendsWriteLock.lock();
+ try {
+ // Connections leased from this pool are always wrapped in a ManagedConnection.
+ if (!(connection instanceof ManagedConnection)) {
+ throw new IllegalArgumentException("Connection not controlled by this connection pool: "
+ + connection);
+ }
+ ((ManagedConnection<?, ?>) connection).release(remove); // wildcard cast avoids a raw type
+
+ loadBalancer.released(connection.getEndpoint());
+ } finally {
+ backendsWriteLock.unlock();
+ }
+ }
+
+ @Override
+ public void close() {
+ stopBackendRestorer.execute();
+
+ backendsWriteLock.lock();
+ try {
+ for (ObjectPool<Connection<T, E>> backend : backends.values()) {
+ backend.close();
+ }
+ } finally {
+ backendsWriteLock.unlock();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/pool/ObjectPool.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/net/pool/ObjectPool.java b/commons/src/main/java/org/apache/aurora/common/net/pool/ObjectPool.java
new file mode 100644
index 0000000..a665903
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/net/pool/ObjectPool.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.net.pool;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A generic object pool that provides objects of a given type for exclusive use by the caller.
+ * Object pools generally pool expensive resources and so offer a {@link #close} method that should
+ * be used to free these resources when the pool is no longer needed.
+ *
+ * @author John Sirois
+ */
+public interface ObjectPool<T> {
+
+ /**
+ * Gets a resource potentially blocking for as long as it takes to either create a new one or wait
+ * for one to be {@link #release(Object) released}. Callers must {@link #release(Object) release}
+ * the resource when they are done with it.
+ *
+ * @return a resource for exclusive use by the caller
+ * @throws ResourceExhaustedException if no resource could be obtained because this pool was
+ * exhausted
+ * @throws TimeoutException if we timed out while trying to fetch a resource
+ */
+ T get() throws ResourceExhaustedException, TimeoutException;
+
+ /**
+ * A convenience constant representing no timeout.
+ */
+ Amount<Long,Time> NO_TIMEOUT = Amount.of(0L, Time.MILLISECONDS);
+
+ /**
+ * Gets a resource; timing out if there are none available and it takes longer than specified to
+ * create a new one or wait for one to be {@link #release(Object) released}. Callers must
+ * {@link #release(Object) release} the resource when they are done with it.
+ *
+ * @param timeout the maximum amount of time to wait
+ * @return a resource for exclusive use by the caller
+ * @throws TimeoutException if the specified timeout was reached before a resource became
+ * available
+ * @throws ResourceExhaustedException if no resource could be obtained because this pool was
+ * exhausted
+ */
+ T get(Amount<Long, Time> timeout) throws ResourceExhaustedException, TimeoutException;
+
+ /**
+ * Releases a resource obtained from this pool back into the pool of available resources. It is an
+ * error to release a resource not obtained from this pool.
+ *
+ * @param resource Resource to release.
+ */
+ void release(T resource);
+
+ /**
+ * Removes a resource obtained from this pool from its available resources. It is an error to
+ * remove a resource not obtained from this pool.
+ *
+ * @param resource Resource to remove.
+ */
+ void remove(T resource);
+
+ /**
+ * Disallows further gets from this pool, "closes" all idle objects and any outstanding objects
+ * when they are released.
+ */
+ void close();
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/pool/ResourceExhaustedException.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/net/pool/ResourceExhaustedException.java b/commons/src/main/java/org/apache/aurora/common/net/pool/ResourceExhaustedException.java
new file mode 100644
index 0000000..fd48ddb
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/net/pool/ResourceExhaustedException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.net.pool;
+
+/**
+ * @author John Sirois
+ */
+public class ResourceExhaustedException extends Exception {
+ public ResourceExhaustedException(String message) {
+ super(message);
+ }
+
+ public ResourceExhaustedException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/objectsize/ObjectSizeCalculator.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/objectsize/ObjectSizeCalculator.java b/commons/src/main/java/org/apache/aurora/common/objectsize/ObjectSizeCalculator.java
new file mode 100644
index 0000000..95c8868
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/objectsize/ObjectSizeCalculator.java
@@ -0,0 +1,427 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.objectsize;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryPoolMXBean;
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Sets;
+
+/**
+ * Contains utility methods for calculating the memory usage of objects. It
+ * only works on the HotSpot JVM, and infers the actual memory layout (32 bit
+ * vs. 64 bit word size, compressed object pointers vs. uncompressed) from
+ * best available indicators. It can reliably detect a 32 bit vs. 64 bit JVM.
+ * It can only make an educated guess at whether compressed OOPs are used,
+ * though; specifically, it knows what the JVM's default choice of OOP
+ * compression would be based on HotSpot version and maximum heap sizes, but if
+ * the choice is explicitly overridden with the <tt>-XX:{+|-}UseCompressedOops</tt> command line
+ * switch, it can not detect
+ * this fact and will report incorrect sizes, as it will presume the default JVM
+ * behavior.
+ *
+ * @author Attila Szegedi
+ */
+public class ObjectSizeCalculator {
+
+ /**
+ * Describes constant memory overheads for various constructs in a JVM implementation.
+ */
+ public interface MemoryLayoutSpecification {
+
+ /**
+ * Returns the fixed overhead of an array of any type or length in this JVM.
+ *
+ * @return the fixed overhead of an array, in bytes.
+ */
+ int getArrayHeaderSize();
+
+ /**
+ * Returns the fixed overhead for any {@link Object} subclass in this JVM.
+ *
+ * @return the fixed overhead of any object, in bytes.
+ */
+ int getObjectHeaderSize();
+
+ /**
+ * Returns the size quantum for objects in this JVM; object and array sizes are rounded up to it.
+ *
+ * @return the multiple, in bytes, that every object size is padded to.
+ */
+ int getObjectPadding();
+
+ /**
+ * Returns the fixed size of an object reference in this JVM.
+ *
+ * @return the size of all object references, in bytes.
+ */
+ int getReferenceSize();
+
+ /**
+ * Returns the size quantum for the fields inherited from an object's ancestor superclasses
+ * in this JVM; a superclass's field block is rounded up to it before subclass fields are added.
+ *
+ * @return the multiple, in bytes, that a superclass's field block is padded to.
+ */
+ int getSuperclassFieldPadding();
+ }
+
+ private static class CurrentLayout { // holder: SPEC is computed when this class initializes
+ private static final MemoryLayoutSpecification SPEC =
+ getEffectiveMemoryLayoutSpecification();
+ }
+
+ /**
+ * Given an object, returns the total allocated size, in bytes, of the object
+ * and all other objects reachable from it. Attempts to detect the current JVM memory layout,
+ * but may fail with {@link UnsupportedOperationException}.
+ *
+ * @param obj the object; can be null, in which case 0 is returned. Passing in a
+ * {@link java.lang.Class} object doesn't do anything special, it measures the size of all
+ * objects reachable through it (which will include its class loader, and by extension,
+ * all other Class objects loaded by the same loader, and all the parent class loaders).
+ * It doesn't provide the size of the static fields in the JVM class that the Class
+ * object represents.
+ * @return the total allocated size of the object and all other objects it
+ * retains.
+ * @throws UnsupportedOperationException if the current vm memory layout cannot be detected.
+ */
+ public static long getObjectSize(Object obj) throws UnsupportedOperationException {
+ // NOTE(review): CurrentLayout's static init may wrap a failure in ExceptionInInitializerError.
+ return obj == null ? 0 : new ObjectSizeCalculator(CurrentLayout.SPEC).calculateObjectSize(obj);
+ }
+
+ // Fixed object header size for arrays.
+ private final int arrayHeaderSize;
+ // Fixed object header size for non-array objects.
+ private final int objectHeaderSize;
+ // Padding for the object size - if the object size is not an exact multiple
+ // of this, it is padded to the next multiple.
+ private final int objectPadding;
+ // Size of reference (pointer) fields.
+ private final int referenceSize;
+ // Padding for the fields of superclass before fields of subclasses are
+ // added.
+ private final int superclassFieldPadding;
+
+ private final LoadingCache<Class<?>, ClassSizeInfo> classSizeInfos = // per-class layout cache
+ CacheBuilder.newBuilder().build(new CacheLoader<Class<?>, ClassSizeInfo>() {
+ public ClassSizeInfo load(Class<?> clazz) {
+ return new ClassSizeInfo(clazz);
+ }
+ });
+
+ // Per-calculation state; calculateObjectSize() clears all three when it finishes.
+ private final Set<Object> alreadyVisited = Sets.newIdentityHashSet();
+ private final Deque<Object> pending = new ArrayDeque<Object>(16 * 1024);
+ private long size;
+
+ /**
+ * Creates an object size calculator that can calculate object sizes for a given
+ * {@code memoryLayoutSpecification}. The five layout constants are read from it once, here.
+ *
+ * @param memoryLayoutSpecification a description of the JVM memory layout; must not be null.
+ */
+ public ObjectSizeCalculator(MemoryLayoutSpecification memoryLayoutSpecification) {
+ Preconditions.checkNotNull(memoryLayoutSpecification);
+ arrayHeaderSize = memoryLayoutSpecification.getArrayHeaderSize();
+ objectHeaderSize = memoryLayoutSpecification.getObjectHeaderSize();
+ objectPadding = memoryLayoutSpecification.getObjectPadding();
+ referenceSize = memoryLayoutSpecification.getReferenceSize();
+ superclassFieldPadding = memoryLayoutSpecification.getSuperclassFieldPadding();
+ }
+
+  /**
+   * Given an object, returns the total allocated size, in bytes, of the object and all other
+   * objects reachable from it. Traversal state is reset before returning.
+   *
+   * @param obj the object; can be null, in which case 0 is returned. Passing in a
+   *     {@link java.lang.Class} object doesn't do anything special, it measures the size of
+   *     all objects reachable through it (which will include its class loader, and by
+   *     extension, all other Class objects loaded by the same loader, and all the parent
+   *     class loaders). It doesn't provide the size of the static fields in the JVM class
+   *     that the Class object represents.
+   * @return the total allocated size of the object and all other objects it retains.
+   */
+  public synchronized long calculateObjectSize(Object obj) {
+    if (obj == null) {
+      return 0; // visit() dereferences obj, so honor the documented null contract up front.
+    }
+    // Work-queue traversal instead of recursion: long linked lists must not blow the stack.
+    try {
+      for (;;) {
+        visit(obj);
+        if (pending.isEmpty()) {
+          return size;
+        }
+        obj = pending.removeFirst();
+      }
+    } finally {
+      alreadyVisited.clear();
+      pending.clear();
+      size = 0;
+    }
+  }
+
+  private void visit(Object obj) { // obj is a real object or a queued ArrayElementsVisitor
+    if (alreadyVisited.contains(obj)) {
+      return;
+    }
+    final Class<?> type = obj.getClass();
+    if (type == ArrayElementsVisitor.class) {
+      ((ArrayElementsVisitor) obj).visit(this); // command object: fan out, never mark visited
+      return;
+    }
+    alreadyVisited.add(obj);
+    if (type.isArray()) {
+      visitArray(obj);
+    } else {
+      classSizeInfos.getUnchecked(type).visit(obj, this);
+    }
+  }
+
+  /**
+   * Adds the array's own footprint to the running total and, for reference arrays, schedules
+   * the elements for a later visit.
+   *
+   * @param array an array instance; {@code visit} only calls this after {@code isArray()}.
+   */
+  private void visitArray(Object array) {
+    final int length = Array.getLength(array);
+    final Class<?> elementType = array.getClass().getComponentType();
+    if (elementType.isPrimitive()) {
+      increaseByArraySize(length, getPrimitiveFieldSize(elementType));
+      return;
+    }
+    increaseByArraySize(length, referenceSize);
+    // If we didn't use an ArrayElementsVisitor, we would be enqueueing every
+    // element of the array here instead. For large arrays, it would
+    // tremendously enlarge the queue. In essence, we're compressing it into
+    // a small command object instead. This is different than immediately
+    // visiting the elements, as their visiting is scheduled for the end of
+    // the current queue.
+    // A single element is queued directly; an empty array has nothing to schedule.
+    if (length == 1) {
+      enqueue(Array.get(array, 0));
+    } else if (length > 1) {
+      enqueue(new ArrayElementsVisitor((Object[]) array));
+    }
+  }
+
+ private void increaseByArraySize(int length, long elementSize) { // header + elements, padded
+ increaseSize(roundTo(arrayHeaderSize + length * elementSize, objectPadding));
+ }
+
+  // Queue entry standing in for all elements of one reference array (see visitArray).
+  private static class ArrayElementsVisitor {
+    private final Object[] elements;
+
+    ArrayElementsVisitor(Object[] array) {
+      this.elements = array;
+    }
+    public void visit(ObjectSizeCalculator calc) { // visits non-null elements in index order
+      for (int i = 0; i < elements.length; i++) {
+        if (elements[i] != null) {
+          calc.visit(elements[i]);
+        }
+      }
+    }
+  }
+
+ void enqueue(Object obj) { // appends to the pending queue; null references are skipped
+ if (obj != null) {
+ pending.addLast(obj);
+ }
+ }
+
+ void increaseSize(long objectSize) { // adds objectSize bytes to the running total
+ size += objectSize;
+ }
+
+ @VisibleForTesting
+ static long roundTo(long x, int multiple) { // rounds non-negative x up to a multiple (> 0)
+ return ((x + multiple - 1) / multiple) * multiple;
+ }
+
+  // Cached per-class layout facts: padded instance size plus the reference fields to follow.
+  private class ClassSizeInfo {
+    // Header + fields, rounded up to objectPadding.
+    private final long objectSize;
+    // Raw field bytes (own + padded inherited); consumed when sizing subclasses.
+    private final long fieldsSize;
+    private final Field[] referenceFields;
+    public ClassSizeInfo(Class<?> clazz) {
+      long fieldBytes = 0;
+      final List<Field> refs = new LinkedList<Field>();
+      for (Field field : clazz.getDeclaredFields()) {
+        if (!Modifier.isStatic(field.getModifiers())) {
+          final Class<?> fieldType = field.getType();
+          if (fieldType.isPrimitive()) {
+            fieldBytes += getPrimitiveFieldSize(fieldType);
+          } else {
+            field.setAccessible(true);
+            refs.add(field);
+            fieldBytes += referenceSize;
+          }
+        }
+      }
+      // Inherited state: the parent's field block is padded before this class's fields.
+      final Class<?> parent = clazz.getSuperclass();
+      if (parent != null) {
+        final ClassSizeInfo parentInfo = classSizeInfos.getUnchecked(parent);
+        fieldBytes += roundTo(parentInfo.fieldsSize, superclassFieldPadding);
+        refs.addAll(Arrays.asList(parentInfo.referenceFields));
+      }
+      this.fieldsSize = fieldBytes;
+      this.objectSize = roundTo(objectHeaderSize + fieldBytes, objectPadding);
+      this.referenceFields = refs.toArray(new Field[refs.size()]);
+    }
+
+    void visit(Object obj, ObjectSizeCalculator calc) {
+      calc.increaseSize(objectSize);
+      enqueueReferencedObjects(obj, calc);
+    }
+
+    public void enqueueReferencedObjects(Object obj, ObjectSizeCalculator calc) {
+      for (Field field : referenceFields) {
+        final Object referent;
+        try {
+          referent = field.get(obj);
+        } catch (IllegalAccessException e) {
+          final AssertionError failure =
+              new AssertionError("Unexpected denial of access to " + field);
+          failure.initCause(e);
+          throw failure;
+        }
+        calc.enqueue(referent);
+      }
+    }
+  }
+
+  // Size in bytes of a primitive-typed field or array element.
+  private static long getPrimitiveFieldSize(Class<?> type) {
+    final long bytes;
+    if (type == long.class || type == double.class) {
+      bytes = 8;
+    } else if (type == int.class || type == float.class) {
+      bytes = 4;
+    } else if (type == char.class || type == short.class) {
+      bytes = 2;
+    } else if (type == boolean.class || type == byte.class) {
+      bytes = 1;
+    } else {
+      throw new AssertionError("Encountered unexpected primitive type " + type.getName());
+    }
+    return bytes;
+  }
+
+  /**
+   * Infers the memory layout of the running JVM from system properties and, on 64-bit VMs,
+   * from the combined maximum size of all memory pools.
+   * The compressed-OOPs decision is a default-based guess; explicit JVM flags are not seen.
+   *
+   * @return the layout assumed for this JVM.
+   * @throws UnsupportedOperationException if the VM is not recognized as HotSpot, or if the
+   *     data model or VM version cannot be interpreted.
+   */
+  @VisibleForTesting
+  static MemoryLayoutSpecification getEffectiveMemoryLayoutSpecification() {
+    final String vmName = System.getProperty("java.vm.name");
+    if (vmName == null || !(vmName.startsWith("Java HotSpot(TM) ")
+        || vmName.startsWith("OpenJDK") || vmName.startsWith("TwitterJDK"))) {
+      throw new UnsupportedOperationException(
+          "ObjectSizeCalculator only supported on HotSpot VM");
+    }
+
+    final String dataModel = System.getProperty("sun.arch.data.model");
+    if ("32".equals(dataModel)) {
+      // Running with 32-bit data model
+      return fixedLayout(12, 8, 8, 4, 4);
+    } else if (!"64".equals(dataModel)) {
+      throw new UnsupportedOperationException("Unrecognized value '" +
+          dataModel + "' of sun.arch.data.model system property");
+    }
+
+    if (defaultsToCompressedOops()) {
+      long maxMemory = 0;
+      for (MemoryPoolMXBean mp : ManagementFactory.getMemoryPoolMXBeans()) {
+        // getUsage() is null for an invalid pool and getMax() is -1 when undefined; neither
+        // case may distort the total.
+        final java.lang.management.MemoryUsage usage = mp.getUsage();
+        if (usage != null && usage.getMax() > 0) {
+          maxMemory += usage.getMax();
+        }
+      }
+      if (maxMemory < 30L * 1024 * 1024 * 1024) {
+        // HotSpot 17.0 and above use compressed OOPs below 30GB of RAM total
+        // for all memory pools (yes, including code cache).
+        return fixedLayout(16, 12, 8, 4, 4);
+      }
+    }
+
+    // In other cases, it's a 64-bit uncompressed OOPs object model
+    return fixedLayout(24, 16, 8, 8, 8);
+  }
+
+  /**
+   * Tells whether this VM generation turns compressed OOPs on by default, i.e. whether it is
+   * HotSpot 17.0 or newer.
+   */
+  private static boolean defaultsToCompressedOops() {
+    // Since Java 9, java.vm.version tracks the Java release ("11.0.2+9", "21+35-2513"); the
+    // older HotSpot numbering ("25.45-b02" on Java 8) only occurs with "1.x" spec versions.
+    final String specVersion = System.getProperty("java.specification.version");
+    if (specVersion != null && !specVersion.startsWith("1.")) {
+      return true;
+    }
+    final String vmVersion = System.getProperty("java.vm.version");
+    int digits = 0;
+    while (vmVersion != null && digits < vmVersion.length()
+        && Character.isDigit(vmVersion.charAt(digits))) {
+      digits++;
+    }
+    if (digits == 0 || digits > 9) {
+      // Missing or non-numeric major version: report it the same way as a bad data model.
+      throw new UnsupportedOperationException("Unrecognized value '" +
+          vmVersion + "' of java.vm.version system property");
+    }
+    return Integer.parseInt(vmVersion.substring(0, digits)) >= 17;
+  }
+
+  /** Builds a specification that reports the given constant sizes. */
+  private static MemoryLayoutSpecification fixedLayout(final int arrayHeader,
+      final int objectHeader, final int padding, final int reference, final int superPadding) {
+    return new MemoryLayoutSpecification() {
+      @Override public int getArrayHeaderSize() { return arrayHeader; }
+      @Override public int getObjectHeaderSize() { return objectHeader; }
+      @Override public int getObjectPadding() { return padding; }
+      @Override public int getReferenceSize() { return reference; }
+      @Override public int getSuperclassFieldPadding() { return superPadding; }
+    };
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/quantity/Amount.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/quantity/Amount.java b/commons/src/main/java/org/apache/aurora/common/quantity/Amount.java
new file mode 100644
index 0000000..11be7f5
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/quantity/Amount.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.quantity;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.aurora.common.collections.Pair;
+
+/**
+ * Represents a value in a unit system and facilitates unambiguous communication of amounts.
+ * Instances are created via static factory {@code of(...)} methods.
+ *
+ * @param <T> the type of number the amount value is expressed in
+ * @param <U> the type of unit that this amount quantifies
+ *
+ * @author John Sirois
+ */
+public abstract class Amount<T extends Number & Comparable<T>, U extends Unit<U>>
+ implements Comparable<Amount<T, U>> {
+
+ /**
+ * Thrown when a checked operation on an amount would overflow.
+ */
+
+ public static class TypeOverflowException extends RuntimeException { // unchecked, no message
+ public TypeOverflowException() {
+ super();
+ }
+ }
+
+ private final Pair<T, U> amount;
+ private final T maxValue;
+
+ private Amount(T value, U unit, T maxValue) { // reachable only through the of(...) factories
+ Preconditions.checkNotNull(value);
+ Preconditions.checkNotNull(unit);
+ this.maxValue = maxValue; // not null-checked; asChecked() compares results against it
+ this.amount = Pair.of(value, unit);
+ }
+
+ public T getValue() { // the numeric part, as stored by the constructor
+ return amount.getFirst();
+ }
+
+ public U getUnit() { // the unit the stored value is expressed in
+ return amount.getSecond();
+ }
+
+ public T as(U unit) { // value converted to unit; no overflow check (see asChecked)
+ return asUnit(unit);
+ }
+
+ /**
+ * Converts like {@link #as}; throws TypeOverflowException if the result equals the type's max.
+ */
+ public T asChecked(U unit) {
+ T retVal = asUnit(unit);
+ if (retVal.equals(maxValue)) { // NOTE(review): sole overflow signal; a true max trips it too
+ throw new TypeOverflowException();
+ }
+ return retVal;
+ }
+
+ private T asUnit(Unit<?> unit) { // same unit: value as-is; else scale by the multiplier ratio
+ return sameUnits(unit) ? getValue() : scale(getUnit().multiplier() / unit.multiplier());
+ }
+
+  @Override
+  public int hashCode() { // equals() matches across units, so hash the base-unit quantity
+    return Double.valueOf(getValue().doubleValue() * getUnit().multiplier()).hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj instanceof Amount) {
+      final Amount<?, ?> that = (Amount<?, ?>) obj;
+      // Identical (value, unit) pairs are equal outright; otherwise compare the quantities.
+      return amount.equals(that.amount) || isSameAmount(that);
+    }
+    return false;
+  }
+
+  // Type-checks the other amount's value and unit against ours before comparing quantities.
+  private boolean isSameAmount(Amount<?, ?> other) {
+    // Equals allows Object - so we have no compile time check that other has the right value
+    // type; ie: make sure they don't have Integer when we have Long.
+    final Number otherValue = other.getValue();
+    final Unit<?> rawUnit = other.getUnit();
+    final boolean compatible = getValue().getClass().isInstance(otherValue)
+        && getUnit().getClass().isInstance(rawUnit);
+    if (!compatible) {
+      return false;
+    }
+
+    // Safe: rawUnit was just verified to be an instance of our own unit's class.
+    @SuppressWarnings("unchecked")
+    U otherUnit = (U) rawUnit;
+    return isSameAmount(other, otherUnit);
+  }
+
+  private boolean isSameAmount(Amount<?, ?> other, U otherUnit) {
+    // Compare in the more precise unit (the one with the lower multiplier).
+    final boolean oursIsFiner = otherUnit.multiplier() > getUnit().multiplier();
+    if (oursIsFiner) {
+      return getValue().equals(other.asUnit(getUnit()));
+    }
+    return as(otherUnit).equals(other.getValue());
+  }
+
+ @Override
+ public String toString() { // delegates to the (value, unit) pair's own toString()
+ return amount.toString();
+ }
+
+  @Override
+  public int compareTo(Amount<T, U> other) {
+    // Compare in the more precise unit (the one with the lower multiplier).
+    final U theirs = other.getUnit();
+    if (theirs.multiplier() > getUnit().multiplier()) {
+      return getValue().compareTo(other.as(getUnit()));
+    }
+    return as(theirs).compareTo(other.getValue());
+  }
+
+ private boolean sameUnits(Unit<? extends Unit<?>> unit) { // equals()-based unit comparison
+ return getUnit().equals(unit);
+ }
+
+ protected abstract T scale(double multiplier); // returns getValue() * multiplier as a T
+
+  /**
+   * Static factory for an amount whose value is held as a {@link Double}.
+   * @param number how many {@code unit}s the amount represents
+   * @param unit the unit {@code number} is expressed in
+   * @param <U> the unit type of the amount
+   * @return a new amount of {@code number} {@code unit}s
+   */
+  public static <U extends Unit<U>> Amount<Double, U> of(double number, U unit) {
+    return new Amount<Double, U>(number, unit, Double.MAX_VALUE) {
+      @Override
+      protected Double scale(double multiplier) {
+        return multiplier * getValue();
+      }
+    };
+  }
+
+  /**
+   * Static factory for an amount whose value is held as a {@link Float}.
+   * @param number how many {@code unit}s the amount represents
+   * @param unit the unit {@code number} is expressed in
+   * @param <U> the unit type of the amount
+   * @return a new amount of {@code number} {@code unit}s
+   */
+  public static <U extends Unit<U>> Amount<Float, U> of(float number, U unit) {
+    return new Amount<Float, U>(number, unit, Float.MAX_VALUE) {
+      @Override
+      protected Float scale(double multiplier) {
+        return (float) (multiplier * getValue());
+      }
+    };
+  }
+
+  /**
+   * Static factory for an amount whose value is held as a {@link Long}.
+   * @param number how many {@code unit}s the amount represents
+   * @param unit the unit {@code number} is expressed in
+   * @param <U> the unit type of the amount
+   * @return a new amount of {@code number} {@code unit}s
+   */
+  public static <U extends Unit<U>> Amount<Long, U> of(long number, U unit) {
+    return new Amount<Long, U>(number, unit, Long.MAX_VALUE) {
+      @Override
+      protected Long scale(double multiplier) {
+        return (long) (multiplier * getValue());
+      }
+    };
+  }
+
+  /**
+   * Static factory for an amount whose value is held as an {@link Integer}.
+   * @param number how many {@code unit}s the amount represents
+   * @param unit the unit {@code number} is expressed in
+   * @param <U> the unit type of the amount
+   * @return a new amount of {@code number} {@code unit}s
+   */
+  public static <U extends Unit<U>> Amount<Integer, U> of(int number, U unit) {
+    return new Amount<Integer, U>(number, unit, Integer.MAX_VALUE) {
+      @Override
+      protected Integer scale(double multiplier) {
+        return (int) (multiplier * getValue());
+      }
+    };
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/quantity/Data.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/quantity/Data.java b/commons/src/main/java/org/apache/aurora/common/quantity/Data.java
new file mode 100644
index 0000000..80d077b
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/quantity/Data.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.quantity;
+
+/**
+ * Provides a unit to allow conversions and unambiguous passing around of data {@link Amount}s.
+ * The kilo/mega/giga/... hierarchy is built on base 2 so that the hierarchy increases by a factor
+ * of 1024 instead of 1000 as typical in metric units. Additionally, units are divided in 2
+ * hierarchies one based on bits and the other on bytes. Thus {@link #Kb} represents kilobits; so
+ * 1 Kb = 1024 bits, and {@link #KB} represents kilobytes so 1 KB = 1024 bytes or 8192 bits.
+ *
+ * @author John Sirois
+ */
+public enum Data implements Unit<Data> {
+  BITS(1),
+  Kb(1024, BITS),
+  Mb(1024, Kb),
+  Gb(1024, Mb),
+  BYTES(8, BITS),
+  KB(1024, BYTES),
+  MB(1024, KB),
+  GB(1024, MB),
+  TB(1024, GB),
+  PB(1024, TB);
+
+  private final double multiplier; // size of this unit in bits
+
+  Data(double bits) {
+    this.multiplier = bits;
+  }
+
+  Data(double factor, Data base) { // factor is relative to an earlier-declared constant
+    this(factor * base.multiplier);
+  }
+
+  @Override
+  public double multiplier() {
+    return multiplier;
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/quantity/Time.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/quantity/Time.java b/commons/src/main/java/org/apache/aurora/common/quantity/Time.java
new file mode 100644
index 0000000..ebf77eb
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/quantity/Time.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.quantity;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Provides a unit to allow conversions and unambiguous passing around of time {@link Amount}s.
+ *
+ * @author John Sirois
+ */
+public enum Time implements Unit<Time> {
+  NANOSECONDS(1, TimeUnit.NANOSECONDS, "ns"),
+  MICROSECONDS(1000, NANOSECONDS, TimeUnit.MICROSECONDS, "us"),
+  MILLISECONDS(1000, MICROSECONDS, TimeUnit.MILLISECONDS, "ms"),
+  SECONDS(1000, MILLISECONDS, TimeUnit.SECONDS, "secs"),
+  MINUTES(60, SECONDS, TimeUnit.MINUTES, "mins"),
+  HOURS(60, MINUTES, TimeUnit.HOURS, "hrs"),
+  DAYS(24, HOURS, TimeUnit.DAYS, "days");
+
+  private final double multiplier; // length of this unit in nanoseconds
+  private final TimeUnit timeUnit;
+  private final String display;
+
+  Time(double nanos, TimeUnit equivalent, String label) {
+    this.display = label;
+    this.timeUnit = equivalent;
+    this.multiplier = nanos;
+  }
+
+  Time(double factor, Time base, TimeUnit equivalent, String label) {
+    this(factor * base.multiplier, equivalent, label);
+  }
+
+  @Override
+  public double multiplier() {
+    return multiplier;
+  }
+
+  /**
+   * Gives the {@code java.util.concurrent} unit that matches this unit.
+   */
+  public TimeUnit getTimeUnit() {
+    return timeUnit;
+  }
+
+  @Override
+  public String toString() {
+    return display;
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/quantity/Unit.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/quantity/Unit.java b/commons/src/main/java/org/apache/aurora/common/quantity/Unit.java
new file mode 100644
index 0000000..dd9b9ec
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/quantity/Unit.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.quantity;
+
+/**
+ * Represents a unit hierarchy for a given unit of measure; eg: time. Instances represent specific
+ * units from the hierarchy; eg: seconds.
+ *
+ * @param <U> the type of the concrete unit implementation
+ *
+ * @author John Sirois
+ */
+public interface Unit<U extends Unit<U>> {
+
+ /**
+ * Returns the weight of this unit relative to other units in the same hierarchy. The smallest
+ * unit typically returns 1, but all that is required is a multiplier against a common base.
+ *
+ * @return this unit's multiplier relative to the hierarchy's common base unit
+ */
+ double multiplier();
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/ApproximateHistogram.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/stats/ApproximateHistogram.java b/commons/src/main/java/org/apache/aurora/common/stats/ApproximateHistogram.java
new file mode 100644
index 0000000..cfbf04e
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/stats/ApproximateHistogram.java
@@ -0,0 +1,563 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+import java.util.Arrays;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Data;
+
+/**
+ * <p>
+ * Implements Histogram structure for computing approximate quantiles.
+ * The implementation is based on the following paper:
+ *
+ * <pre>
+ * [MP80] Munro & Paterson, "Selection and Sorting with Limited Storage",
+ * Theoretical Computer Science, Vol 12, p 315-323, 1980.
+ * </pre>
+ * </p>
+ * <p>
+ * You could read a detailed description of the same algorithm here:
+ *
+ * <pre>
+ * [MRL98] Manku, Rajagopalan & Lindsay, "Approximate Medians and other
+ * Quantiles in One Pass and with Limited Memory", Proc. 1998 ACM
+ * SIGMOD, Vol 27, No 2, p 426-435, June 1998.
+ * </pre>
+ * </p>
+ * <p>
+ * There's a good explanation of the algorithm in the Sawzall source code
+ * See: http://szl.googlecode.com/svn-history/r36/trunk/src/emitters/szlquantile.cc
+ * </p>
+ * Here's a schema of the tree:
+ * <pre>
+ * [4] level 3, weight=rootWeight=8
+ * |
+ * [3] level 2, weight=4
+ * |
+ * [2] level 1, weight=2
+ * / \
+ * [0] [1] level 0, weight=1
+ * </pre>
+ * <p>
+ * {@code [i]} represents {@code buffer[i]}
+ * The depth of the tree is limited to a maximum value
+ * Every buffer has the same size
+ * </p>
+ * <p>
+ * We add element in {@code [0]} or {@code [1]}.
+ * When {@code [0]} and {@code [1]} are full, we collapse them, it generates a temporary buffer
+ * of weight 2, if {@code [2]} is empty, we put the collapsed buffer into {@code [2]} otherwise
+ * we collapse {@code [2]} with the temporary buffer and put it in {@code [3]} if it's empty and
+ * so on...
+ * </p>
+ */
+public final class ApproximateHistogram implements Histogram {
+ @VisibleForTesting
+ public static final Precision DEFAULT_PRECISION = new Precision(0.02, 100 * 1000);
+ @VisibleForTesting
+ public static final Amount<Long, Data> DEFAULT_MAX_MEMORY = Amount.of(12L, Data.KB);
+ @VisibleForTesting static final long ELEM_SIZE = 8; // sizeof long
+
+ // See above
+ @VisibleForTesting long[][] buffer;
+ @VisibleForTesting long count = 0L;
+ @VisibleForTesting int leafCount = 0; // number of elements in the bottom two leaves
+ @VisibleForTesting int currentTop = 1;
+ @VisibleForTesting int[] indices; // member for optimization reason
+ private boolean leavesSorted = true;
+ private int rootWeight = 1;
+ private long[][] bufferPool; // pool of 2 buffers (used for merging)
+ private int bufferSize;
+ private int maxDepth;
+
+ /**
+  * Initializes the sizing fields and the buffers; called only by the constructors.
+  *
+  * <p>Only the two leaf buffers are allocated here. The slots for the upper levels start out
+  * {@code null} and are filled in on first use, so an instance that never collapses its leaves
+  * never pays for the upper buffers.
+  *
+  * @param bufSize size of each buffer
+  * @param depth maximum depth of the tree of buffers
+  */
+ @VisibleForTesting
+ void init(int bufSize, int depth) {
+   bufferSize = bufSize;
+   maxDepth = depth;
+   bufferPool = new long[2][bufferSize];
+   indices = new int[depth + 1];
+   // Allocate only the outer array. Allocating every level eagerly and then discarding all but
+   // the first two would create a large, immediately wasted allocation.
+   buffer = new long[depth + 1][];
+   allocate(0);
+   allocate(1);
+   clear();
+ }
+
+ @VisibleForTesting
+ ApproximateHistogram(int bufSize, int depth) {
+ init(bufSize, depth);
+ }
+
+ /**
+ * Constructor with precision constraint; it will allocate as much memory as required to match
+ * this precision constraint.
+ * @param precision the requested precision
+ */
+ public ApproximateHistogram(Precision precision) {
+ Preconditions.checkNotNull(precision);
+ int depth = computeDepth(precision.getEpsilon(), precision.getN());
+ int bufSize = computeBufferSize(depth, precision.getN());
+ init(bufSize, depth);
+ }
+
+ /**
+  * Constructor with memory constraint; it finds the best possible precision that satisfies
+  * the memory constraint.
+  *
+  * @param maxMemory the maximum amount of memory that the instance will take (at least 1KB)
+  * @param expectedSize the expected number of elements, used as the initial N when sizing the
+  *     buffers
+  */
+ public ApproximateHistogram(Amount<Long, Data> maxMemory, int expectedSize) {
+   Preconditions.checkNotNull(maxMemory);
+   Preconditions.checkArgument(1024 <= maxMemory.as(Data.BYTES),
+       "at least 1KB is required for an Histogram");
+
+   double epsilon = DEFAULT_PRECISION.getEpsilon();
+   // N is a long: it is multiplied by 10 repeatedly for large budgets, which would overflow an
+   // int and leave the search below working with a wrapped, meaningless size.
+   long n = expectedSize;
+   int depth = computeDepth(epsilon, n);
+   int bufSize = computeBufferSize(depth, n);
+   long maxBytes = maxMemory.as(Data.BYTES);
+
+   // Increase precision if maxMemory allows it, otherwise reduce precision (in 5% steps).
+   boolean tooMuchMem = memoryUsage(bufSize, depth) > maxBytes;
+   double multiplier = tooMuchMem ? 1.05 : 0.95;
+   while ((maxBytes < memoryUsage(bufSize, depth)) == tooMuchMem) {
+     epsilon *= multiplier;
+     if (epsilon < 0.00001) {
+       if (n > Long.MAX_VALUE / 10) {
+         // N cannot grow any further without overflowing; keep the last accepted sizing.
+         break;
+       }
+       // for very high memory constraint increase N as well
+       n *= 10;
+       epsilon = DEFAULT_PRECISION.getEpsilon();
+     }
+     depth = computeDepth(epsilon, n);
+     bufSize = computeBufferSize(depth, n);
+   }
+   if (!tooMuchMem) {
+     // The search went one step too far: step epsilon back so the final sizing is the last one
+     // that fit. It's ok to consume less memory than the constraint, but never more.
+     depth = computeDepth(epsilon / multiplier, n);
+     bufSize = computeBufferSize(depth, n);
+   }
+
+   init(bufSize, depth);
+ }
+
+ /**
+ * Constructor with memory constraint.
+ * @see #ApproximateHistogram(Amount, int)
+ */
+ public ApproximateHistogram(Amount<Long, Data> maxMemory) {
+ this(maxMemory, DEFAULT_PRECISION.getN());
+ }
+
+ /**
+ * Default Constructor.
+ * @see #ApproximateHistogram(Amount)
+ */
+ public ApproximateHistogram() {
+ this(DEFAULT_MAX_MEMORY);
+ }
+
+ @Override
+ public synchronized void add(long x) {
+ // if the leaves of the tree are full, "collapse" recursively the tree
+ if (leafCount == 2 * bufferSize) {
+ Arrays.sort(buffer[0]);
+ Arrays.sort(buffer[1]);
+ recCollapse(buffer[0], 1);
+ leafCount = 0;
+ }
+
+ // Now we're sure there is space for adding x
+ if (leafCount < bufferSize) {
+ buffer[0][leafCount] = x;
+ } else {
+ buffer[1][leafCount - bufferSize] = x;
+ }
+ leafCount++;
+ count++;
+ leavesSorted = (leafCount == 1);
+ }
+
+ @Override
+ public synchronized long getQuantile(double q) {
+ Preconditions.checkArgument(0.0 <= q && q <= 1.0,
+ "quantile must be in the range 0.0 to 1.0 inclusive");
+ if (count == 0) {
+ return 0L;
+ }
+
+ // the two leaves are the only buffer that can be partially filled
+ int buf0Size = Math.min(bufferSize, leafCount);
+ int buf1Size = Math.max(0, leafCount - buf0Size);
+ long sum = 0;
+ long target = (long) Math.ceil(count * (1.0 - q));
+ int i;
+
+ if (! leavesSorted) {
+ Arrays.sort(buffer[0], 0, buf0Size);
+ Arrays.sort(buffer[1], 0, buf1Size);
+ leavesSorted = true;
+ }
+ Arrays.fill(indices, bufferSize - 1);
+ indices[0] = buf0Size - 1;
+ indices[1] = buf1Size - 1;
+
+ do {
+ i = biggest(indices);
+ indices[i]--;
+ sum += weight(i);
+ } while (sum < target);
+ return buffer[i][indices[i] + 1];
+ }
+
+ @Override
+ public synchronized long[] getQuantiles(double[] quantiles) {
+ return Histograms.extractQuantiles(this, quantiles);
+ }
+
+ @Override
+ public synchronized void clear() {
+ count = 0L;
+ leafCount = 0;
+ currentTop = 1;
+ rootWeight = 1;
+ leavesSorted = true;
+ }
+
+ /**
+ * MergedHistogram is a Wrapper on top of multiple histograms, it gives a view of all the
+ * underlying histograms as it was just one.
+ * Note: Should only be used for querying the underlying histograms.
+ */
+ private static class MergedHistogram implements Histogram {
+ private final ApproximateHistogram[] histograms;
+
+ private MergedHistogram(ApproximateHistogram[] histograms) {
+ this.histograms = histograms;
+ }
+
+ @Override
+ public void add(long x) {
+ /* Ignore, Shouldn't be used */
+ assert(false);
+ }
+
+ @Override
+ public void clear() {
+ /* Ignore, Shouldn't be used */
+ assert(false);
+ }
+
+ @Override
+ public long getQuantile(double quantile) {
+ Preconditions.checkArgument(0.0 <= quantile && quantile <= 1.0,
+ "quantile must be in the range 0.0 to 1.0 inclusive");
+
+ long count = initIndices();
+ if (count == 0) {
+ return 0L;
+ }
+
+ long sum = 0;
+ long target = (long) Math.ceil(count * (1.0 - quantile));
+ int iHist = -1;
+ int iBiggest = -1;
+ do {
+ long biggest = Long.MIN_VALUE;
+ for (int i = 0; i < histograms.length; i++) {
+ ApproximateHistogram hist = histograms[i];
+ int indexBiggest = hist.biggest(hist.indices);
+ if (indexBiggest >= 0) {
+ long value = hist.buffer[indexBiggest][hist.indices[indexBiggest]];
+ if (iBiggest == -1 || biggest <= value) {
+ iBiggest = indexBiggest;
+ biggest = value;
+ iHist = i;
+ }
+ }
+ }
+ histograms[iHist].indices[iBiggest]--;
+ sum += histograms[iHist].weight(iBiggest);
+ } while (sum < target);
+
+ ApproximateHistogram hist = histograms[iHist];
+ int i = hist.indices[iBiggest];
+ return hist.buffer[iBiggest][i + 1];
+ }
+
+ @Override
+ public synchronized long[] getQuantiles(double[] quantiles) {
+ return Histograms.extractQuantiles(this, quantiles);
+ }
+
+ /**
+ * Initialize the indices array for each Histogram and return the global count.
+ */
+ private long initIndices() {
+ long count = 0L;
+ for (int i = 0; i < histograms.length; i++) {
+ ApproximateHistogram h = histograms[i];
+ int[] indices = h.indices;
+ count += h.count;
+ int buf0Size = Math.min(h.bufferSize, h.leafCount);
+ int buf1Size = Math.max(0, h.leafCount - buf0Size);
+
+ if (! h.leavesSorted) {
+ Arrays.sort(h.buffer[0], 0, buf0Size);
+ Arrays.sort(h.buffer[1], 0, buf1Size);
+ h.leavesSorted = true;
+ }
+ Arrays.fill(indices, h.bufferSize - 1);
+ indices[0] = buf0Size - 1;
+ indices[1] = buf1Size - 1;
+ }
+ return count;
+ }
+ }
+
+ /**
+ * Return a MergedHistogram
+ * @param histograms array of histograms to merged together
+ * @return a new Histogram
+ */
+ public static Histogram merge(ApproximateHistogram[] histograms) {
+ return new MergedHistogram(histograms);
+ }
+
+ /**
+ * Computes the depth {@code b} of the buffer tree from the inequalities:
+ * 1) (b - 2) * (2 ^ (b - 2)) + 0.5 <= epsilon * N
+ * 2) k * (2 ^ (b - 1)) >= N
+ *
+ * Starting from {@code b = 2}, {@code b} is incremented for as long as inequality 1 holds, so the
+ * returned value is the first {@code b} for which it no longer holds. Inequality 2 is not
+ * evaluated here.
+ *
+ * For an explanation of these inequalities, please read the Munro-Paterson or
+ * the Manku-Rajagopalan-Lindsay papers.
+ *
+ * @param epsilon the error bound used on the right-hand side of inequality 1
+ * @param n the element count N used on the right-hand side of inequality 1
+ * @return the computed depth, always at least 2
+ */
+ @VisibleForTesting static int computeDepth(double epsilon, long n) {
+ int b = 2;
+ while ((b - 2) * (1L << (b - 2)) + 0.5 <= epsilon * n) {
+ b += 1;
+ }
+ return b;
+ }
+
+ /**
+ * Computes the size of each buffer as {@code n / 2^(depth - 1)}, truncated to an int.
+ *
+ * @param depth the depth of the tree of buffers
+ * @param n the number of elements to size the buffers for
+ * @return the buffer size
+ */
+ @VisibleForTesting static int computeBufferSize(int depth, long n) {
+ return (int) (n / (1L << (depth - 1)));
+ }
+
+ /**
+ * Return an estimation of the memory used by an instance.
+ * The size is due to:
+ * - a fixed cost (76 bytes) for the class + fields
+ * - bufferPool: 16 + 2 * (16 + bufferSize * ELEM_SIZE)
+ * - indices: 16 + sizeof(Integer) * (depth + 1)
+ * - buffer: 16 + (depth + 1) * (16 + bufferSize * ELEM_SIZE)
+ *
+ * NOTE(review): with a 4-byte int the terms above add up to 176 + 20 * depth + ..., while the
+ * expression below uses 24 * depth -- confirm which one is intended.
+ *
+ * Note: This method is tested with a unit test; it will break if you add new fields.
+ * @param bufferSize the size of a buffer
+ * @param depth the depth of the tree of buffer (depth + 1 buffers)
+ * @return the estimated size in bytes
+ */
+ @VisibleForTesting
+ static long memoryUsage(int bufferSize, int depth) {
+ return 176 + (24 * depth) + (bufferSize * ELEM_SIZE * (depth + 3));
+ }
+
+ /**
+ * Return the level of the biggest element (using the indices array 'ids'
+ * to track which elements have been already returned). Every buffer has
+ * already been sorted at this point.
+ * @return the level of the biggest element or -1 if no element has been found
+ */
+ @VisibleForTesting
+ int biggest(final int[] ids) {
+ long biggest = Long.MIN_VALUE;
+ final int id0 = ids[0], id1 = ids[1];
+ int iBiggest = -1;
+
+ if (0 < leafCount && 0 <= id0) {
+ biggest = buffer[0][id0];
+ iBiggest = 0;
+ }
+ if (bufferSize < leafCount && 0 <= id1) {
+ long x = buffer[1][id1];
+ if (x > biggest) {
+ biggest = x;
+ iBiggest = 1;
+ }
+ }
+ for (int i = 2; i < currentTop + 1; i++) {
+ if (!isBufferEmpty(i) && 0 <= ids[i]) {
+ long x = buffer[i][ids[i]];
+ if (x > biggest) {
+ biggest = x;
+ iBiggest = i;
+ }
+ }
+ }
+ return iBiggest;
+ }
+
+
+ /**
+  * Tells whether the buffer at {@code level} currently holds no data, based only on the number
+  * of elements inserted so far.
+  *
+  * <p>{@code (count - leafCount) / bufferSize} is the number of leaf buffers' worth of elements
+  * that have left the leaves; bit {@code level - 1} of that number tells whether the buffer at
+  * {@code level} is occupied. Callers in this class only pass levels of 2 and above.
+  *
+  * @param level the level of the buffer to test
+  * @return {@code true} if the buffer at {@code level} is empty
+  */
+ @VisibleForTesting
+ boolean isBufferEmpty(int level) {
+   if (level == currentTop) {
+     return false; // root buffer (if present) is always full
+   }
+   // Shift a long, not an int: the mask is applied to a long quantity, and an int shift would
+   // wrap around for levels above 31.
+   long levelWeight = 1L << (level - 1);
+   return (((count - leafCount) / bufferSize) & levelWeight) == 0;
+ }
+
+ /**
+ * Return the weight of the level, i.e. 2^(level-1), except for the two tree
+ * leaves (weight=1) and for the buffer at {@code maxDepth}, whose weight is tracked in
+ * {@code rootWeight}.
+ *
+ * @param level the level of the buffer
+ * @return the weight of one element stored at that level
+ */
+ private int weight(int level) {
+ if (level == 0) {
+ return 1;
+ } else if (level == maxDepth) {
+ return rootWeight;
+ } else {
+ // level 1 also lands here and yields 1 << 0 == 1, the leaf weight
+ return 1 << (level - 1);
+ }
+ }
+
+ private void allocate(int i) {
+ if (buffer[i] == null) {
+ buffer[i] = new long[bufferSize];
+ }
+ }
+
+ /**
+ * Recursively collapse the buffers of the tree.
+ * Upper buffers will be allocated on first access in this method.
+ */
+ private void recCollapse(long[] buf, int level) {
+ // if we reach the root, we can't add more buffer
+ if (level == maxDepth) {
+ // weight() return the weight of the root, in that case we need the
+ // weight of merge result
+ int mergeWeight = 1 << (level - 1);
+ int idx = level % 2;
+ long[] merged = bufferPool[idx];
+ long[] tmp = buffer[level];
+ collapse(buf, mergeWeight, buffer[level], rootWeight, merged);
+ buffer[level] = merged;
+ bufferPool[idx] = tmp;
+ rootWeight += mergeWeight;
+ } else {
+ allocate(level + 1); // lazy allocation (if needed)
+ if (level == currentTop) {
+ // if we reach the top, add a new buffer
+ collapse1(buf, buffer[level], buffer[level + 1]);
+ currentTop += 1;
+ rootWeight *= 2;
+ } else if (isBufferEmpty(level + 1)) {
+ // if the upper buffer is empty, use it
+ collapse1(buf, buffer[level], buffer[level + 1]);
+ } else {
+ // it the upper buffer isn't empty, collapse with it
+ long[] merged = bufferPool[level % 2];
+ collapse1(buf, buffer[level], merged);
+ recCollapse(merged, level + 1);
+ }
+ }
+ }
+
+ /**
+ * collapse two sorted Arrays of different weight
+ * ex: [2,5,7] weight 2 and [3,8,9] weight 3
+ * weight x array + concat = [2,2,5,5,7,7,3,3,3,8,8,8,9,9,9]
+ * sort = [2,2,3,3,3,5,5,7,7,8,8,8,9,9,9]
+ * select every nth elems = [3,7,9] (n = sum of the weights, starting at offset n / 2 + 1)
+ *
+ * The expanded sequence is never materialized: the two inputs are merged in ascending order
+ * while a running weight is accumulated, and an element is written to {@code output} each time
+ * that running weight (shifted by {@code totalWeight / 2 - 1}) crosses a multiple of
+ * {@code totalWeight}. When both inputs hold equal values the one from {@code right} is
+ * consumed first.
+ *
+ * Callers in this class pass three arrays of the same length, in which case exactly that many
+ * elements are written to {@code output}.
+ *
+ * @param left first sorted input
+ * @param leftWeight weight of every element of {@code left}
+ * @param right second sorted input
+ * @param rightWeight weight of every element of {@code right}
+ * @param output array receiving the selected elements, filled from index 0
+ */
+ @VisibleForTesting
+ static void collapse(
+ long[] left,
+ int leftWeight,
+ long[] right,
+ int rightWeight,
+ long[] output) {
+
+ int totalWeight = leftWeight + rightWeight;
+ int halfTotalWeight = (totalWeight / 2) - 1;
+ int i = 0, j = 0, k = 0, cnt = 0;
+
+ int weight;
+ long smallest;
+
+ while (i < left.length || j < right.length) {
+ // take the next element of the merge, together with the weight of its source array
+ if (i < left.length && (j == right.length || left[i] < right[j])) {
+ smallest = left[i];
+ weight = leftWeight;
+ i++;
+ } else {
+ smallest = right[j];
+ weight = rightWeight;
+ j++;
+ }
+
+ // number of selection points passed before and after accounting for this element
+ int cur = (cnt + halfTotalWeight) / totalWeight;
+ cnt += weight;
+ int next = (cnt + halfTotalWeight) / totalWeight;
+
+ for(; cur < next; cur++) {
+ output[k] = smallest;
+ k++;
+ }
+ }
+ }
+
+/**
+ * Optimized version of collapse for collapsing two array of the same weight
+ * (which is what we want most of the time)
+ */
+ private static void collapse1(
+ long[] left,
+ long[] right,
+ long[] output) {
+
+ int i = 0, j = 0, k = 0, cnt = 0;
+ long smallest;
+
+ while (i < left.length || j < right.length) {
+ if (i < left.length && (j == right.length || left[i] < right[j])) {
+ smallest = left[i];
+ i++;
+ } else {
+ smallest = right[j];
+ j++;
+ }
+ if (cnt % 2 == 1) {
+ output[k] = smallest;
+ k++;
+ }
+ cnt++;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/CounterMap.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/stats/CounterMap.java b/commons/src/main/java/org/apache/aurora/common/stats/CounterMap.java
new file mode 100644
index 0000000..024e67b
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/stats/CounterMap.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Logger;
+
+/**
+ * A map from a key type to integer counters. This simplifies the process of storing counters for
+ * multiple values of the same type. A key without a mapping reads as 0.
+ *
+ * <p>This class performs no synchronization of its own.
+ *
+ * @param <K> the type of the keys being counted
+ */
+public class CounterMap<K> implements Iterable<Map.Entry<K, Integer>>, Cloneable {
+  private final Map<K, Integer> map = Maps.newHashMap();
+
+  private static final Logger log = Logger.getLogger(CounterMap.class.getName());
+
+  /**
+   * Increments the counter value associated with {@code key}, and returns the new value.
+   *
+   * @param key The key to increment
+   * @return The incremented value.
+   */
+  public int incrementAndGet(K key) {
+    return incrementAndGet(key, 1);
+  }
+
+  /**
+   * Increments the value associated with {@code key} by {@code count}, returning the new value.
+   * A key without a mapping is treated as having the value 0.
+   *
+   * @param key The key to increment
+   * @param count The amount to add to the current value.
+   * @return The incremented value.
+   */
+  public int incrementAndGet(K key, int count) {
+    Integer value = map.get(key);
+    if (value == null) {
+      value = 0;
+    }
+    int newValue = count + value;
+    map.put(key, newValue);
+    return newValue;
+  }
+
+  /**
+   * Gets the value associated with a key.
+   *
+   * @param key The key to look up.
+   * @return The counter value stored for {@code key}, or 0 if no mapping exists.
+   */
+  public int get(K key) {
+    // A single lookup is enough: a missing mapping comes back as null and reads as 0.
+    Integer value = map.get(key);
+    return value == null ? 0 : value;
+  }
+
+  /**
+   * Assigns a value to a key.
+   *
+   * @param key The key to assign a value to; must not be null.
+   * @param newValue The value to assign.
+   */
+  public void set(K key, int newValue) {
+    Preconditions.checkNotNull(key);
+    map.put(key, newValue);
+  }
+
+  /**
+   * Resets the value for {@code key}. This will remove the key from the counter.
+   *
+   * @param key The key to reset.
+   */
+  public void reset(K key) {
+    map.remove(key);
+  }
+
+  /**
+   * Gets the number of entries stored in the map.
+   *
+   * @return The size of the map.
+   */
+  public int size() {
+    return map.size();
+  }
+
+  /**
+   * Gets an iterator over the key/count entries of the underlying map.
+   *
+   * @return Iterator over the entries.
+   */
+  @Override
+  public Iterator<Map.Entry<K, Integer>> iterator() {
+    return map.entrySet().iterator();
+  }
+
+  /**
+   * Gets the counter values.
+   *
+   * @return The value collection of the underlying map (not a copy).
+   */
+  public Collection<Integer> values() {
+    return map.values();
+  }
+
+  /**
+   * Gets the keys that currently have a mapping.
+   *
+   * @return The key set of the underlying map (not a copy).
+   */
+  public Set<K> keySet() {
+    return map.keySet();
+  }
+
+  /**
+   * Renders one {@code key: value} line per entry.
+   *
+   * @return The textual form of all entries.
+   */
+  @Override
+  public String toString() {
+    StringBuilder strVal = new StringBuilder();
+    for (Map.Entry<K, Integer> entry : this) {
+      // append(Object) tolerates a null key, which incrementAndGet does not reject.
+      strVal.append(entry.getKey()).append(": ").append(entry.getValue()).append('\n');
+    }
+    return strVal.toString();
+  }
+
+  /**
+   * Exposes the underlying map itself; changes made through it are visible to this counter.
+   *
+   * @return The underlying map (not a copy).
+   */
+  public Map<K, Integer> toMap() {
+    return map;
+  }
+
+  /**
+   * Creates a new {@code CounterMap} holding a copy of the current entries. The result is always
+   * a plain {@code CounterMap}, whatever the runtime class of this instance.
+   *
+   * @return An independent copy of this counter.
+   */
+  @Override
+  public CounterMap<K> clone() {
+    CounterMap<K> newInstance = new CounterMap<K>();
+    newInstance.map.putAll(map);
+    return newInstance;
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/CounterMapWithTopKey.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/stats/CounterMapWithTopKey.java b/commons/src/main/java/org/apache/aurora/common/stats/CounterMapWithTopKey.java
new file mode 100644
index 0000000..1e90e85
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/stats/CounterMapWithTopKey.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+import java.util.Map;
+
+/**
+ * Same as CounterMap<K>, but also keeps track of the item with the highest count.
+ */
+public class CounterMapWithTopKey<K> extends CounterMap<K> {
+
+ private K mostCommonKey = null;
+
+ /**
+ * Updates the most common key, if needed.
+ *
+ * @param key The key to check.
+ * @param count The count for the key.
+ * @return The count.
+ */
+ private int updateMostCommon(K key, int count) {
+ if (count > get(mostCommonKey)) {
+ mostCommonKey = key;
+ }
+ return count;
+ }
+
+ /**
+ * Increments the counter value associated with {@code key}, and returns the new value.
+ *
+ * @param key The key to increment
+ * @return The incremented value.
+ */
+ @Override
+ public int incrementAndGet(K key) {
+ return updateMostCommon(key, super.incrementAndGet(key));
+ }
+
+ /**
+ * Assigns a value to a key.
+ *
+ * @param key The key to assign a value to.
+ * @param newValue The value to assign.
+ */
+ @Override
+ public void set(K key, int newValue) {
+ super.set(key, updateMostCommon(key, newValue));
+ }
+
+ /**
+ * Resets the value for {@code key}. This will simply set the stored value to 0.
+ * The most common key is updated by scanning the entire map.
+ *
+ * @param key The key to reset.
+ */
+ @Override
+ public void reset(K key) {
+ super.reset(key);
+ for (Map.Entry<K, Integer> entry : this) {
+ updateMostCommon(entry.getKey(), entry.getValue());
+ }
+ }
+
+ /**
+ *
+ * @return The key with the highest count in the map. If multiple keys have this count, return
+ * an arbitrary one.
+ */
+ public K getMostCommonKey() {
+ return mostCommonKey;
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder(super.toString()).append(String.format("Most common key: %s\n",
+ mostCommonKey.toString())).toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/Elapsed.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/stats/Elapsed.java b/commons/src/main/java/org/apache/aurora/common/stats/Elapsed.java
new file mode 100644
index 0000000..859ca7e
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/stats/Elapsed.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
+
+import org.apache.aurora.common.base.MorePreconditions;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+
+/**
+ * A stat that exports the amount of time since it was last reset.
+ *
+ * @author William Farner
+ */
+public class Elapsed {
+
+ // Time source read by reset() and by the exported stat.
+ private final Ticker ticker;
+ // Ticker reading captured by the most recent reset().
+ private final AtomicLong lastEventNs = new AtomicLong();
+
+ /**
+ * Calls {@link #Elapsed(String, Time)} using a default granularity of nanoseconds.
+ *
+ * @param name Name of the stat to export.
+ */
+ public Elapsed(String name) {
+ this(name, Time.NANOSECONDS);
+ }
+
+ /**
+ * Equivalent to calling {@link #Elapsed(String, Time, Ticker)} passing {@code name},
+ * {@code granularity} and {@link com.google.common.base.Ticker#systemTicker()}.
+ * <br/>
+ * @param name Name of the stat to export.
+ * @param granularity Time unit granularity to export.
+ */
+ public Elapsed(String name, Time granularity) {
+ this(name, granularity, Ticker.systemTicker());
+ }
+
+ /**
+ * Creates and exports a new stat that maintains the difference between the tick time
+ * and the time since it was last reset. Upon export, the counter will act as though it were just
+ * reset.
+ * <br/>
+ * @param name Name of stat to export
+ * @param granularity Time unit granularity to export.
+ * @param ticker Ticker implementation
+ */
+ public Elapsed(String name, final Time granularity, final Ticker ticker) {
+ MorePreconditions.checkNotBlank(name);
+ Preconditions.checkNotNull(granularity);
+ this.ticker = Preconditions.checkNotNull(ticker);
+
+ // Start measuring from construction time.
+ reset();
+
+ Stats.export(new StatImpl<Long>(name) {
+ @Override public Long read() {
+ // Elapsed ticker time since the last reset, converted from nanoseconds to the granularity.
+ return Amount.of(ticker.read() - lastEventNs.get(), Time.NANOSECONDS).as(granularity);
+ }
+ });
+ }
+
+ /**
+ * Records the current ticker reading as the start of the elapsed period.
+ */
+ public void reset() {
+ lastEventNs.set(ticker.read());
+ }
+}