You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2015/08/28 20:33:35 UTC
[17/21] aurora git commit: Remove unused classes from commons fork.
http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/net/monitoring/TrafficMonitor.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/net/monitoring/TrafficMonitor.java b/commons/src/main/java/org/apache/aurora/common/net/monitoring/TrafficMonitor.java
deleted file mode 100644
index fba1e4b..0000000
--- a/commons/src/main/java/org/apache/aurora/common/net/monitoring/TrafficMonitor.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.net.monitoring;
-
-import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.annotation.concurrent.GuardedBy;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import org.apache.aurora.common.base.MorePreconditions;
-import org.apache.aurora.common.net.loadbalancing.RequestTracker;
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.util.Clock;
-import org.apache.aurora.common.util.concurrent.ExecutorServiceShutdown;
-
-/**
- * Monitors activity on established connections between two hosts. This can be used for a server
- * to track inbound clients, or for a client to track requests sent to different servers.
- *
- * The monitor will retain information for hosts that may no longer be active, but will expunge
- * information for hosts that have been idle for more than five minutes.
- *
- * @author William Farner
- */
-public class TrafficMonitor<K> implements ConnectionMonitor<K>, RequestTracker<K> {
-
- @VisibleForTesting
- static final Amount<Long, Time> DEFAULT_GC_INTERVAL = Amount.of(5L, Time.MINUTES);
-
- @GuardedBy("this")
- private final LoadingCache<K, TrafficInfo> trafficInfos;
-
- private final String serviceName;
- private final Amount<Long, Time> gcInterval;
-
- private AtomicLong lifetimeRequests = new AtomicLong();
- private final Clock clock;
- private final ScheduledExecutorService gcExecutor;
-
- /**
- * Creates a new traffic monitor using the default cleanup interval.
- *
- * @param serviceName Name of the service to monitor, used for creating variable names.
- */
- public TrafficMonitor(final String serviceName) {
- this(serviceName, DEFAULT_GC_INTERVAL);
- }
-
- /**
- * Creates a new traffic monitor with a custom cleanup interval.
- *
- * @param serviceName Service name for the monitor.
- * @param gcInterval Interval on which the remote host garbage collector should run.
- */
- public TrafficMonitor(final String serviceName, Amount<Long, Time> gcInterval) {
- this(serviceName, gcInterval, Clock.SYSTEM_CLOCK);
- }
-
- /**
- * Convenience method to create a typed traffic monitor.
- *
- * @param serviceName Service name for the monitor.
- * @param <T> Monitor type.
- * @return A new traffic monitor.
- */
- public static <T> TrafficMonitor<T> create(String serviceName) {
- return new TrafficMonitor<T>(serviceName);
- }
-
- @VisibleForTesting
- TrafficMonitor(final String serviceName, Clock clock) {
- this(serviceName, DEFAULT_GC_INTERVAL, clock);
- }
-
- private TrafficMonitor(final String serviceName, Amount<Long, Time> gcInterval, Clock clock) {
- this.serviceName = MorePreconditions.checkNotBlank(serviceName);
- this.clock = Preconditions.checkNotNull(clock);
- Preconditions.checkNotNull(gcInterval);
- Preconditions.checkArgument(gcInterval.getValue() > 0, "GC interval must be > zero.");
- this.gcInterval = gcInterval;
-
- trafficInfos = CacheBuilder.newBuilder().build(new CacheLoader<K, TrafficInfo>() {
- @Override public TrafficInfo load(K key) {
- return new TrafficInfo(key);
- }
- });
-
- Runnable gc = new Runnable() {
- @Override public void run() { gc(); }
- };
-
- gcExecutor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("TrafficMonitor-gc-%d").build());
- gcExecutor.scheduleAtFixedRate(gc, gcInterval.as(Time.SECONDS), gcInterval.as(Time.SECONDS),
- TimeUnit.SECONDS);
- }
-
- /**
- * Gets the name of the service that this monitor is monitoring.
- *
- * @return Monitor's service name.
- */
- public String getServiceName() {
- return serviceName;
- }
-
- /**
- * Gets the total number of requests that this monitor has observed, for all remote hosts.
- *
- * @return Total number of requests observed.
- */
- public long getLifetimeRequestCount() {
- return lifetimeRequests.get();
- }
-
- /**
- * Fetches all current traffic information.
- *
- * @return A map from the host key type to information about that host.
- */
- public synchronized Map<K, TrafficInfo> getTrafficInfo() {
- return ImmutableMap.copyOf(trafficInfos.asMap());
- }
-
- @Override
- public synchronized void connected(K key) {
- Preconditions.checkNotNull(key);
-
- trafficInfos.getUnchecked(key).incConnections();
- }
-
- @Override
- public synchronized void released(K key) {
- Preconditions.checkNotNull(key);
-
- TrafficInfo info = trafficInfos.getUnchecked(key);
-
- Preconditions.checkState(info.getConnectionCount() > 0, "Double release detected!");
- info.decConnections();
- }
-
- @Override
- public void requestResult(K key, RequestResult result, long requestTimeNanos) {
- Preconditions.checkNotNull(key);
-
- lifetimeRequests.incrementAndGet();
- trafficInfos.getUnchecked(key).addResult(result);
- }
-
- @VisibleForTesting
- synchronized void gc() {
- Iterables.removeIf(trafficInfos.asMap().entrySet(),
- new Predicate<Map.Entry<K, TrafficInfo>>() {
- @Override public boolean apply(Map.Entry<K, TrafficInfo> clientInfo) {
- if (clientInfo.getValue().connections.get() > 0) return false;
-
- long idlePeriod = clock.nowNanos() - clientInfo.getValue().getLastActiveTimestamp();
-
- return idlePeriod > gcInterval.as(Time.NANOSECONDS);
- }
- });
- }
-
- /**
- * Shuts down TrafficMonitor by stopping background gc task.
- */
- public void shutdown() {
- new ExecutorServiceShutdown(gcExecutor, Amount.of(0L, Time.SECONDS)).execute();
- }
-
- /**
- * Information about traffic obsserved to/from a specific host.
- */
- public class TrafficInfo {
- private final K key;
- private AtomicInteger requestSuccesses = new AtomicInteger();
- private AtomicInteger requestFailures = new AtomicInteger();
- private AtomicInteger connections = new AtomicInteger();
- private AtomicLong lastActive = new AtomicLong();
-
- TrafficInfo(K key) {
- this.key = key;
- pulse();
- }
-
- void pulse() {
- lastActive.set(clock.nowNanos());
- }
-
- public K getKey() {
- return key;
- }
-
- void addResult(RequestResult result) {
- pulse();
- switch (result) {
- case SUCCESS:
- requestSuccesses.incrementAndGet();
- break;
- case FAILED:
- case TIMEOUT:
- requestFailures.incrementAndGet();
- break;
- }
- }
-
- public int getRequestSuccessCount() {
- return requestSuccesses.get();
- }
-
- public int getRequestFailureCount() {
- return requestFailures.get();
- }
-
- int incConnections() {
- pulse();
- return connections.incrementAndGet();
- }
-
- int decConnections() {
- pulse();
- return connections.decrementAndGet();
- }
-
- public int getConnectionCount() {
- return connections.get();
- }
-
- public long getLastActiveTimestamp() {
- return lastActive.get();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/net/pool/ConnectionFactory.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/net/pool/ConnectionFactory.java b/commons/src/main/java/org/apache/aurora/common/net/pool/ConnectionFactory.java
deleted file mode 100644
index cdaaeab..0000000
--- a/commons/src/main/java/org/apache/aurora/common/net/pool/ConnectionFactory.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.net.pool;
-
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-
-/**
- * A factory for connections that also dictates policy for the size of the connection population.
- *
- * <p>TODO(John Sirois): separate concerns - mixing in willCreate/null protocol is already tangling
- * implementation code
- *
- * @author John Sirois
- */
-public interface ConnectionFactory<S extends Connection<?, ?>> {
-
- /**
- * Checks whether this factory might create a connection if requested.
- *
- * @return {@code} true if this factory might create a connection at this point in time; ie
- * a call to {@link #create} might not have returned {@code null}. May return true to multiple
- * threads if concurrently creating connections.
- */
- boolean mightCreate();
-
- /**
- * Attempts to create a new connection within the given timeout and subject to this factory's
- * connection population size policy.
- *
- * @param timeout the maximum amount of time to wait
- * @return a new connection or null if there are too many connections already
- * @throws Exception if there was a problem creating the connection or establishing the connection
- * takes too long
- */
- S create(Amount<Long, Time> timeout) throws Exception;
-
- /**
- * Destroys a connection. It is an error to attempt to destroy a connection this factory did
- * not {@link #create}
- *
- * @param connection The connection to destroy.
- */
- void destroy(S connection);
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/net/pool/ConnectionPool.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/net/pool/ConnectionPool.java b/commons/src/main/java/org/apache/aurora/common/net/pool/ConnectionPool.java
deleted file mode 100644
index 316bf2b..0000000
--- a/commons/src/main/java/org/apache/aurora/common/net/pool/ConnectionPool.java
+++ /dev/null
@@ -1,334 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.net.pool;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.aurora.common.base.Supplier;
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.stats.Stats;
-import org.apache.aurora.common.stats.StatsProvider;
-
-import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * A generic connection pool that delegates growth policy to a {@link ConnectionFactory} and
- * connection choice to a supplied strategy.
- *
- * <p>TODO(John Sirois): implement a reaper to clean up connections that may become invalid when not in
- * use.
- *
- * <p> TODO(John Sirois): take a ShutdownRegistry and register a close command
- *
- * @author John Sirois
- */
-public final class ConnectionPool<S extends Connection<?, ?>> implements ObjectPool<S> {
-
- private static final Logger LOG = Logger.getLogger(ConnectionPool.class.getName());
-
- private final Set<S> leasedConnections =
- Sets.newSetFromMap(Maps.<S, Boolean>newIdentityHashMap());
- private final Set<S> availableConnections = Sets.newHashSet();
- private final Lock poolLock;
- private final Condition available;
-
- private final ConnectionFactory<S> connectionFactory;
- private final Executor executor;
-
- private volatile boolean closed;
- private final AtomicLong connectionsCreated;
- private final AtomicLong connectionsDestroyed;
- private final AtomicLong connectionsReturned;
-
- /**
- * Creates a connection pool with a connection picker that selects the first item in the set of
- * available connections, exporting statistics to stats provider {@link Stats#STATS_PROVIDER}.
- *
- * @param connectionFactory Factory to create and destroy connections.
- */
- public ConnectionPool(ConnectionFactory<S> connectionFactory) {
- this(connectionFactory, Stats.STATS_PROVIDER);
- }
-
- /**
- * Creates a connection pool with a connection picker that selects the first item in the set of
- * available connections and uses the supplied StatsProvider to register stats with.
- *
- * @param connectionFactory Factory to create and destroy connections.
- * @param statsProvider Stats export provider.
- */
- public ConnectionPool(ConnectionFactory<S> connectionFactory, StatsProvider statsProvider) {
- this(Executors.newCachedThreadPool(
- new ThreadFactoryBuilder()
- .setNameFormat("CP-" + connectionFactory + "[%d]")
- .setDaemon(true)
- .build()),
- new ReentrantLock(true), connectionFactory, statsProvider);
- }
-
- @VisibleForTesting
- ConnectionPool(Executor executor, Lock poolLock, ConnectionFactory<S> connectionFactory,
- StatsProvider statsProvider) {
- Preconditions.checkNotNull(executor);
- Preconditions.checkNotNull(poolLock);
- Preconditions.checkNotNull(connectionFactory);
- Preconditions.checkNotNull(statsProvider);
-
- this.executor = executor;
- this.poolLock = poolLock;
- available = poolLock.newCondition();
- this.connectionFactory = connectionFactory;
-
- String cfName = Stats.normalizeName(connectionFactory.toString());
- statsProvider.makeGauge("cp_leased_connections_" + cfName,
- new Supplier<Integer>() {
- @Override public Integer get() {
- return leasedConnections.size();
- }
- });
- statsProvider.makeGauge("cp_available_connections_" + cfName,
- new Supplier<Integer>() {
- @Override public Integer get() {
- return availableConnections.size();
- }
- });
- this.connectionsCreated =
- statsProvider.makeCounter("cp_created_connections_" + cfName);
- this.connectionsDestroyed =
- statsProvider.makeCounter("cp_destroyed_connections_" + cfName);
- this.connectionsReturned =
- statsProvider.makeCounter("cp_returned_connections_" + cfName);
- }
-
- @Override
- public String toString() {
- return "CP-" + connectionFactory;
- }
-
- @Override
- public S get() throws ResourceExhaustedException, TimeoutException {
- checkNotClosed();
- poolLock.lock();
- try {
- return leaseConnection(NO_TIMEOUT);
- } finally {
- poolLock.unlock();
- }
- }
-
- @Override
- public S get(Amount<Long, Time> timeout)
- throws ResourceExhaustedException, TimeoutException {
-
- checkNotClosed();
- Preconditions.checkNotNull(timeout);
- if (timeout.getValue() == 0) {
- return get();
- }
-
- try {
- long start = System.nanoTime();
- long timeBudgetNs = timeout.as(Time.NANOSECONDS);
- if (poolLock.tryLock(timeBudgetNs, TimeUnit.NANOSECONDS)) {
- try {
- timeBudgetNs -= (System.nanoTime() - start);
- return leaseConnection(Amount.of(timeBudgetNs, Time.NANOSECONDS));
- } finally {
- poolLock.unlock();
- }
- } else {
- throw new TimeoutException("Timed out waiting for pool lock");
- }
- } catch (InterruptedException e) {
- throw new TimeoutException("Interrupted waiting for pool lock");
- }
- }
-
- private S leaseConnection(Amount<Long, Time> timeout) throws ResourceExhaustedException,
- TimeoutException {
- S connection = getConnection(timeout);
- if (connection == null) {
- throw new ResourceExhaustedException("Connection pool resources exhausted");
- }
- return leaseConnection(connection);
- }
-
- @Override
- public void release(S connection) {
- release(connection, false);
- }
-
- /**
- * Equivalent to releasing a Connection with isValid() == false.
- * @see ObjectPool#remove(Object)
- */
- @Override
- public void remove(S connection) {
- release(connection, true);
- }
-
- // TODO(John Sirois): release could block indefinitely if someone is blocked in get() on a create
- // connection - reason about this and potentially submit release to our executor
- private void release(S connection, boolean remove) {
- poolLock.lock();
- try {
- if (!leasedConnections.remove(connection)) {
- throw new IllegalArgumentException("Connection not controlled by this connection pool: "
- + connection);
- }
-
- if (!closed && !remove && connection.isValid()) {
- addConnection(connection);
- connectionsReturned.incrementAndGet();
- } else {
- connectionFactory.destroy(connection);
- connectionsDestroyed.incrementAndGet();
- }
- } finally {
- poolLock.unlock();
- }
- }
-
- @Override
- public void close() {
- poolLock.lock();
- try {
- for (S availableConnection : availableConnections) {
- connectionFactory.destroy(availableConnection);
- }
- } finally {
- closed = true;
- poolLock.unlock();
- }
- }
-
- private void checkNotClosed() {
- Preconditions.checkState(!closed);
- }
-
- private S leaseConnection(S connection) {
- leasedConnections.add(connection);
- return connection;
- }
-
- // TODO(John Sirois): pool growth is serialized by poolLock currently - it seems like this could be
- // fixed but there may be no need - do gedankanalysis
- private S getConnection(final Amount<Long, Time> timeout) throws ResourceExhaustedException,
- TimeoutException {
- if (availableConnections.isEmpty()) {
- if (leasedConnections.isEmpty()) {
- // Completely empty pool
- try {
- return createConnection(timeout);
- } catch (Exception e) {
- throw new ResourceExhaustedException("failed to create a new connection", e);
- }
- } else {
- // If the pool is allowed to grow - let the connection factory race a release
- if (connectionFactory.mightCreate()) {
- executor.execute(new Runnable() {
- @Override public void run() {
- try {
- // The connection timeout is not needed here to honor the callers get requested
- // timeout, but we don't want to have an infinite timeout which could exhaust a
- // thread pool over many backgrounded create calls
- S connection = createConnection(timeout);
- if (connection != null) {
- addConnection(connection);
- } else {
- LOG.log(Level.WARNING, "Failed to create a new connection for a waiting client " +
- "due to maximum pool size or timeout");
- }
- } catch (Exception e) {
- LOG.log(Level.WARNING, "Failed to create a new connection for a waiting client", e);
- }
- }
- });
- }
-
- try {
- // We wait for a returned/new connection here in loops to guard against the
- // "spurious wakeups" that are documented can occur with Condition.await()
- if (timeout.getValue() == 0) {
- while(availableConnections.isEmpty()) {
- available.await();
- }
- } else {
- long timeRemainingNs = timeout.as(Time.NANOSECONDS);
- while(availableConnections.isEmpty()) {
- long start = System.nanoTime();
- if (!available.await(timeRemainingNs, TimeUnit.NANOSECONDS)) {
- throw new TimeoutException(
- "timeout waiting for a connection to be released to the pool");
- } else {
- timeRemainingNs -= (System.nanoTime() - start);
- }
- }
- if (availableConnections.isEmpty()) {
- throw new TimeoutException(
- "timeout waiting for a connection to be released to the pool");
- }
- }
- } catch (InterruptedException e) {
- throw new TimeoutException("Interrupted while waiting for a connection.");
- }
- }
- }
-
- return getAvailableConnection();
- }
-
- private S getAvailableConnection() {
- S connection = (availableConnections.size() == 1)
- ? Iterables.getOnlyElement(availableConnections)
- : availableConnections.iterator().next();
- if (!availableConnections.remove(connection)) {
- throw new IllegalArgumentException("Connection picked not in pool: " + connection);
- }
- return connection;
- }
-
- private S createConnection(Amount<Long, Time> timeout) throws Exception {
- S connection = connectionFactory.create(timeout);
- if (connection != null) {
- connectionsCreated.incrementAndGet();
- }
- return connection;
- }
-
- private void addConnection(S connection) {
- poolLock.lock();
- try {
- availableConnections.add(connection);
- available.signal();
- } finally {
- poolLock.unlock();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSetUtil.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSetUtil.java b/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSetUtil.java
deleted file mode 100644
index 4f75893..0000000
--- a/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSetUtil.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.net.pool;
-
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.aurora.common.base.Command;
-
-/**
- * Utility methods for dealing with dynamic sets of hosts.
- */
-public final class DynamicHostSetUtil {
-
- /**
- * Gets a snapshot of a set of dynamic hosts (e.g. a ServerSet) and returns a readable copy of
- * the underlying actual endpoints.
- *
- * @param hostSet The hostSet to snapshot.
- * @throws DynamicHostSet.MonitorException if there was a problem obtaining the snapshot.
- */
- public static <T> ImmutableSet<T> getSnapshot(DynamicHostSet<T> hostSet) throws DynamicHostSet.MonitorException {
- final ImmutableSet.Builder<T> snapshot = ImmutableSet.builder();
- Command unwatch = hostSet.watch(new DynamicHostSet.HostChangeMonitor<T>() {
- @Override public void onChange(ImmutableSet<T> hostSet) {
- snapshot.addAll(hostSet);
- }
- });
- unwatch.execute();
- return snapshot.build();
- }
-
- private DynamicHostSetUtil() {
- // utility
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicPool.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicPool.java b/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicPool.java
deleted file mode 100644
index 2fd6046..0000000
--- a/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicPool.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.net.pool;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.aurora.common.base.Closure;
-import org.apache.aurora.common.net.loadbalancing.LoadBalancer;
-
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.zookeeper.ServerSet;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeoutException;
-
-/**
- * An ObjectPool that maintains a set of connections for a set of service endpoints defined by a
- * {@link ServerSet}.
- *
- * @param <H> The type that contains metadata information about hosts, such as liveness and address.
- * @param <T> The raw connection type that is being pooled.
- * @param <E> The type that identifies the endpoint of the pool, such as an address.
- * @author John Sirois
- */
-public class DynamicPool<H, T, E> implements ObjectPool<Connection<T, E>> {
-
- private final MetaPool<T, E> pool;
-
- /**
- * Creates a new ServerSetConnectionPool and blocks on an initial read and constructions of pools
- * for the given {@code serverSet}.
- *
- * @param hostSet the dynamic set of available servers to pool connections for
- * @param endpointPoolFactory a factory that can generate a connection pool for an endpoint
- * @param loadBalancer Load balancer to manage request flow.
- * @param onBackendsChosen A callback to notify of chosen backends.
- * @param restoreInterval the interval after connection errors start occurring for a target to
- * begin checking to see if it has come back to a healthy state
- * @param endpointExtractor Function that transforms a service instance into an endpoint instance.
- * @param livenessChecker Filter that will determine whether a host indicates itself as available.
- * @throws DynamicHostSet.MonitorException if there is a problem monitoring the host set
- */
- public DynamicPool(DynamicHostSet<H> hostSet,
- Function<E, ObjectPool<Connection<T, E>>> endpointPoolFactory,
- LoadBalancer<E> loadBalancer,
- Closure<Collection<E>> onBackendsChosen,
- Amount<Long, Time> restoreInterval,
- Function<H, E> endpointExtractor,
- Predicate<H> livenessChecker)
- throws DynamicHostSet.MonitorException {
- Preconditions.checkNotNull(hostSet);
- Preconditions.checkNotNull(endpointPoolFactory);
-
- pool = new MetaPool<T, E>(loadBalancer, onBackendsChosen, restoreInterval);
-
- // TODO(John Sirois): consider an explicit start/stop
- hostSet.monitor(new PoolMonitor<H, Connection<T, E>>(endpointPoolFactory, endpointExtractor,
- livenessChecker) {
- @Override protected void onPoolRebuilt(Set<ObjectPool<Connection<T, E>>> deadPools,
- Map<E, ObjectPool<Connection<T, E>>> livePools) {
- poolRebuilt(deadPools, livePools);
- }
- });
- }
-
- @VisibleForTesting
- void poolRebuilt(Set<ObjectPool<Connection<T, E>>> deadPools,
- Map<E, ObjectPool<Connection<T, E>>> livePools) {
-
- pool.setBackends(livePools);
-
- for (ObjectPool<Connection<T, E>> deadTargetPool : deadPools) {
- deadTargetPool.close();
- }
- }
-
- @Override
- public Connection<T, E> get() throws ResourceExhaustedException, TimeoutException {
- return pool.get();
- }
-
- @Override
- public Connection<T, E> get(Amount<Long, Time> timeout)
- throws ResourceExhaustedException, TimeoutException {
- return pool.get(timeout);
- }
-
- @Override
- public void release(Connection<T, E> connection) {
- pool.release(connection);
- }
-
- @Override
- public void remove(Connection<T, E> connection) {
- pool.remove(connection);
- }
-
- @Override
- public void close() {
- pool.close();
- }
-
- private abstract class PoolMonitor<H, S extends Connection<?, ?>>
- implements DynamicHostSet.HostChangeMonitor<H> {
-
- private final Function<E, ObjectPool<S>> endpointPoolFactory;
- private final Function<H, E> endpointExtractor;
- private final Predicate<H> livenessTest;
-
- public PoolMonitor(Function<E, ObjectPool<S>> endpointPoolFactory,
- Function<H, E> endpointExtractor,
- Predicate<H> livenessTest) {
- this.endpointPoolFactory = endpointPoolFactory;
- this.endpointExtractor = endpointExtractor;
- this.livenessTest = livenessTest;
- }
-
- private final Map<E, ObjectPool<S>> endpointPools = Maps.newHashMap();
-
- @Override
- public synchronized void onChange(ImmutableSet<H> serverSet) {
- // TODO(John Sirois): change onChange to pass the delta data since its already computed by
- // ServerSet
-
- Map<E, H> newEndpoints =
- Maps.uniqueIndex(Iterables.filter(serverSet, livenessTest), endpointExtractor);
-
- Set<E> deadEndpoints = ImmutableSet.copyOf(
- Sets.difference(endpointPools.keySet(), newEndpoints.keySet()));
- Set<ObjectPool<S>> deadPools = Sets.newHashSet();
- for (E endpoint : deadEndpoints) {
- ObjectPool<S> deadPool = endpointPools.remove(endpoint);
- deadPools.add(deadPool);
- }
-
- Set<E> addedEndpoints = ImmutableSet.copyOf(
- Sets.difference(newEndpoints.keySet(), endpointPools.keySet()));
- for (E endpoint : addedEndpoints) {
- ObjectPool<S> endpointPool = endpointPoolFactory.apply(endpoint);
- endpointPools.put(endpoint, endpointPool);
- }
-
- onPoolRebuilt(deadPools, ImmutableMap.copyOf(endpointPools));
- }
-
- protected abstract void onPoolRebuilt(Set<ObjectPool<S>> deadPools,
- Map<E, ObjectPool<S>> livePools);
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/net/pool/MetaPool.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/net/pool/MetaPool.java b/commons/src/main/java/org/apache/aurora/common/net/pool/MetaPool.java
deleted file mode 100644
index df1bd96..0000000
--- a/commons/src/main/java/org/apache/aurora/common/net/pool/MetaPool.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.net.pool;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import org.apache.aurora.common.base.Closure;
-import org.apache.aurora.common.base.Command;
-import org.apache.aurora.common.net.loadbalancing.LoadBalancer;
-import org.apache.aurora.common.net.loadbalancing.LoadBalancingStrategy.ConnectionResult;
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-
-/**
- * A connection pool that picks connections from a set of backend pools. Backend pools are selected
- * from randomly initially but then as they are used they are ranked according to how many
- * connections they have available and whether or not the last used connection had an error or not.
- * In this way, backends that are responsive should get selected in preference to those that are
- * not.
- *
- * <p>Non-responsive backends are monitored after a configurable period in a background thread and
- * if a connection can be obtained they start to float back up in the rankings. In this way,
- * backends that are initially non-responsive but later become responsive should end up getting
- * selected.
- *
- * <p> TODO(John Sirois): take a ShutdownRegistry and register a close command
- *
- * @author John Sirois
- */
-public class MetaPool<T, E> implements ObjectPool<Connection<T, E>> {
-
- private final Command stopBackendRestorer;
-
- private Map<E, ObjectPool<Connection<T, E>>> backends = null;
-
- // Locks to guard mutation of the backends set.
- private final Lock backendsReadLock;
- private final Lock backendsWriteLock;
-
- private final Closure<Collection<E>> onBackendsChosen;
-
- private final LoadBalancer<E> loadBalancer;
-
- /**
- * Creates a connection pool with no backends. Backends may be added post-creation by calling
- * {@link #setBackends(java.util.Map)}
- *
- * @param loadBalancer the load balancer to distribute requests among backends.
- * @param onBackendsChosen a callback to notify whenever the {@code loadBalancer} chooses a new
- * set of backends to restrict its call distribution to
- * @param restoreInterval the interval after a backend goes dead to begin checking the backend to
- * see if it has come back to a healthy state
- */
- public MetaPool(LoadBalancer<E> loadBalancer,
- Closure<Collection<E>> onBackendsChosen, Amount<Long, Time> restoreInterval) {
- this(ImmutableMap.<E, ObjectPool<Connection<T, E>>>of(), loadBalancer,
- onBackendsChosen, restoreInterval);
- }
-
- /**
- * Creates a connection pool that balances connections across multiple backend pools.
- *
- * @param backends the connection pools for the backends
- * @param onBackendsChosen a callback to notify whenever the {@code loadBalancer} chooses a new
- * set of backends to restrict its call distribution to
- * @param loadBalancer the load balancer to distribute requests among backends.
- * @param restoreInterval the interval after a backend goes dead to begin checking the backend to
- * see if it has come back to a healthy state
- */
- public MetaPool(
- ImmutableMap<E, ObjectPool<Connection<T, E>>> backends,
- LoadBalancer<E> loadBalancer,
- Closure<Collection<E>> onBackendsChosen, Amount<Long, Time> restoreInterval) {
-
- this.loadBalancer = Preconditions.checkNotNull(loadBalancer);
- this.onBackendsChosen = Preconditions.checkNotNull(onBackendsChosen);
-
- ReadWriteLock backendsLock = new ReentrantReadWriteLock(true);
- backendsReadLock = backendsLock.readLock();
- backendsWriteLock = backendsLock.writeLock();
-
- setBackends(backends);
-
- Preconditions.checkNotNull(restoreInterval);
- Preconditions.checkArgument(restoreInterval.getValue() > 0);
- stopBackendRestorer = startDeadBackendRestorer(restoreInterval);
- }
-
- /**
- * Assigns the backend pools that this pool should draw from.
- *
- * @param pools New pools to use.
- */
- public void setBackends(Map<E, ObjectPool<Connection<T, E>>> pools) {
- backendsWriteLock.lock();
- try {
- backends = Preconditions.checkNotNull(pools);
- loadBalancer.offerBackends(pools.keySet(), onBackendsChosen);
- } finally {
- backendsWriteLock.unlock();
- }
- }
-
- private Command startDeadBackendRestorer(final Amount<Long, Time> restoreInterval) {
-
- final AtomicBoolean shouldRestore = new AtomicBoolean(true);
- Runnable restoreDeadBackends = new Runnable() {
- @Override public void run() {
- if (shouldRestore.get()) {
- restoreDeadBackends(restoreInterval);
- }
- }
- };
- final ScheduledExecutorService scheduledExecutorService =
- Executors.newScheduledThreadPool(1,
- new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat("MTCP-DeadBackendRestorer[%s]")
- .build());
- long restoreDelay = restoreInterval.getValue();
- scheduledExecutorService.scheduleWithFixedDelay(restoreDeadBackends, restoreDelay,
- restoreDelay, restoreInterval.getUnit().getTimeUnit());
-
- return new Command() {
- @Override public void execute() {
- shouldRestore.set(false);
- scheduledExecutorService.shutdownNow();
- LOG.info("Backend restorer shut down");
- }
- };
- }
-
- private static final Logger LOG = Logger.getLogger(MetaPool.class.getName());
-
- private void restoreDeadBackends(Amount<Long, Time> restoreInterval) {
- for (E backend : snapshotBackends()) {
- ObjectPool<Connection<T, E>> pool;
- backendsReadLock.lock();
- try {
- pool = backends.get(backend);
- } finally {
- backendsReadLock.unlock();
- }
-
- // We can lose a race if the backends change - and that's fine, we'll restore the new set of
- // backends in the next scheduled restoration run.
- if (pool != null) {
- try {
- release(get(backend, pool, restoreInterval));
- } catch (TimeoutException e) {
- LOG.warning("Backend restorer failed to revive backend: " + backend + " -> " + e);
- } catch (ResourceExhaustedException e) {
- LOG.warning("Backend restorer failed to revive backend: " + backend + " -> " + e);
- }
- }
- }
- }
-
- private Iterable<E> snapshotBackends() {
- backendsReadLock.lock();
- try {
- return ImmutableList.copyOf(backends.keySet());
- } finally {
- backendsReadLock.unlock();
- }
- }
-
- @Override
- public Connection<T, E> get() throws ResourceExhaustedException, TimeoutException {
- return get(ObjectPool.NO_TIMEOUT);
- }
-
- @Override
- public Connection<T, E> get(Amount<Long, Time> timeout)
- throws ResourceExhaustedException, TimeoutException {
-
- E backend;
- ObjectPool<Connection<T, E>> pool;
-
- backendsReadLock.lock();
- try {
- backend = loadBalancer.nextBackend();
- Preconditions.checkNotNull(backend, "Load balancer gave a null backend.");
-
- pool = backends.get(backend);
- Preconditions.checkNotNull(backend,
- "Given backend %s not found in tracked backends: %s", backend, backends);
- } finally {
- backendsReadLock.unlock();
- }
-
- return get(backend, pool, timeout);
- }
-
- private static class ManagedConnection<T, E> implements Connection<T, E> {
- private final Connection<T, E> connection;
- private final ObjectPool<Connection<T, E>> pool;
-
- private ManagedConnection(Connection<T, E> connection, ObjectPool<Connection<T, E>> pool) {
- this.connection = connection;
- this.pool = pool;
- }
-
- @Override
- public void close() {
- connection.close();
- }
-
- @Override
- public T get() {
- return connection.get();
- }
-
- @Override
- public boolean isValid() {
- return connection.isValid();
- }
-
- @Override
- public E getEndpoint() {
- return connection.getEndpoint();
- }
-
- @Override public String toString() {
- return "ManagedConnection[" + connection.toString() + "]";
- }
-
- void release(boolean remove) {
- if (remove) {
- pool.remove(connection);
- } else {
- pool.release(connection);
- }
- }
- }
-
- private Connection<T, E> get(E backend, ObjectPool<Connection<T, E>> pool,
- Amount<Long, Time> timeout) throws ResourceExhaustedException, TimeoutException {
-
- long startNanos = System.nanoTime();
- try {
- Connection<T, E> connection = (timeout.getValue() == 0) ? pool.get() : pool.get(timeout);
-
- // BEWARE: We have leased a connection from the underlying pool here and must return it to the
- // caller so they can later release it. If we fail to do so, the connection will leak.
- // Catching intermediate exceptions ourselves and pro-actively returning the connection to the
- // pool before re-throwing is not a viable option since the return would have to succeed,
- // forcing us to ignore the timeout passed in.
-
- // NOTE: LoadBalancer gracefully ignores backends it does not know about so even if we acquire
- // a (backend, pool) pair atomically that has since been removed, we can safely let the lb
- // know about backend events and it will just ignore us.
-
- try {
- loadBalancer.connected(backend, System.nanoTime() - startNanos);
- } catch (RuntimeException e) {
- LOG.log(Level.WARNING, "Encountered an exception updating load balancer stats after "
- + "leasing a connection - continuing", e);
- }
- return new ManagedConnection<T, E>(connection, pool);
- } catch (TimeoutException e) {
- loadBalancer.connectFailed(backend, ConnectionResult.TIMEOUT);
- throw e;
- } catch (ResourceExhaustedException e) {
- loadBalancer.connectFailed(backend, ConnectionResult.FAILED);
- throw e;
- }
- }
-
- @Override
- public void release(Connection<T, E> connection) {
- release(connection, false);
- }
-
- /**
- * Equivalent to releasing a Connection with isValid() == false.
- * @see ObjectPool#remove(Object)
- */
- @Override
- public void remove(Connection<T, E> connection) {
- release(connection, true);
- }
-
- private void release(Connection<T, E> connection, boolean remove) {
- backendsWriteLock.lock();
- try {
-
- if (!(connection instanceof ManagedConnection)) {
- throw new IllegalArgumentException("Connection not controlled by this connection pool: "
- + connection);
- }
- ((ManagedConnection) connection).release(remove);
-
- loadBalancer.released(connection.getEndpoint());
- } finally {
- backendsWriteLock.unlock();
- }
- }
-
- @Override
- public void close() {
- stopBackendRestorer.execute();
-
- backendsWriteLock.lock();
- try {
- for (ObjectPool<Connection<T, E>> backend : backends.values()) {
- backend.close();
- }
- } finally {
- backendsWriteLock.unlock();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/net/pool/ObjectPool.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/net/pool/ObjectPool.java b/commons/src/main/java/org/apache/aurora/common/net/pool/ObjectPool.java
deleted file mode 100644
index a665903..0000000
--- a/commons/src/main/java/org/apache/aurora/common/net/pool/ObjectPool.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.net.pool;
-
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-
-import java.util.concurrent.TimeoutException;
-
-/**
- * A generic object pool that provides object of a given type for exclusive use by the caller.
- * Object pools generally pool expensive resources and so offer a {@link #close} method that should
- * be used to free these resources when the pool is no longer needed.
- *
- * @author John Sirois
- */
-public interface ObjectPool<T> {
-
- /**
- * Gets a resource potentially blocking for as long as it takes to either create a new one or wait
- * for one to be {@link #release(Object) released}. Callers must {@link #release(Object) release}
- * the connection when they are done with it.
- *
- * @return a resource for exclusive use by the caller
- * @throws ResourceExhaustedException if no resource could be obtained because this pool was
- * exhausted
- * @throws TimeoutException if we timed out while trying to fetch a resource
- */
- T get() throws ResourceExhaustedException, TimeoutException;
-
- /**
- * A convenience constant representing a no timeout.
- */
- Amount<Long,Time> NO_TIMEOUT = Amount.of(0L, Time.MILLISECONDS);
-
- /**
- * Gets a resource; timing out if there are none available and it takes longer than specified to
- * create a new one or wait for one to be {@link #release(Object) released}. Callers must
- * {@link #release (Object) release} the connection when they are done with it.
- *
- * @param timeout the maximum amount of time to wait
- * @return a resource for exclusive use by the caller
- * @throws TimeoutException if the specified timeout was reached before a resource became
- * available
- * @throws ResourceExhaustedException if no resource could be obtained because this pool was
- * exhausted
- */
- T get(Amount<Long, Time> timeout) throws ResourceExhaustedException, TimeoutException;
-
- /**
- * Releases a resource obtained from this pool back into the pool of available resources. It is an
- * error to release a resource not obtained from this pool.
- *
- * @param resource Resource to release.
- */
- void release(T resource);
-
- /**
- * Removes a resource obtained from this pool from its available resources. It is an error to
- * remove a resource not obtained from this pool.
- *
- * @param resource Resource to remove.
- */
- void remove(T resource);
-
- /**
- * Disallows further gets from this pool, "closes" all idle objects and any outstanding objects
- * when they are released.
- */
- void close();
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/net/pool/ResourceExhaustedException.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/net/pool/ResourceExhaustedException.java b/commons/src/main/java/org/apache/aurora/common/net/pool/ResourceExhaustedException.java
deleted file mode 100644
index fd48ddb..0000000
--- a/commons/src/main/java/org/apache/aurora/common/net/pool/ResourceExhaustedException.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.net.pool;
-
-/**
- * @author John Sirois
- */
-public class ResourceExhaustedException extends Exception {
- public ResourceExhaustedException(String message) {
- super(message);
- }
-
- public ResourceExhaustedException(String message, Throwable cause) {
- super(message, cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/objectsize/ObjectSizeCalculator.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/objectsize/ObjectSizeCalculator.java b/commons/src/main/java/org/apache/aurora/common/objectsize/ObjectSizeCalculator.java
deleted file mode 100644
index 95c8868..0000000
--- a/commons/src/main/java/org/apache/aurora/common/objectsize/ObjectSizeCalculator.java
+++ /dev/null
@@ -1,427 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.objectsize;
-
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryPoolMXBean;
-import java.lang.reflect.Array;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.ArrayDeque;
-import java.util.Arrays;
-import java.util.Deque;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.Sets;
-
-/**
- * Contains utility methods for calculating the memory usage of objects. It
- * only works on the HotSpot JVM, and infers the actual memory layout (32 bit
- * vs. 64 bit word size, compressed object pointers vs. uncompressed) from
- * best available indicators. It can reliably detect a 32 bit vs. 64 bit JVM.
- * It can only make an educated guess at whether compressed OOPs are used,
- * though; specifically, it knows what the JVM's default choice of OOP
- * compression would be based on HotSpot version and maximum heap sizes, but if
- * the choice is explicitly overridden with the <tt>-XX:{+|-}UseCompressedOops</tt> command line
- * switch, it can not detect
- * this fact and will report incorrect sizes, as it will presume the default JVM
- * behavior.
- *
- * @author Attila Szegedi
- */
-public class ObjectSizeCalculator {
-
- /**
- * Describes constant memory overheads for various constructs in a JVM implementation.
- */
- public interface MemoryLayoutSpecification {
-
- /**
- * Returns the fixed overhead of an array of any type or length in this JVM.
- *
- * @return the fixed overhead of an array.
- */
- int getArrayHeaderSize();
-
- /**
- * Returns the fixed overhead of for any {@link Object} subclass in this JVM.
- *
- * @return the fixed overhead of any object.
- */
- int getObjectHeaderSize();
-
- /**
- * Returns the quantum field size for a field owned by an object in this JVM.
- *
- * @return the quantum field size for an object.
- */
- int getObjectPadding();
-
- /**
- * Returns the fixed size of an object reference in this JVM.
- *
- * @return the size of all object references.
- */
- int getReferenceSize();
-
- /**
- * Returns the quantum field size for a field owned by one of an object's ancestor superclasses
- * in this JVM.
- *
- * @return the quantum field size for a superclass field.
- */
- int getSuperclassFieldPadding();
- }
-
- private static class CurrentLayout {
- private static final MemoryLayoutSpecification SPEC =
- getEffectiveMemoryLayoutSpecification();
- }
-
- /**
- * Given an object, returns the total allocated size, in bytes, of the object
- * and all other objects reachable from it. Attempts to to detect the current JVM memory layout,
- * but may fail with {@link UnsupportedOperationException};
- *
- * @param obj the object; can be null. Passing in a {@link java.lang.Class} object doesn't do
- * anything special, it measures the size of all objects
- * reachable through it (which will include its class loader, and by
- * extension, all other Class objects loaded by
- * the same loader, and all the parent class loaders). It doesn't provide the
- * size of the static fields in the JVM class that the Class object
- * represents.
- * @return the total allocated size of the object and all other objects it
- * retains.
- * @throws UnsupportedOperationException if the current vm memory layout cannot be detected.
- */
- public static long getObjectSize(Object obj) throws UnsupportedOperationException {
- return obj == null ? 0 : new ObjectSizeCalculator(CurrentLayout.SPEC).calculateObjectSize(obj);
- }
-
- // Fixed object header size for arrays.
- private final int arrayHeaderSize;
- // Fixed object header size for non-array objects.
- private final int objectHeaderSize;
- // Padding for the object size - if the object size is not an exact multiple
- // of this, it is padded to the next multiple.
- private final int objectPadding;
- // Size of reference (pointer) fields.
- private final int referenceSize;
- // Padding for the fields of superclass before fields of subclasses are
- // added.
- private final int superclassFieldPadding;
-
- private final LoadingCache<Class<?>, ClassSizeInfo> classSizeInfos =
- CacheBuilder.newBuilder().build(new CacheLoader<Class<?>, ClassSizeInfo>() {
- public ClassSizeInfo load(Class<?> clazz) {
- return new ClassSizeInfo(clazz);
- }
- });
-
-
- private final Set<Object> alreadyVisited = Sets.newIdentityHashSet();
- private final Deque<Object> pending = new ArrayDeque<Object>(16 * 1024);
- private long size;
-
- /**
- * Creates an object size calculator that can calculate object sizes for a given
- * {@code memoryLayoutSpecification}.
- *
- * @param memoryLayoutSpecification a description of the JVM memory layout.
- */
- public ObjectSizeCalculator(MemoryLayoutSpecification memoryLayoutSpecification) {
- Preconditions.checkNotNull(memoryLayoutSpecification);
- arrayHeaderSize = memoryLayoutSpecification.getArrayHeaderSize();
- objectHeaderSize = memoryLayoutSpecification.getObjectHeaderSize();
- objectPadding = memoryLayoutSpecification.getObjectPadding();
- referenceSize = memoryLayoutSpecification.getReferenceSize();
- superclassFieldPadding = memoryLayoutSpecification.getSuperclassFieldPadding();
- }
-
- /**
- * Given an object, returns the total allocated size, in bytes, of the object
- * and all other objects reachable from it.
- *
- * @param obj the object; can be null. Passing in a {@link java.lang.Class} object doesn't do
- * anything special, it measures the size of all objects
- * reachable through it (which will include its class loader, and by
- * extension, all other Class objects loaded by
- * the same loader, and all the parent class loaders). It doesn't provide the
- * size of the static fields in the JVM class that the Class object
- * represents.
- * @return the total allocated size of the object and all other objects it
- * retains.
- */
- public synchronized long calculateObjectSize(Object obj) {
- // Breadth-first traversal instead of naive depth-first with recursive
- // implementation, so we don't blow the stack traversing long linked lists.
- try {
- for (;;) {
- visit(obj);
- if (pending.isEmpty()) {
- return size;
- }
- obj = pending.removeFirst();
- }
- } finally {
- alreadyVisited.clear();
- pending.clear();
- size = 0;
- }
- }
-
- private void visit(Object obj) {
- if (alreadyVisited.contains(obj)) {
- return;
- }
- final Class<?> clazz = obj.getClass();
- if (clazz == ArrayElementsVisitor.class) {
- ((ArrayElementsVisitor) obj).visit(this);
- } else {
- alreadyVisited.add(obj);
- if (clazz.isArray()) {
- visitArray(obj);
- } else {
- classSizeInfos.getUnchecked(clazz).visit(obj, this);
- }
- }
- }
-
- private void visitArray(Object array) {
- final Class<?> componentType = array.getClass().getComponentType();
- final int length = Array.getLength(array);
- if (componentType.isPrimitive()) {
- increaseByArraySize(length, getPrimitiveFieldSize(componentType));
- } else {
- increaseByArraySize(length, referenceSize);
- // If we didn't use an ArrayElementsVisitor, we would be enqueueing every
- // element of the array here instead. For large arrays, it would
- // tremendously enlarge the queue. In essence, we're compressing it into
- // a small command object instead. This is different than immediately
- // visiting the elements, as their visiting is scheduled for the end of
- // the current queue.
- switch (length) {
- case 0: {
- break;
- }
- case 1: {
- enqueue(Array.get(array, 0));
- break;
- }
- default: {
- enqueue(new ArrayElementsVisitor((Object[]) array));
- }
- }
- }
- }
-
- private void increaseByArraySize(int length, long elementSize) {
- increaseSize(roundTo(arrayHeaderSize + length * elementSize, objectPadding));
- }
-
- private static class ArrayElementsVisitor {
- private final Object[] array;
-
- ArrayElementsVisitor(Object[] array) {
- this.array = array;
- }
-
- public void visit(ObjectSizeCalculator calc) {
- for (Object elem : array) {
- if (elem != null) {
- calc.visit(elem);
- }
- }
- }
- }
-
- void enqueue(Object obj) {
- if (obj != null) {
- pending.addLast(obj);
- }
- }
-
- void increaseSize(long objectSize) {
- size += objectSize;
- }
-
- @VisibleForTesting
- static long roundTo(long x, int multiple) {
- return ((x + multiple - 1) / multiple) * multiple;
- }
-
- private class ClassSizeInfo {
- // Padded fields + header size
- private final long objectSize;
- // Only the fields size - used to calculate the subclasses' memory
- // footprint.
- private final long fieldsSize;
- private final Field[] referenceFields;
-
- public ClassSizeInfo(Class<?> clazz) {
- long fieldsSize = 0;
- final List<Field> referenceFields = new LinkedList<Field>();
- for (Field f : clazz.getDeclaredFields()) {
- if (Modifier.isStatic(f.getModifiers())) {
- continue;
- }
- final Class<?> type = f.getType();
- if (type.isPrimitive()) {
- fieldsSize += getPrimitiveFieldSize(type);
- } else {
- f.setAccessible(true);
- referenceFields.add(f);
- fieldsSize += referenceSize;
- }
- }
- final Class<?> superClass = clazz.getSuperclass();
- if (superClass != null) {
- final ClassSizeInfo superClassInfo = classSizeInfos.getUnchecked(superClass);
- fieldsSize += roundTo(superClassInfo.fieldsSize, superclassFieldPadding);
- referenceFields.addAll(Arrays.asList(superClassInfo.referenceFields));
- }
- this.fieldsSize = fieldsSize;
- this.objectSize = roundTo(objectHeaderSize + fieldsSize, objectPadding);
- this.referenceFields = referenceFields.toArray(
- new Field[referenceFields.size()]);
- }
-
- void visit(Object obj, ObjectSizeCalculator calc) {
- calc.increaseSize(objectSize);
- enqueueReferencedObjects(obj, calc);
- }
-
- public void enqueueReferencedObjects(Object obj, ObjectSizeCalculator calc) {
- for (Field f : referenceFields) {
- try {
- calc.enqueue(f.get(obj));
- } catch (IllegalAccessException e) {
- final AssertionError ae = new AssertionError(
- "Unexpected denial of access to " + f);
- ae.initCause(e);
- throw ae;
- }
- }
- }
- }
-
- private static long getPrimitiveFieldSize(Class<?> type) {
- if (type == boolean.class || type == byte.class) {
- return 1;
- }
- if (type == char.class || type == short.class) {
- return 2;
- }
- if (type == int.class || type == float.class) {
- return 4;
- }
- if (type == long.class || type == double.class) {
- return 8;
- }
- throw new AssertionError("Encountered unexpected primitive type " +
- type.getName());
- }
-
- @VisibleForTesting
- static MemoryLayoutSpecification getEffectiveMemoryLayoutSpecification() {
- final String vmName = System.getProperty("java.vm.name");
- if (vmName == null || !(vmName.startsWith("Java HotSpot(TM) ")
- || vmName.startsWith("OpenJDK") || vmName.startsWith("TwitterJDK"))) {
- throw new UnsupportedOperationException(
- "ObjectSizeCalculator only supported on HotSpot VM");
- }
-
- final String dataModel = System.getProperty("sun.arch.data.model");
- if ("32".equals(dataModel)) {
- // Running with 32-bit data model
- return new MemoryLayoutSpecification() {
- @Override public int getArrayHeaderSize() {
- return 12;
- }
- @Override public int getObjectHeaderSize() {
- return 8;
- }
- @Override public int getObjectPadding() {
- return 8;
- }
- @Override public int getReferenceSize() {
- return 4;
- }
- @Override public int getSuperclassFieldPadding() {
- return 4;
- }
- };
- } else if (!"64".equals(dataModel)) {
- throw new UnsupportedOperationException("Unrecognized value '" +
- dataModel + "' of sun.arch.data.model system property");
- }
-
- final String strVmVersion = System.getProperty("java.vm.version");
- final int vmVersion = Integer.parseInt(strVmVersion.substring(0,
- strVmVersion.indexOf('.')));
- if (vmVersion >= 17) {
- long maxMemory = 0;
- for (MemoryPoolMXBean mp : ManagementFactory.getMemoryPoolMXBeans()) {
- maxMemory += mp.getUsage().getMax();
- }
- if (maxMemory < 30L * 1024 * 1024 * 1024) {
- // HotSpot 17.0 and above use compressed OOPs below 30GB of RAM total
- // for all memory pools (yes, including code cache).
- return new MemoryLayoutSpecification() {
- @Override public int getArrayHeaderSize() {
- return 16;
- }
- @Override public int getObjectHeaderSize() {
- return 12;
- }
- @Override public int getObjectPadding() {
- return 8;
- }
- @Override public int getReferenceSize() {
- return 4;
- }
- @Override public int getSuperclassFieldPadding() {
- return 4;
- }
- };
- }
- }
-
- // In other cases, it's a 64-bit uncompressed OOPs object model
- return new MemoryLayoutSpecification() {
- @Override public int getArrayHeaderSize() {
- return 24;
- }
- @Override public int getObjectHeaderSize() {
- return 16;
- }
- @Override public int getObjectPadding() {
- return 8;
- }
- @Override public int getReferenceSize() {
- return 8;
- }
- @Override public int getSuperclassFieldPadding() {
- return 8;
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/stats/ApproximateHistogram.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/stats/ApproximateHistogram.java b/commons/src/main/java/org/apache/aurora/common/stats/ApproximateHistogram.java
deleted file mode 100644
index cfbf04e..0000000
--- a/commons/src/main/java/org/apache/aurora/common/stats/ApproximateHistogram.java
+++ /dev/null
@@ -1,563 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.stats;
-
-import java.util.Arrays;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Data;
-
-/**
- * <p>
- * Implements Histogram structure for computing approximate quantiles.
- * The implementation is based on the following paper:
- *
- * <pre>
- * [MP80] Munro & Paterson, "Selection and Sorting with Limited Storage",
- * Theoretical Computer Science, Vol 12, p 315-323, 1980.
- * </pre>
- * </p>
- * <p>
- * You could read a detailed description of the same algorithm here:
- *
- * <pre>
- * [MRL98] Manku, Rajagopalan & Lindsay, "Approximate Medians and other
- * Quantiles in One Pass and with Limited Memory", Proc. 1998 ACM
- * SIGMOD, Vol 27, No 2, p 426-435, June 1998.
- * </pre>
- * </p>
- * <p>
- * There's a good explanation of the algorithm in the Sawzall source code
- * See: http://szl.googlecode.com/svn-history/r36/trunk/src/emitters/szlquantile.cc
- * </p>
- * Here's a schema of the tree:
- * <pre>
- * [4] level 3, weight=rootWeight=8
- * |
- * [3] level 2, weight=4
- * |
- * [2] level 1, weight=2
- * / \
- * [0] [1] level 0, weight=1
- * </pre>
- * <p>
- * {@code [i]} represents {@code buffer[i]}
- * The depth of the tree is limited to a maximum value
- * Every buffer has the same size
- * </p>
- * <p>
- * We add element in {@code [0]} or {@code [1]}.
- * When {@code [0]} and {@code [1]} are full, we collapse them, it generates a temporary buffer
- * of weight 2, if {@code [2]} is empty, we put the collapsed buffer into {@code [2]} otherwise
- * we collapse {@code [2]} with the temporary buffer and put it in {@code [3]} if it's empty and
- * so on...
- * </p>
- */
-public final class ApproximateHistogram implements Histogram {
- @VisibleForTesting
- public static final Precision DEFAULT_PRECISION = new Precision(0.02, 100 * 1000);
- @VisibleForTesting
- public static final Amount<Long, Data> DEFAULT_MAX_MEMORY = Amount.of(12L, Data.KB);
- @VisibleForTesting static final long ELEM_SIZE = 8; // sizeof long
-
- // See above
- @VisibleForTesting long[][] buffer;
- @VisibleForTesting long count = 0L;
- @VisibleForTesting int leafCount = 0; // number of elements in the bottom two leaves
- @VisibleForTesting int currentTop = 1;
- @VisibleForTesting int[] indices; // member for optimization reason
- private boolean leavesSorted = true;
- private int rootWeight = 1;
- private long[][] bufferPool; // pool of 2 buffers (used for merging)
- private int bufferSize;
- private int maxDepth;
-
- /**
- * Private init method that is called only by constructors.
- * All allocations are done in this method.
- *
- * @param bufSize size of each buffer
- * @param depth maximum depth of the tree of buffers
- */
- @VisibleForTesting
- void init(int bufSize, int depth) {
- bufferSize = bufSize;
- maxDepth = depth;
- bufferPool = new long[2][bufferSize];
- indices = new int[depth + 1];
- buffer = new long[depth + 1][bufferSize];
- // only allocate the first 2 buffers, lazily allocate the others.
- allocate(0);
- allocate(1);
- Arrays.fill(buffer, 2, buffer.length, null);
- clear();
- }
-
- @VisibleForTesting
- ApproximateHistogram(int bufSize, int depth) {
- init(bufSize, depth);
- }
-
- /**
- * Constructor with precision constraint, it will allocated as much memory as require to match
- * this precision constraint.
- * @param precision the requested precision
- */
- public ApproximateHistogram(Precision precision) {
- Preconditions.checkNotNull(precision);
- int depth = computeDepth(precision.getEpsilon(), precision.getN());
- int bufSize = computeBufferSize(depth, precision.getN());
- init(bufSize, depth);
- }
-
- /**
- * Constructor with memory constraint, it will find the best possible precision that satisfied
- * the memory constraint.
- * @param maxMemory the maximum amount of memory that the instance will take
- */
- public ApproximateHistogram(Amount<Long, Data> maxMemory, int expectedSize) {
- Preconditions.checkNotNull(maxMemory);
- Preconditions.checkArgument(1024 <= maxMemory.as(Data.BYTES),
- "at least 1KB is required for an Histogram");
-
- double epsilon = DEFAULT_PRECISION.getEpsilon();
- int n = expectedSize;
- int depth = computeDepth(epsilon, n);
- int bufSize = computeBufferSize(depth, n);
- long maxBytes = maxMemory.as(Data.BYTES);
-
- // Increase precision if the maxMemory allow it, otherwise reduce precision. (by 5% steps)
- boolean tooMuchMem = memoryUsage(bufSize, depth) > maxBytes;
- double multiplier = tooMuchMem ? 1.05 : 0.95;
- while((maxBytes < memoryUsage(bufSize, depth)) == tooMuchMem) {
- epsilon *= multiplier;
- if (epsilon < 0.00001) {
- // for very high memory constraint increase N as well
- n *= 10;
- epsilon = DEFAULT_PRECISION.getEpsilon();
- }
- depth = computeDepth(epsilon, n);
- bufSize = computeBufferSize(depth, n);
- }
- if (!tooMuchMem) {
- // It's ok to consume less memory than the constraint
- // but we never have to consume more!
- depth = computeDepth(epsilon / multiplier, n);
- bufSize = computeBufferSize(depth, n);
- }
-
- init(bufSize, depth);
- }
-
- /**
- * Constructor with memory constraint.
- * @see #ApproximateHistogram(Amount, int)
- */
- public ApproximateHistogram(Amount<Long, Data> maxMemory) {
- this(maxMemory, DEFAULT_PRECISION.getN());
- }
-
- /**
- * Default Constructor.
- * @see #ApproximateHistogram(Amount)
- */
- public ApproximateHistogram() {
- this(DEFAULT_MAX_MEMORY);
- }
-
- @Override
- public synchronized void add(long x) {
- // if the leaves of the tree are full, "collapse" recursively the tree
- if (leafCount == 2 * bufferSize) {
- Arrays.sort(buffer[0]);
- Arrays.sort(buffer[1]);
- recCollapse(buffer[0], 1);
- leafCount = 0;
- }
-
- // Now we're sure there is space for adding x
- if (leafCount < bufferSize) {
- buffer[0][leafCount] = x;
- } else {
- buffer[1][leafCount - bufferSize] = x;
- }
- leafCount++;
- count++;
- leavesSorted = (leafCount == 1);
- }
-
- @Override
- public synchronized long getQuantile(double q) {
- Preconditions.checkArgument(0.0 <= q && q <= 1.0,
- "quantile must be in the range 0.0 to 1.0 inclusive");
- if (count == 0) {
- return 0L;
- }
-
- // the two leaves are the only buffer that can be partially filled
- int buf0Size = Math.min(bufferSize, leafCount);
- int buf1Size = Math.max(0, leafCount - buf0Size);
- long sum = 0;
- long target = (long) Math.ceil(count * (1.0 - q));
- int i;
-
- if (! leavesSorted) {
- Arrays.sort(buffer[0], 0, buf0Size);
- Arrays.sort(buffer[1], 0, buf1Size);
- leavesSorted = true;
- }
- Arrays.fill(indices, bufferSize - 1);
- indices[0] = buf0Size - 1;
- indices[1] = buf1Size - 1;
-
- do {
- i = biggest(indices);
- indices[i]--;
- sum += weight(i);
- } while (sum < target);
- return buffer[i][indices[i] + 1];
- }
-
- @Override
- public synchronized long[] getQuantiles(double[] quantiles) {
- return Histograms.extractQuantiles(this, quantiles);
- }
-
- @Override
- public synchronized void clear() {
- count = 0L;
- leafCount = 0;
- currentTop = 1;
- rootWeight = 1;
- leavesSorted = true;
- }
-
- /**
- * MergedHistogram is a Wrapper on top of multiple histograms, it gives a view of all the
- * underlying histograms as it was just one.
- * Note: Should only be used for querying the underlying histograms.
- */
- private static class MergedHistogram implements Histogram {
- private final ApproximateHistogram[] histograms;
-
- private MergedHistogram(ApproximateHistogram[] histograms) {
- this.histograms = histograms;
- }
-
- @Override
- public void add(long x) {
- /* Ignore, Shouldn't be used */
- assert(false);
- }
-
- @Override
- public void clear() {
- /* Ignore, Shouldn't be used */
- assert(false);
- }
-
- @Override
- public long getQuantile(double quantile) {
- Preconditions.checkArgument(0.0 <= quantile && quantile <= 1.0,
- "quantile must be in the range 0.0 to 1.0 inclusive");
-
- long count = initIndices();
- if (count == 0) {
- return 0L;
- }
-
- long sum = 0;
- long target = (long) Math.ceil(count * (1.0 - quantile));
- int iHist = -1;
- int iBiggest = -1;
- do {
- long biggest = Long.MIN_VALUE;
- for (int i = 0; i < histograms.length; i++) {
- ApproximateHistogram hist = histograms[i];
- int indexBiggest = hist.biggest(hist.indices);
- if (indexBiggest >= 0) {
- long value = hist.buffer[indexBiggest][hist.indices[indexBiggest]];
- if (iBiggest == -1 || biggest <= value) {
- iBiggest = indexBiggest;
- biggest = value;
- iHist = i;
- }
- }
- }
- histograms[iHist].indices[iBiggest]--;
- sum += histograms[iHist].weight(iBiggest);
- } while (sum < target);
-
- ApproximateHistogram hist = histograms[iHist];
- int i = hist.indices[iBiggest];
- return hist.buffer[iBiggest][i + 1];
- }
-
- @Override
- public synchronized long[] getQuantiles(double[] quantiles) {
- return Histograms.extractQuantiles(this, quantiles);
- }
-
- /**
- * Initialize the indices array for each Histogram and return the global count.
- */
- private long initIndices() {
- long count = 0L;
- for (int i = 0; i < histograms.length; i++) {
- ApproximateHistogram h = histograms[i];
- int[] indices = h.indices;
- count += h.count;
- int buf0Size = Math.min(h.bufferSize, h.leafCount);
- int buf1Size = Math.max(0, h.leafCount - buf0Size);
-
- if (! h.leavesSorted) {
- Arrays.sort(h.buffer[0], 0, buf0Size);
- Arrays.sort(h.buffer[1], 0, buf1Size);
- h.leavesSorted = true;
- }
- Arrays.fill(indices, h.bufferSize - 1);
- indices[0] = buf0Size - 1;
- indices[1] = buf1Size - 1;
- }
- return count;
- }
- }
-
- /**
- * Return a MergedHistogram
- * @param histograms array of histograms to merged together
- * @return a new Histogram
- */
- public static Histogram merge(ApproximateHistogram[] histograms) {
- return new MergedHistogram(histograms);
- }
-
- /**
- * We compute the "smallest possible b" satisfying two inequalities:
- * 1) (b - 2) * (2 ^ (b - 2)) + 0.5 <= epsilon * N
- * 2) k * (2 ^ (b - 1)) >= N
- *
- * For an explanation of these inequalities, please read the Munro-Paterson or
- * the Manku-Rajagopalan-Linday papers.
- */
- @VisibleForTesting static int computeDepth(double epsilon, long n) {
- int b = 2;
- while ((b - 2) * (1L << (b - 2)) + 0.5 <= epsilon * n) {
- b += 1;
- }
- return b;
- }
-
- @VisibleForTesting static int computeBufferSize(int depth, long n) {
- return (int) (n / (1L << (depth - 1)));
- }
-
- /**
- * Return an estimation of the memory used by an instance.
- * The size is due to:
- * - a fix cost (76 bytes) for the class + fields
- * - bufferPool: 16 + 2 * (16 + bufferSize * ELEM_SIZE)
- * - indices: 16 + sizeof(Integer) * (depth + 1)
- * - buffer: 16 + (depth + 1) * (16 + bufferSize * ELEM_SIZE)
- *
- * Note: This method is tested with unit test, it will break if you had new fields.
- * @param bufferSize the size of a buffer
- * @param depth the depth of the tree of buffer (depth + 1 buffers)
- */
- @VisibleForTesting
- static long memoryUsage(int bufferSize, int depth) {
- return 176 + (24 * depth) + (bufferSize * ELEM_SIZE * (depth + 3));
- }
-
- /**
- * Return the level of the biggest element (using the indices array 'ids'
- * to track which elements have been already returned). Every buffer has
- * already been sorted at this point.
- * @return the level of the biggest element or -1 if no element has been found
- */
- @VisibleForTesting
- int biggest(final int[] ids) {
- long biggest = Long.MIN_VALUE;
- final int id0 = ids[0], id1 = ids[1];
- int iBiggest = -1;
-
- if (0 < leafCount && 0 <= id0) {
- biggest = buffer[0][id0];
- iBiggest = 0;
- }
- if (bufferSize < leafCount && 0 <= id1) {
- long x = buffer[1][id1];
- if (x > biggest) {
- biggest = x;
- iBiggest = 1;
- }
- }
- for (int i = 2; i < currentTop + 1; i++) {
- if (!isBufferEmpty(i) && 0 <= ids[i]) {
- long x = buffer[i][ids[i]];
- if (x > biggest) {
- biggest = x;
- iBiggest = i;
- }
- }
- }
- return iBiggest;
- }
-
-
- /**
- * Based on the number of elements inserted we can easily know if a buffer
- * is empty or not
- */
- @VisibleForTesting
- boolean isBufferEmpty(int level) {
- if (level == currentTop) {
- return false; // root buffer (if present) is always full
- } else {
- long levelWeight = 1 << (level - 1);
- return (((count - leafCount) / bufferSize) & levelWeight) == 0;
- }
- }
-
- /**
- * Return the weight of the level ie. 2^(i-1) except for the two tree
- * leaves (weight=1) and for the root
- */
- private int weight(int level) {
- if (level == 0) {
- return 1;
- } else if (level == maxDepth) {
- return rootWeight;
- } else {
- return 1 << (level - 1);
- }
- }
-
- private void allocate(int i) {
- if (buffer[i] == null) {
- buffer[i] = new long[bufferSize];
- }
- }
-
- /**
- * Recursively collapse the buffers of the tree.
- * Upper buffers will be allocated on first access in this method.
- */
- private void recCollapse(long[] buf, int level) {
- // if we reach the root, we can't add more buffer
- if (level == maxDepth) {
- // weight() return the weight of the root, in that case we need the
- // weight of merge result
- int mergeWeight = 1 << (level - 1);
- int idx = level % 2;
- long[] merged = bufferPool[idx];
- long[] tmp = buffer[level];
- collapse(buf, mergeWeight, buffer[level], rootWeight, merged);
- buffer[level] = merged;
- bufferPool[idx] = tmp;
- rootWeight += mergeWeight;
- } else {
- allocate(level + 1); // lazy allocation (if needed)
- if (level == currentTop) {
- // if we reach the top, add a new buffer
- collapse1(buf, buffer[level], buffer[level + 1]);
- currentTop += 1;
- rootWeight *= 2;
- } else if (isBufferEmpty(level + 1)) {
- // if the upper buffer is empty, use it
- collapse1(buf, buffer[level], buffer[level + 1]);
- } else {
- // it the upper buffer isn't empty, collapse with it
- long[] merged = bufferPool[level % 2];
- collapse1(buf, buffer[level], merged);
- recCollapse(merged, level + 1);
- }
- }
- }
-
- /**
- * collapse two sorted Arrays of different weight
- * ex: [2,5,7] weight 2 and [3,8,9] weight 3
- * weight x array + concat = [2,2,5,5,7,7,3,3,3,8,8,8,9,9,9]
- * sort = [2,2,3,3,3,5,5,7,7,8,8,8,9,9,9]
- * select every nth elems = [3,7,9] (n = sum weight / 2)
- */
- @VisibleForTesting
- static void collapse(
- long[] left,
- int leftWeight,
- long[] right,
- int rightWeight,
- long[] output) {
-
- int totalWeight = leftWeight + rightWeight;
- int halfTotalWeight = (totalWeight / 2) - 1;
- int i = 0, j = 0, k = 0, cnt = 0;
-
- int weight;
- long smallest;
-
- while (i < left.length || j < right.length) {
- if (i < left.length && (j == right.length || left[i] < right[j])) {
- smallest = left[i];
- weight = leftWeight;
- i++;
- } else {
- smallest = right[j];
- weight = rightWeight;
- j++;
- }
-
- int cur = (cnt + halfTotalWeight) / totalWeight;
- cnt += weight;
- int next = (cnt + halfTotalWeight) / totalWeight;
-
- for(; cur < next; cur++) {
- output[k] = smallest;
- k++;
- }
- }
- }
-
-/**
- * Optimized version of collapse for collapsing two array of the same weight
- * (which is what we want most of the time)
- */
- private static void collapse1(
- long[] left,
- long[] right,
- long[] output) {
-
- int i = 0, j = 0, k = 0, cnt = 0;
- long smallest;
-
- while (i < left.length || j < right.length) {
- if (i < left.length && (j == right.length || left[i] < right[j])) {
- smallest = left[i];
- i++;
- } else {
- smallest = right[j];
- j++;
- }
- if (cnt % 2 == 1) {
- output[k] = smallest;
- k++;
- }
- cnt++;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/stats/CounterMap.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/stats/CounterMap.java b/commons/src/main/java/org/apache/aurora/common/stats/CounterMap.java
deleted file mode 100644
index 024e67b..0000000
--- a/commons/src/main/java/org/apache/aurora/common/stats/CounterMap.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.stats;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.logging.Logger;
-
-/**
- * A map from a key type to integers. This simplifies the process of storing counters for multiple
- * values of the same type.
- */
-public class CounterMap <K> implements Iterable<Map.Entry<K, Integer>>, Cloneable {
- private final Map<K, Integer> map = Maps.newHashMap();
-
- private static Logger log = Logger.getLogger(CounterMap.class.getName());
-
- /**
- * Increments the counter value associated with {@code key}, and returns the new value.
- *
- * @param key The key to increment
- * @return The incremented value.
- */
- public int incrementAndGet(K key) {
- return incrementAndGet(key, 1);
- }
-
- /**
- * Increments the value associated with {@code key} by {@code value}, returning the new value.
- *
- * @param key The key to increment
- * @return The incremented value.
- */
- public int incrementAndGet(K key, int count) {
- Integer value = map.get(key);
- if (value == null) {
- value = 0;
- }
- int newValue = count + value;
- map.put(key, newValue);
- return newValue;
- }
-
- /**
- * Gets the value associated with a key.
- *
- * @param key The key to look up.
- * @return The counter value stored for {@code key}, or 0 if no mapping exists.
- */
- public int get(K key) {
- if (!map.containsKey(key)) {
- return 0;
- }
-
- return map.get(key);
- }
-
- /**
- * Assigns a value to a key.
- *
- * @param key The key to assign a value to.
- * @param newValue The value to assign.
- */
- public void set(K key, int newValue) {
- Preconditions.checkNotNull(key);
- map.put(key, newValue);
- }
-
- /**
- * Resets the value for {@code key}. This will remove the key from the counter.
- *
- * @param key The key to reset.
- */
- public void reset(K key) {
- map.remove(key);
- }
-
- /**
- * Gets the number of entries stored in the map.
- *
- * @return The size of the map.
- */
- public int size() {
- return map.size();
- }
-
- /**
- * Gets an iterator for the mapped values.
- *
- * @return Iterator for mapped values.
- */
- public Iterator<Map.Entry<K, Integer>> iterator() {
- return map.entrySet().iterator();
- }
-
- public Collection<Integer> values() {
- return map.values();
- }
-
- public Set<K> keySet() {
- return map.keySet();
- }
-
- public String toString() {
- StringBuilder strVal = new StringBuilder();
- for (Map.Entry<K, Integer> entry : this) {
- strVal.append(entry.getKey().toString()).append(": ").append(entry.getValue()).append('\n');
- }
- return strVal.toString();
- }
-
- public Map<K, Integer> toMap() {
- return map;
- }
-
- @Override
- public CounterMap<K> clone() {
- CounterMap<K> newInstance = new CounterMap<K>();
- newInstance.map.putAll(map);
- return newInstance;
- }
-}