You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by zm...@apache.org on 2015/08/26 23:00:31 UTC
[41/51] [partial] aurora git commit: Move packages from
com.twitter.common to org.apache.aurora.common
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/http/handlers/TimeSeriesDataSource.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/net/http/handlers/TimeSeriesDataSource.java b/commons/src/main/java/com/twitter/common/net/http/handlers/TimeSeriesDataSource.java
deleted file mode 100644
index 9b76147..0000000
--- a/commons/src/main/java/com/twitter/common/net/http/handlers/TimeSeriesDataSource.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.net.http.handlers;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.List;
-
-import javax.annotation.Nullable;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.base.Splitter;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.net.MediaType;
-import com.google.gson.Gson;
-import com.google.gson.JsonObject;
-import com.google.inject.Inject;
-
-import com.twitter.common.collections.Iterables2;
-import com.twitter.common.stats.TimeSeries;
-import com.twitter.common.stats.TimeSeriesRepository;
-
-/**
- * A servlet that provides time series data in JSON format.
- */
-public class TimeSeriesDataSource extends HttpServlet {
-
- @VisibleForTesting static final String TIME_METRIC = "time";
-
- private static final String METRICS = "metrics";
- private static final String SINCE = "since";
-
- private final TimeSeriesRepository timeSeriesRepo;
- private final Gson gson = new Gson();
-
- @Inject
- public TimeSeriesDataSource(TimeSeriesRepository timeSeriesRepo) {
- this.timeSeriesRepo = Preconditions.checkNotNull(timeSeriesRepo);
- }
-
- @VisibleForTesting
- String getResponse(
- @Nullable String metricsQuery,
- @Nullable String sinceQuery) throws MetricException {
-
- if (metricsQuery == null) {
- // Return metric listing.
- return gson.toJson(ImmutableList.copyOf(timeSeriesRepo.getAvailableSeries()));
- }
-
- List<Iterable<Number>> tsData = Lists.newArrayList();
- tsData.add(timeSeriesRepo.getTimestamps());
- // Ignore requests for "time" since it is implicitly returned.
- Iterable<String> names = Iterables.filter(
- Splitter.on(",").split(metricsQuery),
- Predicates.not(Predicates.equalTo(TIME_METRIC)));
- for (String metric : names) {
- TimeSeries series = timeSeriesRepo.get(metric);
- if (series == null) {
- JsonObject response = new JsonObject();
- response.addProperty("error", "Unknown metric " + metric);
- throw new MetricException(gson.toJson(response));
- }
- tsData.add(series.getSamples());
- }
-
- final long since = Long.parseLong(Optional.fromNullable(sinceQuery).or("0"));
- Predicate<List<Number>> sinceFilter = new Predicate<List<Number>>() {
- @Override public boolean apply(List<Number> next) {
- return next.get(0).longValue() > since;
- }
- };
-
- ResponseStruct response = new ResponseStruct(
- ImmutableList.<String>builder().add(TIME_METRIC).addAll(names).build(),
- FluentIterable.from(Iterables2.zip(tsData, 0)).filter(sinceFilter).toList());
- return gson.toJson(response);
- }
-
- @Override
- protected void doGet(HttpServletRequest req, HttpServletResponse resp)
- throws ServletException, IOException {
-
- resp.setContentType(MediaType.JSON_UTF_8.toString());
- PrintWriter out = resp.getWriter();
- try {
- out.write(getResponse(req.getParameter(METRICS), req.getParameter(SINCE)));
- } catch (MetricException e) {
- resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
- out.write(e.getMessage());
- }
- }
-
- @VisibleForTesting
- static class ResponseStruct {
- // Fields must be non-final for deserialization.
- List<String> names;
- List<List<Number>> data;
-
- ResponseStruct(List<String> names, List<List<Number>> data) {
- this.names = names;
- this.data = data;
- }
- }
-
- @VisibleForTesting
- static class MetricException extends Exception {
- MetricException(String message) {
- super(message);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/http/handlers/VarsHandler.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/net/http/handlers/VarsHandler.java b/commons/src/main/java/com/twitter/common/net/http/handlers/VarsHandler.java
deleted file mode 100644
index b9253c8..0000000
--- a/commons/src/main/java/com/twitter/common/net/http/handlers/VarsHandler.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.net.http.handlers;
-
-import java.util.Collections;
-import java.util.List;
-
-import javax.servlet.http.HttpServletRequest;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.inject.Inject;
-
-import com.twitter.common.stats.Stat;
-
-/**
- * HTTP handler that prints all registered variables and their current values.
- *
- * @author William Farner
- */
-public class VarsHandler extends TextResponseHandler {
-
- private static final Function<Stat, String> VAR_PRINTER = new Function<Stat, String>() {
- @Override public String apply(Stat stat) {
- return stat.getName() + " " + stat.read();
- }
- };
-
- private final Supplier<Iterable<Stat<?>>> statSupplier;
-
- /**
- * Creates a new handler that will report stats from the provided supplier.
- *
- * @param statSupplier Stats supplier.
- */
- @Inject
- public VarsHandler(Supplier<Iterable<Stat<?>>> statSupplier) {
- this.statSupplier = Preconditions.checkNotNull(statSupplier);
- }
-
- @Override
- public Iterable<String> getLines(HttpServletRequest request) {
- List<String> lines = Lists.newArrayList(Iterables.transform(statSupplier.get(), VAR_PRINTER));
- Collections.sort(lines);
- return lines;
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/http/handlers/VarsJsonHandler.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/net/http/handlers/VarsJsonHandler.java b/commons/src/main/java/com/twitter/common/net/http/handlers/VarsJsonHandler.java
deleted file mode 100644
index a6c105d..0000000
--- a/commons/src/main/java/com/twitter/common/net/http/handlers/VarsJsonHandler.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.net.http.handlers;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.Map;
-
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import com.google.common.collect.Maps;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.inject.Inject;
-
-import com.twitter.common.stats.Stat;
-
-/**
- * A servlet that returns the current value of all variables in JSON format.
- * The format returns a JSON object with string fields and typed values:
- * <pre>
- * {
- * "var_a": 1,
- * "var_b": 126.0,
- * "var_c": "a string value",
- * }
- * </pre>
- * If the optional URL parameter 'pretty' is used, the output will be pretty-printed
- * (similar to the above example).
- *
- * @author William Farner
- */
-public class VarsJsonHandler extends HttpServlet {
-
- private final Supplier<Iterable<Stat<?>>> statSupplier;
-
- /**
- * Creates a new handler that will report stats from the provided supplier.
- *
- * @param statSupplier Stats supplier.
- */
- @Inject
- public VarsJsonHandler(Supplier<Iterable<Stat<?>>> statSupplier) {
- this.statSupplier = Preconditions.checkNotNull(statSupplier);
- }
-
- @VisibleForTesting
- String getBody(boolean pretty) {
- Map<String, Object> vars = Maps.newLinkedHashMap();
- for (Stat<?> var : statSupplier.get()) {
- vars.put(var.getName(), var.read());
- }
- return getGson(pretty).toJson(vars);
- }
-
- @Override
- protected void doGet(HttpServletRequest req, HttpServletResponse resp)
- throws ServletException, IOException {
-
- resp.setContentType("application/json");
- resp.setStatus(HttpServletResponse.SC_OK);
- PrintWriter responseBody = resp.getWriter();
- try {
- responseBody.print(getBody(req.getParameter("pretty") != null));
- } finally {
- responseBody.close();
- }
- }
-
- private Gson getGson(boolean pretty) {
- return pretty ? new GsonBuilder().setPrettyPrinting().create() : new Gson();
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/loadbalancing/LeastConnectedStrategy.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/net/loadbalancing/LeastConnectedStrategy.java b/commons/src/main/java/com/twitter/common/net/loadbalancing/LeastConnectedStrategy.java
deleted file mode 100644
index d2e17c9..0000000
--- a/commons/src/main/java/com/twitter/common/net/loadbalancing/LeastConnectedStrategy.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.net.loadbalancing;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.logging.Logger;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import com.twitter.common.net.pool.ResourceExhaustedException;
-
-/**
- * A load balancer that attempts to direct load towards a backend that has the fewest leased
- * connections.
- *
- * @author William Farner
- */
-public class LeastConnectedStrategy<S> extends StaticLoadBalancingStrategy<S> {
- private static final Logger LOG = Logger.getLogger(LeastConnectedStrategy.class.getName());
-
- // Maps from backends to the number of connections made to them.
- private final Map<S, ConnectionStats> connections = Maps.newHashMap();
-
- // Manages sorting of connection counts, with a reference back to the backend.
- private final SortedSet<ConnectionStats> connectionStats = Sets.newTreeSet();
-
- /**
- * Encapsulates a set of connection stats that allow connections to be sorted as per the least
- * connected strategy.
- */
- private class ConnectionStats implements Comparable<ConnectionStats> {
- final S connectionKey;
- final int connectionId;
- int activeCount = 0; // Stores the total number of active connections.
- long useCount = 0; // Stores the total number times a connection has been used.
-
- ConnectionStats(S connectionKey, int connectionId) {
- this.connectionKey = connectionKey;
- this.connectionId = connectionId;
- }
-
- @Override
- public int compareTo(ConnectionStats other) {
- // Sort by number of active connections first.
- int difference = activeCount - other.activeCount;
- if (difference != 0) {
- return difference;
- }
-
- // Sub-sort by total number of times a connection has been used (this will ensure that
- // all backends are exercised).
- long useDifference = useCount - other.useCount;
- if (useDifference != 0) {
- return Long.signum(useDifference);
- }
-
- // If the above two are equal, break the tie using the connection id.
- return connectionId - other.connectionId;
- }
-
- @Override
- public boolean equals(Object o) {
- // We use ConnectionStats in a sorted container and so we need to have an equals
- // implementation consistent with compareTo, ie:
- // (x.compareTo(y) == 0) == x.equals(y)
- // We accomplish this directly.
-
- @SuppressWarnings("unchecked")
- ConnectionStats other = (ConnectionStats) o;
- return compareTo(other) == 0;
- }
-
- @Override
- public String toString() {
- return String.format("%d-%d", activeCount, useCount);
- }
- }
-
- @Override
- protected Collection<S> onBackendsOffered(Set<S> backends) {
- Map<S, ConnectionStats> newConnections = Maps.newHashMapWithExpectedSize(backends.size());
- Collection<ConnectionStats> newConnectionStats =
- Lists.newArrayListWithCapacity(backends.size());
-
- // Recreate all connection stats since their ordering may have changed and this is used for
- // comparison tie breaks.
- int backendId = 0;
- for (S backend : backends) {
- ConnectionStats stats = new ConnectionStats(backend, backendId++);
-
- // Retain the activeCount for existing backends to prevent dogpiling existing active servers
- ConnectionStats existing = connections.get(backend);
- if (existing != null) {
- stats.activeCount = existing.activeCount;
- }
-
- newConnections.put(backend, stats);
- newConnectionStats.add(stats);
- }
-
- connections.clear();
- connections.putAll(newConnections);
- connectionStats.clear();
- connectionStats.addAll(newConnectionStats);
-
- return connections.keySet();
- }
-
- @Override
- public S nextBackend() throws ResourceExhaustedException {
- Preconditions.checkState(connections.size() == connectionStats.size());
-
- if (connectionStats.isEmpty()) {
- throw new ResourceExhaustedException("No backends.");
- }
-
- return connectionStats.first().connectionKey;
- }
-
- @Override
- public void addConnectResult(S backendKey, ConnectionResult result, long connectTimeNanos) {
- Preconditions.checkNotNull(backendKey);
- Preconditions.checkState(connections.size() == connectionStats.size());
- Preconditions.checkNotNull(result);
-
- ConnectionStats stats = connections.get(backendKey);
- Preconditions.checkNotNull(stats);
-
- Preconditions.checkState(connectionStats.remove(stats));
- if (result == ConnectionResult.SUCCESS) {
- stats.activeCount++;
- }
- stats.useCount++;
- Preconditions.checkState(connectionStats.add(stats));
- }
-
- @Override
- public void connectionReturned(S backendKey) {
- Preconditions.checkNotNull(backendKey);
- Preconditions.checkState(connections.size() == connectionStats.size());
-
- ConnectionStats stats = connections.get(backendKey);
- Preconditions.checkNotNull(stats);
-
- if (stats.activeCount > 0) {
- Preconditions.checkState(connectionStats.remove(stats));
- stats.activeCount--;
- Preconditions.checkState(connectionStats.add(stats));
- } else {
- LOG.warning("connection stats dropped below zero, ignoring");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/loadbalancing/LoadBalancer.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/net/loadbalancing/LoadBalancer.java b/commons/src/main/java/com/twitter/common/net/loadbalancing/LoadBalancer.java
deleted file mode 100644
index b514b2d..0000000
--- a/commons/src/main/java/com/twitter/common/net/loadbalancing/LoadBalancer.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.net.loadbalancing;
-
-import com.twitter.common.base.Closure;
-import com.twitter.common.net.pool.ResourceExhaustedException;
-import com.twitter.common.net.loadbalancing.LoadBalancingStrategy.ConnectionResult;
-
-import java.util.Collection;
-import java.util.Set;
-
-/**
- * A load balancer, which will be used to determine which of a set of backends should be connected
- * to for service calls. It is expected that the backends themselves can be changed at any time,
- * and the load balancer should immediately restrict itself to using only those backends.
- *
- * It is likely that the load balancer implementation will periodically receive information about
- * backends that it technically should no longer know about. An example is calls to
- * {@link #requestResult(Object, RequestResult, long)} and {@link #released(Object)} for
- * in-flight requests after backends were changed by {@link #offerBackends(Set, Closure)}.
- *
- * @author William Farner
- */
-public interface LoadBalancer<K> extends RequestTracker<K> {
-
- /**
- * Offers a set of backends that the load balancer should choose from to distribute load amongst.
- *
- * @param offeredBackends Backends to choose from.
- * @param onBackendsChosen A callback that should be notified when the offered backends have been
- * (re)chosen from.
- */
- void offerBackends(Set<K> offeredBackends, Closure<Collection<K>> onBackendsChosen);
-
- /**
- * Gets the next backend that a request should be sent to.
- *
- * @return Next backend to send a request.
- * @throws ResourceExhaustedException If there are no available backends.
- */
- K nextBackend() throws ResourceExhaustedException;
-
- /**
- * Signals the load balancer that a connection was made.
- *
- * @param backend The backend that was connected to.
- * @param connectTimeNanos The time spent waiting for the connection to be established.
- */
- void connected(K backend, long connectTimeNanos);
-
- /**
- * Signals the load balancer that a connection was attempted, but failed.
- *
- * @param backend The backend to which connection attempt was made.
- * @param result The result of the connection attempt (only FAILED and TIMEOUT are permitted).
- */
- void connectFailed(K backend, ConnectionResult result);
-
- /**
- * Signals the load balancer that a connection was released, and is idle.
- *
- * @param connection Idle connection.
- */
- void released(K connection);
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/loadbalancing/LoadBalancerImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/net/loadbalancing/LoadBalancerImpl.java b/commons/src/main/java/com/twitter/common/net/loadbalancing/LoadBalancerImpl.java
deleted file mode 100644
index 5f94948..0000000
--- a/commons/src/main/java/com/twitter/common/net/loadbalancing/LoadBalancerImpl.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.net.loadbalancing;
-
-import java.util.Collection;
-import java.util.Set;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSet;
-
-import com.twitter.common.base.Closure;
-import com.twitter.common.net.loadbalancing.LoadBalancingStrategy.ConnectionResult;
-import com.twitter.common.net.pool.ResourceExhaustedException;
-
-/**
- * Implementation of a load balancer, that uses a pluggable {@link LoadBalancingStrategy} to define
- * actual load balancing behavior. This class handles the responsibility of associating connections
- * with backends.
- *
- * Calls to {@link #connected(Object, long)},
- * {@link #requestResult(Object, RequestResult, long)}, and {@link #released(Object)} will not
- * be forwarded for unknown backends/connections.
- *
- * @author William Farner
- */
-public class LoadBalancerImpl<K> implements LoadBalancer<K> {
-
- private final LoadBalancingStrategy<K> strategy;
-
- private Set<K> offeredBackends = ImmutableSet.of();
-
- /**
- * Creates a new load balancer that will use the given strategy.
- *
- * @param strategy Strategy to delegate load balancing work to.
- */
- public LoadBalancerImpl(LoadBalancingStrategy<K> strategy) {
- this.strategy = Preconditions.checkNotNull(strategy);
- }
-
- @Override
- public synchronized void offerBackends(Set<K> offeredBackends,
- final Closure<Collection<K>> onBackendsChosen) {
- this.offeredBackends = ImmutableSet.copyOf(offeredBackends);
- strategy.offerBackends(offeredBackends, new Closure<Collection<K>>() {
- @Override public void execute(Collection<K> chosenBackends) {
- onBackendsChosen.execute(chosenBackends);
- }
- });
- }
-
- @Override
- public synchronized K nextBackend() throws ResourceExhaustedException {
- return strategy.nextBackend();
- }
-
- @Override
- public synchronized void connected(K backend, long connectTimeNanos) {
- Preconditions.checkNotNull(backend);
-
- if (!hasBackend(backend)) return;
-
- strategy.addConnectResult(backend, ConnectionResult.SUCCESS, connectTimeNanos);
- }
-
- private boolean hasBackend(K backend) {
- return offeredBackends.contains(backend);
- }
-
- @Override
- public synchronized void connectFailed(K backend, ConnectionResult result) {
- Preconditions.checkNotNull(backend);
- Preconditions.checkNotNull(result);
- Preconditions.checkArgument(result != ConnectionResult.SUCCESS);
-
- if (!hasBackend(backend)) return;
-
- strategy.addConnectResult(backend, result, 0);
- }
-
- @Override
- public synchronized void released(K backend) {
- Preconditions.checkNotNull(backend);
-
- if (!hasBackend(backend)) return;
-
- strategy.connectionReturned(backend);
- }
-
- @Override
- public synchronized void requestResult(K backend, RequestResult result, long requestTimeNanos) {
- Preconditions.checkNotNull(backend);
- Preconditions.checkNotNull(result);
-
- if (!hasBackend(backend)) return;
-
- strategy.addRequestResult(backend, result, requestTimeNanos);
- }
-
- /**
- * Convenience method to create a new load balancer.
- *
- * @param strategy Strategy to use.
- * @param <K> Backend type.
- * @return A new load balancer.
- */
- public static <K> LoadBalancerImpl<K>
- create(LoadBalancingStrategy<K> strategy) {
- return new LoadBalancerImpl<K>(strategy);
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/loadbalancing/LoadBalancingStrategy.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/net/loadbalancing/LoadBalancingStrategy.java b/commons/src/main/java/com/twitter/common/net/loadbalancing/LoadBalancingStrategy.java
deleted file mode 100644
index 08cb9b5..0000000
--- a/commons/src/main/java/com/twitter/common/net/loadbalancing/LoadBalancingStrategy.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.net.loadbalancing;
-
-import com.twitter.common.base.Closure;
-import com.twitter.common.net.pool.ResourceExhaustedException;
-import com.twitter.common.net.loadbalancing.RequestTracker.RequestResult;
-
-import java.util.Collection;
-import java.util.Set;
-
-/**
- * A strategy for balancing request load among backends.
- *
- * Strategies should be externally synchronized, and therefore do not have to worry about reentrant
- * access.
- *
- * @author William Farner
- */
-public interface LoadBalancingStrategy<K> {
-
- /**
- * Offers a set of backends that the load balancer should choose from to distribute load amongst.
- *
- * @param offeredBackends Backends to choose from.
- * @param onBackendsChosen A callback that should be notified when the offered backends have been
- * (re)chosen from.
- */
- public void offerBackends(Set<K> offeredBackends, Closure<Collection<K>> onBackendsChosen);
-
- /**
- * Gets the next backend that a request should be sent to.
- *
- * @return Next backend to send a request.
- * @throws ResourceExhaustedException If there are no available backends.
- */
- public K nextBackend() throws ResourceExhaustedException;
-
- /**
- * Offers information about a connection result.
- *
- * @param key Backend key.
- * @param result Connection result.
- * @param connectTimeNanos Time spent waiting for connection to be established.
- */
- public void addConnectResult(K key, ConnectionResult result, long connectTimeNanos);
-
- /**
- * Offers information about a connection that was returned.
- *
- * @param key Backend key.
- */
- public void connectionReturned(K key);
-
- /**
- * Offers information about a request result.
- *
- * @param key Backend key.
- * @param result Request result.
- * @param requestTimeNanos Time spent waiting for a connection to be established.
- */
- public void addRequestResult(K key, RequestResult result, long requestTimeNanos);
-
- enum ConnectionResult {
- FAILED,
- TIMEOUT,
- SUCCESS
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/loadbalancing/MarkDeadStrategy.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/net/loadbalancing/MarkDeadStrategy.java b/commons/src/main/java/com/twitter/common/net/loadbalancing/MarkDeadStrategy.java
deleted file mode 100644
index 19b7703..0000000
--- a/commons/src/main/java/com/twitter/common/net/loadbalancing/MarkDeadStrategy.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.net.loadbalancing;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.twitter.common.base.Closure;
-import com.twitter.common.net.pool.ResourceExhaustedException;
-import com.twitter.common.net.loadbalancing.RequestTracker.RequestResult;
-import com.twitter.common.util.BackoffDecider;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-import java.util.logging.Logger;
-
-/**
- * A load balancer that serves as a layer above another load balancer to mark hosts as dead, and
- * prevent them from being visible to the wrapped load balancer.
- * If all backends become marked as dead, they will all be unmarked.
- *
- * @author William Farner
- */
-public class MarkDeadStrategy<S> implements LoadBalancingStrategy<S> {
- private static final Logger LOG = Logger.getLogger(MarkDeadStrategy.class.getName());
-
- private final LoadBalancingStrategy<S> wrappedStrategy;
- private final Map<S, BackoffDecider> targets = Maps.newHashMap();
- private final Function<S, BackoffDecider> backoffFactory;
- protected final Predicate<S> hostChecker;
-
- private Set<S> liveBackends = null;
- private Closure<Collection<S>> onBackendsChosen = null;
-
- // Flipped when we are in "forced live" mode, where all backends are considered dead and we
- // send them all traffic as a last-ditch effort.
- private boolean forcedLive = false;
-
- /**
- * Creates a mark dead strategy with a wrapped strategy, backoff decider factory
- * and a predicate host checker. Use this constructor if you want to pass in the
- * your own implementation of the host checker.
- *
- * @param wrappedStrategy one of the implementations of the load balancing strategy.
- * @param backoffFactory backoff decider factory per host.
- * @param hostChecker predicate that returns {@code true} if the host is alive, otherwise returns {@code false}.
- */
- public MarkDeadStrategy(LoadBalancingStrategy<S> wrappedStrategy,
- Function<S, BackoffDecider> backoffFactory, Predicate<S> hostChecker) {
- this.wrappedStrategy = Preconditions.checkNotNull(wrappedStrategy);
- this.backoffFactory = Preconditions.checkNotNull(backoffFactory);
- this.hostChecker = Preconditions.checkNotNull(hostChecker);
- }
-
- /**
- * Constructor that uses a default predicate host checker that always returns true.
- * This is the default constructor that all consumers of MarkDeadStrategy currently use.
- *
- * @param wrappedStrategy one of the implementations of the load balancing strategy.
- * @param backoffFactory backoff decider factory per host.
- */
- public MarkDeadStrategy(LoadBalancingStrategy<S> wrappedStrategy,
- Function<S, BackoffDecider> backoffFactory) {
- this(wrappedStrategy, backoffFactory, Predicates.<S>alwaysTrue());
- }
-
- @Override
- public void offerBackends(Set<S> offeredBackends, Closure<Collection<S>> onBackendsChosen) {
- this.onBackendsChosen = onBackendsChosen;
- targets.keySet().retainAll(offeredBackends);
- for (S backend : offeredBackends) {
- if (!targets.containsKey(backend)) {
- targets.put(backend, backoffFactory.apply(backend));
- }
- }
-
- adjustBackends();
- }
-
- @Override
- public void addConnectResult(S backendKey, ConnectionResult result, long connectTimeNanos) {
- Preconditions.checkNotNull(backendKey);
- Preconditions.checkNotNull(result);
-
- BackoffDecider decider = targets.get(backendKey);
- Preconditions.checkNotNull(decider);
-
- addResult(decider, result);
- if (shouldNotifyFor(backendKey)) {
- wrappedStrategy.addConnectResult(backendKey, result, connectTimeNanos);
- }
- }
-
- @Override
- public void connectionReturned(S backendKey) {
- Preconditions.checkNotNull(backendKey);
-
- if (shouldNotifyFor(backendKey)) {
- wrappedStrategy.connectionReturned(backendKey);
- }
- }
-
- @Override
- public void addRequestResult(S requestKey, RequestResult result,
- long requestTimeNanos) {
- Preconditions.checkNotNull(requestKey);
- Preconditions.checkNotNull(result);
-
- BackoffDecider decider = targets.get(requestKey);
- Preconditions.checkNotNull(decider);
-
- addResult(decider, result);
- if (shouldNotifyFor(requestKey)) {
- wrappedStrategy.addRequestResult(requestKey, result, requestTimeNanos);
- }
- }
-
- private void addResult(BackoffDecider decider, ConnectionResult result) {
- switch (result) {
- case FAILED:
- case TIMEOUT:
- addResult(decider, false);
- break;
- case SUCCESS:
- addResult(decider, true);
- break;
- default:
- throw new UnsupportedOperationException("Unhandled result type " + result);
- }
- }
-
- private void addResult(BackoffDecider decider, RequestTracker.RequestResult result) {
- switch (result) {
- case FAILED:
- case TIMEOUT:
- addResult(decider, false);
- break;
- case SUCCESS:
- addResult(decider, true);
- break;
- default:
- throw new UnsupportedOperationException("Unhandled result type " + result);
- }
- }
-
- private void addResult(BackoffDecider decider, boolean success) {
- if (success) {
- decider.addSuccess();
- } else {
- decider.addFailure();
- }
-
- // Check if any of the backends have moved into or out of dead state.
- for (Map.Entry<S, BackoffDecider> entry : targets.entrySet()) {
- boolean dead = entry.getValue().shouldBackOff();
- boolean markedDead = !liveBackends.contains(entry.getKey());
-
- // only check the servers that were marked dead before and see if we can
- // connect to them, otherwise set dead to true.
- if (markedDead && !dead) {
- boolean alive = hostChecker.apply(entry.getKey());
- if (!alive) {
- entry.getValue().transitionToBackOff(0, true);
- }
- dead = !alive;
- }
-
- if (dead && !markedDead && forcedLive) {
- // Do nothing here. Since we have forced all backends to be live, we don't want to
- // continually advertise the backend list to the wrapped strategy.
- } else if (dead != markedDead || !dead && forcedLive) {
- adjustBackends();
- break;
- }
- }
- }
-
- private boolean shouldNotifyFor(S backend) {
- return liveBackends.contains(backend);
- }
-
- private final Predicate<S> deadTargetFilter = new Predicate<S>() {
- @Override public boolean apply(S backend) {
- return !targets.get(backend).shouldBackOff();
- }
- };
-
- private void adjustBackends() {
- liveBackends = Sets.newHashSet(Iterables.filter(targets.keySet(), deadTargetFilter));
- if (liveBackends.isEmpty()) {
- liveBackends = targets.keySet();
- forcedLive = true;
- } else {
- forcedLive = false;
- }
- LOG.info("Observed backend state change, changing live backends to " + liveBackends);
- wrappedStrategy.offerBackends(liveBackends, onBackendsChosen);
- }
-
- @Override
- public S nextBackend() throws ResourceExhaustedException {
- return wrappedStrategy.nextBackend();
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/loadbalancing/MarkDeadStrategyWithHostCheck.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/net/loadbalancing/MarkDeadStrategyWithHostCheck.java b/commons/src/main/java/com/twitter/common/net/loadbalancing/MarkDeadStrategyWithHostCheck.java
deleted file mode 100644
index 607d327..0000000
--- a/commons/src/main/java/com/twitter/common/net/loadbalancing/MarkDeadStrategyWithHostCheck.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.net.loadbalancing;
-
-import java.util.Map;
-
-import com.google.common.base.Function;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Maps;
-
-import com.twitter.common.util.BackoffDecider;
-
-/**
- * A load balancing strategy that extends the functionality of the mark dead strategy by
- * integrating a hostChecker that allows hosts to transition out of a dead state
- * if the most recent connection to the host was successful.
- *
- * @param <S> typically socket address of a backend host.
- * @author Krishna Gade
- */
-public class MarkDeadStrategyWithHostCheck<S> extends MarkDeadStrategy<S> {
-
- /**
- * LiveHostChecker implements Filter to determine whether a host is alive based on the
- * result of the most recent connection attempt to that host. It keeps a map of
- * backend -> last connection result, which gets updated every time someone tries to
- * add to connection result.
- */
- protected static class LiveHostChecker<S> implements Predicate<S> {
- private final Map<S, ConnectionResult> lastConnectionResult = Maps.newHashMap();
-
- /**
- * Adds the connection result of this backend to the last connection result map.
- *
- * @param backend typically the socket address of the backend.
- * @param result result of what happened when the client tried to connect to this backend.
- */
- public void addConnectResult(S backend, ConnectionResult result) {
- lastConnectionResult.put(backend, result);
- }
-
- /**
- * Checks if the last connection result for this backend and returns {@code true} if it
- * was {@link LoadBalancingStrategy.ConnectionResult#SUCCESS} otherwise returns {@code false}.
- *
- * @param backend typically the socket address of the backend.
- */
- @Override public boolean apply(S backend) {
- ConnectionResult result = lastConnectionResult.get(backend);
- return result != null && result == ConnectionResult.SUCCESS;
- }
- }
-
- // Reference to the host checker we pass to the super class.
- // We keep it here to avoid casting on every access to it.
- protected final LiveHostChecker<S> liveHostChecker;
-
- /**
- * Creates a mark dead strategy with the given wrapped strategy and backoff decider factory.
- * It uses a hostChecker {@link Predicate} that allows hosts to transition out
- * of a dead state if the most recent connection to the host was successful.
- *
- * @param wrappedStrategy one of the implementations of the load balancing strategy.
- * @param backoffFactory backoff decider factory per host.
- */
- public MarkDeadStrategyWithHostCheck(LoadBalancingStrategy<S> wrappedStrategy,
- Function<S, BackoffDecider> backoffFactory) {
- super(wrappedStrategy, backoffFactory, new LiveHostChecker<S>());
- // Casting to LiveHostChecker is safe here as that's the only predicate that we pass to super.
- this.liveHostChecker = ((LiveHostChecker<S>) hostChecker);
- }
-
-
- /**
- * Overrides the base class implementation by adding this connection result to the
- * host checker.
- *
- * @param backendKey typically the socket address of the backend.
- * @param result result of what happened when the client tried to connect to this backend.
- * @param connectTimeNanos time took to connect to the backend in nano seconds.
- */
- @Override
- public void addConnectResult(S backendKey, ConnectionResult result, long connectTimeNanos) {
- liveHostChecker.addConnectResult(backendKey, result);
- super.addConnectResult(backendKey, result, connectTimeNanos);
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/loadbalancing/RandomStrategy.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/net/loadbalancing/RandomStrategy.java b/commons/src/main/java/com/twitter/common/net/loadbalancing/RandomStrategy.java
deleted file mode 100644
index b634d95..0000000
--- a/commons/src/main/java/com/twitter/common/net/loadbalancing/RandomStrategy.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.net.loadbalancing;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.twitter.common.net.pool.ResourceExhaustedException;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-
-/**
- * A load balancer that selects a random backend each time a request is made..
- *
- * @author William Farner
- */
-public class RandomStrategy<S> extends StaticLoadBalancingStrategy<S> {
-
- private List<S> targets = Lists.newArrayList();
- private final Random random;
-
- public RandomStrategy() {
- this(new Random());
- }
-
- @VisibleForTesting
- RandomStrategy(Random random) {
- this.random = Preconditions.checkNotNull(random);
- }
-
- @Override
- protected Collection<S> onBackendsOffered(Set<S> targets) {
- this.targets = ImmutableList.copyOf(targets);
- return this.targets;
- }
-
- @Override
- public S nextBackend() throws ResourceExhaustedException {
- if (targets.isEmpty()) throw new ResourceExhaustedException("No backends.");
- return targets.get(random.nextInt(targets.size()));
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/loadbalancing/RequestTracker.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/net/loadbalancing/RequestTracker.java b/commons/src/main/java/com/twitter/common/net/loadbalancing/RequestTracker.java
deleted file mode 100644
index 6450222..0000000
--- a/commons/src/main/java/com/twitter/common/net/loadbalancing/RequestTracker.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.net.loadbalancing;
-
-/**
- * Tracks requests made to a backend service.
- *
- * @author William Farner
- */
-public interface RequestTracker<T> {
-
- /**
- * Informs the tracker of a completed request.
- *
- * @param key Key to identify the owner of the request.
- * @param result Result of the request.
- * @param requestTimeNanos Time duration spent waiting for the request to complete.
- */
- void requestResult(T key, RequestResult result, long requestTimeNanos);
-
- enum RequestResult {
- FAILED,
- TIMEOUT,
- SUCCESS
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/loadbalancing/RoundRobinStrategy.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/net/loadbalancing/RoundRobinStrategy.java b/commons/src/main/java/com/twitter/common/net/loadbalancing/RoundRobinStrategy.java
deleted file mode 100644
index e656ad2..0000000
--- a/commons/src/main/java/com/twitter/common/net/loadbalancing/RoundRobinStrategy.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.net.loadbalancing;
-
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-import com.twitter.common.net.pool.ResourceExhaustedException;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-/**
- * A load balancer that distributes load by randomizing the list of available backends, and then
- * rotating through them evenly.
- *
- * @author William Farner
- */
-public class RoundRobinStrategy<S> extends StaticLoadBalancingStrategy<S> {
-
- private Iterator<S> iterator = Iterators.emptyIterator();
-
- @Override
- protected Collection<S> onBackendsOffered(Set<S> targets) {
- List<S> newTargets = Lists.newArrayList(targets);
- Collections.shuffle(newTargets);
- iterator = Iterators.cycle(newTargets);
- return newTargets;
- }
-
- @Override
- public S nextBackend() throws ResourceExhaustedException {
- if (!iterator.hasNext()) throw new ResourceExhaustedException("No backends available!");
- return iterator.next();
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/loadbalancing/StaticLoadBalancingStrategy.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/net/loadbalancing/StaticLoadBalancingStrategy.java b/commons/src/main/java/com/twitter/common/net/loadbalancing/StaticLoadBalancingStrategy.java
deleted file mode 100644
index 483e799..0000000
--- a/commons/src/main/java/com/twitter/common/net/loadbalancing/StaticLoadBalancingStrategy.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.net.loadbalancing;
-
-import com.twitter.common.base.Closure;
-import com.twitter.common.net.loadbalancing.RequestTracker.RequestResult;
-
-import java.util.Collection;
-import java.util.Set;
-
-/**
- * A baseclass for LoadBalancingStrategies that use a static set of backends they are
- * {@link #offerBackends(java.util.Set, com.twitter.common.base.Closure) offered}. Also acts as an
- * adapter, providing no-op implementations of all other LoadBalancingStrategy methods that only
- * need be overridden as required by subclass features.
- *
- * @author John Sirois
- */
-abstract class StaticLoadBalancingStrategy<K> implements LoadBalancingStrategy<K> {
-
- @Override
- public final void offerBackends(Set<K> offeredBackends, Closure<Collection<K>> onBackendsChosen) {
- onBackendsChosen.execute(onBackendsOffered(offeredBackends));
- }
-
- /**
- * Subclasses must override and return a collection of the backends actually chosen for use until
- * the next offer round.
- *
- * @param offeredBackends The backends offered in a {@link
- * #offerBackends(java.util.Set, com.twitter.common.base.Closure)} event.
- * @return The collection of backends that will be used until the next offer event.
- */
- protected abstract Collection<K> onBackendsOffered(Set<K> offeredBackends);
-
- @Override
- public void addConnectResult(K backendKey, ConnectionResult result, long connectTimeNanos) {
- // No-op.
- }
-
- @Override
- public void connectionReturned(K backendKey) {
- // No-op.
- }
-
- @Override
- public void addRequestResult(K requestKey, RequestResult result, long requestTimeNanos) {
- // No-op.
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/loadbalancing/SubsetStrategy.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/net/loadbalancing/SubsetStrategy.java b/commons/src/main/java/com/twitter/common/net/loadbalancing/SubsetStrategy.java
deleted file mode 100644
index 104729b..0000000
--- a/commons/src/main/java/com/twitter/common/net/loadbalancing/SubsetStrategy.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.net.loadbalancing;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.twitter.common.base.Closure;
-import com.twitter.common.net.pool.ResourceExhaustedException;
-import com.twitter.common.net.loadbalancing.RequestTracker.RequestResult;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-
-/**
- * A load balancer that maintains a fixed upper bound on the number of backends that will be made
- * available for a wrapped load balancer.
- *
- * TODO(William Farner): May want to consider periodically swapping subsets.
- *
- * TODO(William Farner): May want to catch ResourceExhaustedExceptions from wrapped strategy and adjust
- * subset if possible.
- *
- * @author William Farner
- */
-public class SubsetStrategy<S> implements LoadBalancingStrategy<S> {
- private final LoadBalancingStrategy<S> wrapped;
- private final int maxBackends;
-
- private Set<S> backendSubset = Sets.newHashSet();
-
- public SubsetStrategy(int maxBackends, LoadBalancingStrategy<S> wrapped) {
- Preconditions.checkArgument(maxBackends > 0);
- this.maxBackends = maxBackends;
- this.wrapped = Preconditions.checkNotNull(wrapped);
- }
-
- @Override
- public void offerBackends(Set<S> offeredBackends, Closure<Collection<S>> onBackendsChosen) {
- List<S> allTargets = Lists.newArrayList(offeredBackends);
- Collections.shuffle(allTargets);
- backendSubset = ImmutableSet.copyOf(
- allTargets.subList(0, Math.min(maxBackends, allTargets.size())));
- wrapped.offerBackends(backendSubset, onBackendsChosen);
- }
-
- @Override
- public void addConnectResult(S backendKey, ConnectionResult result,
- long connectTimeNanos) {
- if (backendSubset.contains(backendKey)) {
- wrapped.addConnectResult(backendKey, result, connectTimeNanos);
- }
- }
-
- @Override
- public void connectionReturned(S backendKey) {
- if (backendSubset.contains(backendKey)) {
- wrapped.connectionReturned(backendKey);
- }
- }
-
- @Override
- public void addRequestResult(S requestKey, RequestResult result, long requestTimeNanos) {
- Preconditions.checkNotNull(requestKey);
-
- if (backendSubset.contains(requestKey)) {
- wrapped.addRequestResult(requestKey, result, requestTimeNanos);
- }
- }
-
- @Override
- public S nextBackend() throws ResourceExhaustedException {
- return wrapped.nextBackend();
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/loadbalancing/TrafficMonitorAdapter.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/net/loadbalancing/TrafficMonitorAdapter.java b/commons/src/main/java/com/twitter/common/net/loadbalancing/TrafficMonitorAdapter.java
deleted file mode 100644
index e3bf25b..0000000
--- a/commons/src/main/java/com/twitter/common/net/loadbalancing/TrafficMonitorAdapter.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.net.loadbalancing;
-
-import com.google.common.base.Preconditions;
-import com.twitter.common.base.Closure;
-import com.twitter.common.net.pool.ResourceExhaustedException;
-import com.twitter.common.net.monitoring.TrafficMonitor;
-
-import java.util.Collection;
-import java.util.Set;
-
-/**
- * @author William Farner
- */
-public class TrafficMonitorAdapter<K> implements LoadBalancingStrategy<K> {
- private final LoadBalancingStrategy<K> strategy;
- private final TrafficMonitor<K> monitor;
-
- public TrafficMonitorAdapter(LoadBalancingStrategy<K> strategy, TrafficMonitor<K> monitor) {
- this.strategy = Preconditions.checkNotNull(strategy);
- this.monitor = Preconditions.checkNotNull(monitor);
- }
-
- public static <K> TrafficMonitorAdapter<K> create(LoadBalancingStrategy<K> strategy,
- TrafficMonitor<K> monitor) {
- return new TrafficMonitorAdapter<K>(strategy, monitor);
- }
-
- @Override
- public void offerBackends(Set<K> offeredBackends, Closure<Collection<K>> onBackendsChosen) {
- strategy.offerBackends(offeredBackends, onBackendsChosen);
- }
-
- @Override
- public K nextBackend() throws ResourceExhaustedException {
- return strategy.nextBackend();
- }
-
- @Override
- public void addConnectResult(K key, ConnectionResult result, long connectTimeNanos) {
- strategy.addConnectResult(key, result, connectTimeNanos);
- if (result == ConnectionResult.SUCCESS) monitor.connected(key);
- }
-
- @Override
- public void connectionReturned(K key) {
- strategy.connectionReturned(key);
- monitor.released(key);
- }
-
- @Override
- public void addRequestResult(K key, RequestTracker.RequestResult result, long requestTimeNanos) {
- strategy.addRequestResult(key, result, requestTimeNanos);
- monitor.requestResult(key, result, requestTimeNanos);
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/monitoring/ConnectionMonitor.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/net/monitoring/ConnectionMonitor.java b/commons/src/main/java/com/twitter/common/net/monitoring/ConnectionMonitor.java
deleted file mode 100644
index cd881bf..0000000
--- a/commons/src/main/java/com/twitter/common/net/monitoring/ConnectionMonitor.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.net.monitoring;
-
-/**
- * Monitors active connections between two hosts..
- *
- * @author William Farner
- */
-public interface ConnectionMonitor<K> {
-
- /**
- * Instructs the monitor that a connection was established.
- *
- * @param connectionKey Key for the host that a connection was established with.
- */
- public void connected(K connectionKey);
-
- /**
- * Informs the monitor that a connection was released.
- *
- * @param connectionKey Key for the host that a connection was released for.
- */
- public void released(K connectionKey);
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/monitoring/TrafficMonitor.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/net/monitoring/TrafficMonitor.java b/commons/src/main/java/com/twitter/common/net/monitoring/TrafficMonitor.java
deleted file mode 100644
index fd5b577..0000000
--- a/commons/src/main/java/com/twitter/common/net/monitoring/TrafficMonitor.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.net.monitoring;
-
-import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.annotation.concurrent.GuardedBy;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import com.twitter.common.base.MorePreconditions;
-import com.twitter.common.net.loadbalancing.RequestTracker;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.util.Clock;
-import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
-
-/**
- * Monitors activity on established connections between two hosts. This can be used for a server
- * to track inbound clients, or for a client to track requests sent to different servers.
- *
- * The monitor will retain information for hosts that may no longer be active, but will expunge
- * information for hosts that have been idle for more than five minutes.
- *
- * @author William Farner
- */
-public class TrafficMonitor<K> implements ConnectionMonitor<K>, RequestTracker<K> {
-
- @VisibleForTesting
- static final Amount<Long, Time> DEFAULT_GC_INTERVAL = Amount.of(5L, Time.MINUTES);
-
- @GuardedBy("this")
- private final LoadingCache<K, TrafficInfo> trafficInfos;
-
- private final String serviceName;
- private final Amount<Long, Time> gcInterval;
-
- private AtomicLong lifetimeRequests = new AtomicLong();
- private final Clock clock;
- private final ScheduledExecutorService gcExecutor;
-
- /**
- * Creates a new traffic monitor using the default cleanup interval.
- *
- * @param serviceName Name of the service to monitor, used for creating variable names.
- */
- public TrafficMonitor(final String serviceName) {
- this(serviceName, DEFAULT_GC_INTERVAL);
- }
-
- /**
- * Creates a new traffic monitor with a custom cleanup interval.
- *
- * @param serviceName Service name for the monitor.
- * @param gcInterval Interval on which the remote host garbage collector should run.
- */
- public TrafficMonitor(final String serviceName, Amount<Long, Time> gcInterval) {
- this(serviceName, gcInterval, Clock.SYSTEM_CLOCK);
- }
-
- /**
- * Convenience method to create a typed traffic monitor.
- *
- * @param serviceName Service name for the monitor.
- * @param <T> Monitor type.
- * @return A new traffic monitor.
- */
- public static <T> TrafficMonitor<T> create(String serviceName) {
- return new TrafficMonitor<T>(serviceName);
- }
-
- @VisibleForTesting
- TrafficMonitor(final String serviceName, Clock clock) {
- this(serviceName, DEFAULT_GC_INTERVAL, clock);
- }
-
- private TrafficMonitor(final String serviceName, Amount<Long, Time> gcInterval, Clock clock) {
- this.serviceName = MorePreconditions.checkNotBlank(serviceName);
- this.clock = Preconditions.checkNotNull(clock);
- Preconditions.checkNotNull(gcInterval);
- Preconditions.checkArgument(gcInterval.getValue() > 0, "GC interval must be > zero.");
- this.gcInterval = gcInterval;
-
- trafficInfos = CacheBuilder.newBuilder().build(new CacheLoader<K, TrafficInfo>() {
- @Override public TrafficInfo load(K key) {
- return new TrafficInfo(key);
- }
- });
-
- Runnable gc = new Runnable() {
- @Override public void run() { gc(); }
- };
-
- gcExecutor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("TrafficMonitor-gc-%d").build());
- gcExecutor.scheduleAtFixedRate(gc, gcInterval.as(Time.SECONDS), gcInterval.as(Time.SECONDS),
- TimeUnit.SECONDS);
- }
-
- /**
- * Gets the name of the service that this monitor is monitoring.
- *
- * @return Monitor's service name.
- */
- public String getServiceName() {
- return serviceName;
- }
-
- /**
- * Gets the total number of requests that this monitor has observed, for all remote hosts.
- *
- * @return Total number of requests observed.
- */
- public long getLifetimeRequestCount() {
- return lifetimeRequests.get();
- }
-
- /**
- * Fetches all current traffic information.
- *
- * @return A map from the host key type to information about that host.
- */
- public synchronized Map<K, TrafficInfo> getTrafficInfo() {
- return ImmutableMap.copyOf(trafficInfos.asMap());
- }
-
- @Override
- public synchronized void connected(K key) {
- Preconditions.checkNotNull(key);
-
- trafficInfos.getUnchecked(key).incConnections();
- }
-
- @Override
- public synchronized void released(K key) {
- Preconditions.checkNotNull(key);
-
- TrafficInfo info = trafficInfos.getUnchecked(key);
-
- Preconditions.checkState(info.getConnectionCount() > 0, "Double release detected!");
- info.decConnections();
- }
-
- @Override
- public void requestResult(K key, RequestResult result, long requestTimeNanos) {
- Preconditions.checkNotNull(key);
-
- lifetimeRequests.incrementAndGet();
- trafficInfos.getUnchecked(key).addResult(result);
- }
-
- @VisibleForTesting
- synchronized void gc() {
- Iterables.removeIf(trafficInfos.asMap().entrySet(),
- new Predicate<Map.Entry<K, TrafficInfo>>() {
- @Override public boolean apply(Map.Entry<K, TrafficInfo> clientInfo) {
- if (clientInfo.getValue().connections.get() > 0) return false;
-
- long idlePeriod = clock.nowNanos() - clientInfo.getValue().getLastActiveTimestamp();
-
- return idlePeriod > gcInterval.as(Time.NANOSECONDS);
- }
- });
- }
-
- /**
- * Shuts down TrafficMonitor by stopping background gc task.
- */
- public void shutdown() {
- new ExecutorServiceShutdown(gcExecutor, Amount.of(0L, Time.SECONDS)).execute();
- }
-
- /**
- * Information about traffic obsserved to/from a specific host.
- */
- public class TrafficInfo {
- private final K key;
- private AtomicInteger requestSuccesses = new AtomicInteger();
- private AtomicInteger requestFailures = new AtomicInteger();
- private AtomicInteger connections = new AtomicInteger();
- private AtomicLong lastActive = new AtomicLong();
-
- TrafficInfo(K key) {
- this.key = key;
- pulse();
- }
-
- void pulse() {
- lastActive.set(clock.nowNanos());
- }
-
- public K getKey() {
- return key;
- }
-
- void addResult(RequestResult result) {
- pulse();
- switch (result) {
- case SUCCESS:
- requestSuccesses.incrementAndGet();
- break;
- case FAILED:
- case TIMEOUT:
- requestFailures.incrementAndGet();
- break;
- }
- }
-
- public int getRequestSuccessCount() {
- return requestSuccesses.get();
- }
-
- public int getRequestFailureCount() {
- return requestFailures.get();
- }
-
- int incConnections() {
- pulse();
- return connections.incrementAndGet();
- }
-
- int decConnections() {
- pulse();
- return connections.decrementAndGet();
- }
-
- public int getConnectionCount() {
- return connections.get();
- }
-
- public long getLastActiveTimestamp() {
- return lastActive.get();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/pool/Connection.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/net/pool/Connection.java b/commons/src/main/java/com/twitter/common/net/pool/Connection.java
deleted file mode 100644
index cf8f1a4..0000000
--- a/commons/src/main/java/com/twitter/common/net/pool/Connection.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.net.pool;
-
-import com.google.common.base.Supplier;
-
-import java.io.Closeable;
-
-/**
- * An interface to a connection resource that may become invalid.
- *
- * @author John Sirois
- */
-public interface Connection<T, E> extends Supplier<T>, Closeable {
-
- /**
- * This will always be the same underlying connection for the lifetime of this object.
- *
- * @return the connection
- */
- @Override T get();
-
- /**
- * @return {@code true} if the supplied connection is valid for use.
- */
- boolean isValid();
-
- /**
- * Closes this connection.
- */
- void close();
-
- /**
- * @return the endpoint this connection is connected to.
- */
- E getEndpoint();
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/pool/ConnectionFactory.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/net/pool/ConnectionFactory.java b/commons/src/main/java/com/twitter/common/net/pool/ConnectionFactory.java
deleted file mode 100644
index 7b87bc7..0000000
--- a/commons/src/main/java/com/twitter/common/net/pool/ConnectionFactory.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.net.pool;
-
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-
-/**
- * A factory for connections that also dictates policy for the size of the connection population.
- *
- * <p>TODO(John Sirois): separate concerns - mixing in willCreate/null protocol is already tangling
- * implementation code
- *
- * @author John Sirois
- */
-public interface ConnectionFactory<S extends Connection<?, ?>> {
-
- /**
- * Checks whether this factory might create a connection if requested.
- *
- * @return {@code} true if this factory might create a connection at this point in time; ie
- * a call to {@link #create} might not have returned {@code null}. May return true to multiple
- * threads if concurrently creating connections.
- */
- boolean mightCreate();
-
- /**
- * Attempts to create a new connection within the given timeout and subject to this factory's
- * connection population size policy.
- *
- * @param timeout the maximum amount of time to wait
- * @return a new connection or null if there are too many connections already
- * @throws Exception if there was a problem creating the connection or establishing the connection
- * takes too long
- */
- S create(Amount<Long, Time> timeout) throws Exception;
-
- /**
- * Destroys a connection. It is an error to attempt to destroy a connection this factory did
- * not {@link #create}
- *
- * @param connection The connection to destroy.
- */
- void destroy(S connection);
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/pool/ConnectionPool.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/net/pool/ConnectionPool.java b/commons/src/main/java/com/twitter/common/net/pool/ConnectionPool.java
deleted file mode 100644
index 81d7684..0000000
--- a/commons/src/main/java/com/twitter/common/net/pool/ConnectionPool.java
+++ /dev/null
@@ -1,334 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.net.pool;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.twitter.common.base.Supplier;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.Stats;
-import com.twitter.common.stats.StatsProvider;
-
-import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * A generic connection pool that delegates growth policy to a {@link ConnectionFactory} and
- * connection choice to a supplied strategy.
- *
- * <p>TODO(John Sirois): implement a reaper to clean up connections that may become invalid when not in
- * use.
- *
- * <p> TODO(John Sirois): take a ShutdownRegistry and register a close command
- *
- * @author John Sirois
- */
-public final class ConnectionPool<S extends Connection<?, ?>> implements ObjectPool<S> {
-
- private static final Logger LOG = Logger.getLogger(ConnectionPool.class.getName());
-
- private final Set<S> leasedConnections =
- Sets.newSetFromMap(Maps.<S, Boolean>newIdentityHashMap());
- private final Set<S> availableConnections = Sets.newHashSet();
- private final Lock poolLock;
- private final Condition available;
-
- private final ConnectionFactory<S> connectionFactory;
- private final Executor executor;
-
- private volatile boolean closed;
- private final AtomicLong connectionsCreated;
- private final AtomicLong connectionsDestroyed;
- private final AtomicLong connectionsReturned;
-
- /**
- * Creates a connection pool with a connection picker that selects the first item in the set of
- * available connections, exporting statistics to stats provider {@link Stats#STATS_PROVIDER}.
- *
- * @param connectionFactory Factory to create and destroy connections.
- */
- public ConnectionPool(ConnectionFactory<S> connectionFactory) {
- this(connectionFactory, Stats.STATS_PROVIDER);
- }
-
- /**
- * Creates a connection pool with a connection picker that selects the first item in the set of
- * available connections and uses the supplied StatsProvider to register stats with.
- *
- * @param connectionFactory Factory to create and destroy connections.
- * @param statsProvider Stats export provider.
- */
- public ConnectionPool(ConnectionFactory<S> connectionFactory, StatsProvider statsProvider) {
- this(Executors.newCachedThreadPool(
- new ThreadFactoryBuilder()
- .setNameFormat("CP-" + connectionFactory + "[%d]")
- .setDaemon(true)
- .build()),
- new ReentrantLock(true), connectionFactory, statsProvider);
- }
-
- @VisibleForTesting
- ConnectionPool(Executor executor, Lock poolLock, ConnectionFactory<S> connectionFactory,
- StatsProvider statsProvider) {
- Preconditions.checkNotNull(executor);
- Preconditions.checkNotNull(poolLock);
- Preconditions.checkNotNull(connectionFactory);
- Preconditions.checkNotNull(statsProvider);
-
- this.executor = executor;
- this.poolLock = poolLock;
- available = poolLock.newCondition();
- this.connectionFactory = connectionFactory;
-
- String cfName = Stats.normalizeName(connectionFactory.toString());
- statsProvider.makeGauge("cp_leased_connections_" + cfName,
- new Supplier<Integer>() {
- @Override public Integer get() {
- return leasedConnections.size();
- }
- });
- statsProvider.makeGauge("cp_available_connections_" + cfName,
- new Supplier<Integer>() {
- @Override public Integer get() {
- return availableConnections.size();
- }
- });
- this.connectionsCreated =
- statsProvider.makeCounter("cp_created_connections_" + cfName);
- this.connectionsDestroyed =
- statsProvider.makeCounter("cp_destroyed_connections_" + cfName);
- this.connectionsReturned =
- statsProvider.makeCounter("cp_returned_connections_" + cfName);
- }
-
- @Override
- public String toString() {
- return "CP-" + connectionFactory;
- }
-
- @Override
- public S get() throws ResourceExhaustedException, TimeoutException {
- checkNotClosed();
- poolLock.lock();
- try {
- return leaseConnection(NO_TIMEOUT);
- } finally {
- poolLock.unlock();
- }
- }
-
- @Override
- public S get(Amount<Long, Time> timeout)
- throws ResourceExhaustedException, TimeoutException {
-
- checkNotClosed();
- Preconditions.checkNotNull(timeout);
- if (timeout.getValue() == 0) {
- return get();
- }
-
- try {
- long start = System.nanoTime();
- long timeBudgetNs = timeout.as(Time.NANOSECONDS);
- if (poolLock.tryLock(timeBudgetNs, TimeUnit.NANOSECONDS)) {
- try {
- timeBudgetNs -= (System.nanoTime() - start);
- return leaseConnection(Amount.of(timeBudgetNs, Time.NANOSECONDS));
- } finally {
- poolLock.unlock();
- }
- } else {
- throw new TimeoutException("Timed out waiting for pool lock");
- }
- } catch (InterruptedException e) {
- throw new TimeoutException("Interrupted waiting for pool lock");
- }
- }
-
- private S leaseConnection(Amount<Long, Time> timeout) throws ResourceExhaustedException,
- TimeoutException {
- S connection = getConnection(timeout);
- if (connection == null) {
- throw new ResourceExhaustedException("Connection pool resources exhausted");
- }
- return leaseConnection(connection);
- }
-
- @Override
- public void release(S connection) {
- release(connection, false);
- }
-
- /**
- * Equivalent to releasing a Connection with isValid() == false.
- * @see ObjectPool#remove(Object)
- */
- @Override
- public void remove(S connection) {
- release(connection, true);
- }
-
- // TODO(John Sirois): release could block indefinitely if someone is blocked in get() on a create
- // connection - reason about this and potentially submit release to our executor
- private void release(S connection, boolean remove) {
- poolLock.lock();
- try {
- if (!leasedConnections.remove(connection)) {
- throw new IllegalArgumentException("Connection not controlled by this connection pool: "
- + connection);
- }
-
- if (!closed && !remove && connection.isValid()) {
- addConnection(connection);
- connectionsReturned.incrementAndGet();
- } else {
- connectionFactory.destroy(connection);
- connectionsDestroyed.incrementAndGet();
- }
- } finally {
- poolLock.unlock();
- }
- }
-
- @Override
- public void close() {
- poolLock.lock();
- try {
- for (S availableConnection : availableConnections) {
- connectionFactory.destroy(availableConnection);
- }
- } finally {
- closed = true;
- poolLock.unlock();
- }
- }
-
- private void checkNotClosed() {
- Preconditions.checkState(!closed);
- }
-
- private S leaseConnection(S connection) {
- leasedConnections.add(connection);
- return connection;
- }
-
- // TODO(John Sirois): pool growth is serialized by poolLock currently - it seems like this could be
- // fixed but there may be no need - do gedankanalysis
- private S getConnection(final Amount<Long, Time> timeout) throws ResourceExhaustedException,
- TimeoutException {
- if (availableConnections.isEmpty()) {
- if (leasedConnections.isEmpty()) {
- // Completely empty pool
- try {
- return createConnection(timeout);
- } catch (Exception e) {
- throw new ResourceExhaustedException("failed to create a new connection", e);
- }
- } else {
- // If the pool is allowed to grow - let the connection factory race a release
- if (connectionFactory.mightCreate()) {
- executor.execute(new Runnable() {
- @Override public void run() {
- try {
- // The connection timeout is not needed here to honor the callers get requested
- // timeout, but we don't want to have an infinite timeout which could exhaust a
- // thread pool over many backgrounded create calls
- S connection = createConnection(timeout);
- if (connection != null) {
- addConnection(connection);
- } else {
- LOG.log(Level.WARNING, "Failed to create a new connection for a waiting client " +
- "due to maximum pool size or timeout");
- }
- } catch (Exception e) {
- LOG.log(Level.WARNING, "Failed to create a new connection for a waiting client", e);
- }
- }
- });
- }
-
- try {
- // We wait for a returned/new connection here in loops to guard against the
- // "spurious wakeups" that are documented can occur with Condition.await()
- if (timeout.getValue() == 0) {
- while(availableConnections.isEmpty()) {
- available.await();
- }
- } else {
- long timeRemainingNs = timeout.as(Time.NANOSECONDS);
- while(availableConnections.isEmpty()) {
- long start = System.nanoTime();
- if (!available.await(timeRemainingNs, TimeUnit.NANOSECONDS)) {
- throw new TimeoutException(
- "timeout waiting for a connection to be released to the pool");
- } else {
- timeRemainingNs -= (System.nanoTime() - start);
- }
- }
- if (availableConnections.isEmpty()) {
- throw new TimeoutException(
- "timeout waiting for a connection to be released to the pool");
- }
- }
- } catch (InterruptedException e) {
- throw new TimeoutException("Interrupted while waiting for a connection.");
- }
- }
- }
-
- return getAvailableConnection();
- }
-
- private S getAvailableConnection() {
- S connection = (availableConnections.size() == 1)
- ? Iterables.getOnlyElement(availableConnections)
- : availableConnections.iterator().next();
- if (!availableConnections.remove(connection)) {
- throw new IllegalArgumentException("Connection picked not in pool: " + connection);
- }
- return connection;
- }
-
- private S createConnection(Amount<Long, Time> timeout) throws Exception {
- S connection = connectionFactory.create(timeout);
- if (connection != null) {
- connectionsCreated.incrementAndGet();
- }
- return connection;
- }
-
- private void addConnection(S connection) {
- poolLock.lock();
- try {
- availableConnections.add(connection);
- available.signal();
- } finally {
- poolLock.unlock();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/net/pool/DynamicHostSet.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/net/pool/DynamicHostSet.java b/commons/src/main/java/com/twitter/common/net/pool/DynamicHostSet.java
deleted file mode 100644
index 6b68513..0000000
--- a/commons/src/main/java/com/twitter/common/net/pool/DynamicHostSet.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.net.pool;
-
-import com.google.common.collect.ImmutableSet;
-
-import com.twitter.common.base.Command;
-
-/**
- * A host set that can be monitored for changes.
- *
- * @param <T> The type that is used to identify members of the host set.
- */
-public interface DynamicHostSet<T> {
-
- /**
- * Registers a monitor to receive change notices for this server set as long as this jvm process
- * is alive. Blocks until the initial server set can be gathered and delivered to the monitor.
- * The monitor will be notified if the membership set or parameters of existing members have
- * changed.
- *
- * @param monitor the server set monitor to call back when the host set changes
- * @throws MonitorException if there is a problem monitoring the host set
- * @deprecated Deprecated in favor of {@link #watch(HostChangeMonitor)}
- */
- @Deprecated
- public void monitor(final HostChangeMonitor<T> monitor) throws MonitorException;
-
- /**
- * Registers a monitor to receive change notices for this server set as long as this jvm process
- * is alive. Blocks until the initial server set can be gathered and delivered to the monitor.
- * The monitor will be notified if the membership set or parameters of existing members have
- * changed.
- *
- * @param monitor the server set monitor to call back when the host set changes
- * @return A command which, when executed, will stop monitoring the host set.
- * @throws MonitorException if there is a problem monitoring the host set
- */
- public Command watch(final HostChangeMonitor<T> monitor) throws MonitorException;
-
- /**
- * An interface to an object that is interested in receiving notification whenever the host set
- * changes.
- */
- public static interface HostChangeMonitor<T> {
-
- /**
- * Called when either the available set of services changes (when a service dies or a new
- * instance comes on-line) or when an existing service advertises a status or health change.
- *
- * @param hostSet the current set of available ServiceInstances
- */
- void onChange(ImmutableSet<T> hostSet);
- }
-
- public static class MonitorException extends Exception {
- public MonitorException(String msg) {
- super(msg);
- }
-
- public MonitorException(String msg, Throwable cause) {
- super(msg, cause);
- }
- }
-}