You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/12 15:45:20 UTC
[13/30] incubator-distributedlog git commit: DL-205: Remove
StatusCode dependency on DLException
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java
new file mode 100644
index 0000000..c3c5d81
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream.admin;
+
+import com.google.common.base.Stopwatch;
+import org.apache.distributedlog.exceptions.ChecksumFailedException;
+import org.apache.distributedlog.exceptions.DLException;
+import org.apache.distributedlog.protocol.util.ProtocolUtils;
+import org.apache.distributedlog.service.ResponseUtils;
+import org.apache.distributedlog.service.stream.StreamManager;
+import org.apache.distributedlog.thrift.service.WriteResponse;
+import com.twitter.util.Future;
+import com.twitter.util.FutureTransformer;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+
+/**
+ * Stream admin op.
+ */
+public abstract class StreamAdminOp implements AdminOp<WriteResponse> {
+
+ protected final String stream;
+ protected final StreamManager streamManager;
+ protected final OpStatsLogger opStatsLogger;
+ protected final Stopwatch stopwatch = Stopwatch.createUnstarted();
+ protected final Long checksum;
+ protected final Feature checksumDisabledFeature;
+
+ protected StreamAdminOp(String stream,
+ StreamManager streamManager,
+ OpStatsLogger statsLogger,
+ Long checksum,
+ Feature checksumDisabledFeature) {
+ this.stream = stream;
+ this.streamManager = streamManager;
+ this.opStatsLogger = statsLogger;
+ // start here in case the operation is failed before executing.
+ stopwatch.reset().start();
+ this.checksum = checksum;
+ this.checksumDisabledFeature = checksumDisabledFeature;
+ }
+
+ protected Long computeChecksum() {
+ return ProtocolUtils.streamOpCRC32(stream);
+ }
+
+ @Override
+ public void preExecute() throws DLException {
+ if (!checksumDisabledFeature.isAvailable() && null != checksum) {
+ Long serverChecksum = computeChecksum();
+ if (null != serverChecksum && !checksum.equals(serverChecksum)) {
+ throw new ChecksumFailedException();
+ }
+ }
+ }
+
+ /**
+ * Execute the operation.
+ *
+ * @return execute operation
+ */
+ protected abstract Future<WriteResponse> executeOp();
+
+ @Override
+ public Future<WriteResponse> execute() {
+ return executeOp().transformedBy(new FutureTransformer<WriteResponse, WriteResponse>() {
+
+ @Override
+ public WriteResponse map(WriteResponse response) {
+ opStatsLogger.registerSuccessfulEvent(
+ stopwatch.elapsed(TimeUnit.MICROSECONDS));
+ return response;
+ }
+
+ @Override
+ public WriteResponse handle(Throwable cause) {
+ opStatsLogger.registerFailedEvent(
+ stopwatch.elapsed(TimeUnit.MICROSECONDS));
+ return ResponseUtils.write(ResponseUtils.exceptionToHeader(cause));
+ }
+
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/package-info.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/package-info.java
new file mode 100644
index 0000000..5b583e1
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Stream Related Admin Operations.
+ */
+package org.apache.distributedlog.service.stream.admin;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/DynamicRequestLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/DynamicRequestLimiter.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/DynamicRequestLimiter.java
new file mode 100644
index 0000000..5db2037
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/DynamicRequestLimiter.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream.limiter;
+
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.exceptions.OverCapacityException;
+import org.apache.distributedlog.limiter.RequestLimiter;
+import java.io.Closeable;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.commons.configuration.event.ConfigurationEvent;
+import org.apache.commons.configuration.event.ConfigurationListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Dynamically rebuild a rate limiter when the supplied dynamic config changes.
+ *
+ * <p>Subclasses implement build() to build the limiter. DynamicRequestLimiter must be closed to deregister
+ * the config listener.
+ */
+public abstract class DynamicRequestLimiter<Req> implements RequestLimiter<Req>, Closeable {
+ private static final Logger LOG = LoggerFactory.getLogger(DynamicRequestLimiter.class);
+
+ private final ConfigurationListener listener;
+ private final Feature rateLimitDisabledFeature;
+ volatile RequestLimiter<Req> limiter;
+ final DynamicDistributedLogConfiguration dynConf;
+
+ public DynamicRequestLimiter(DynamicDistributedLogConfiguration dynConf,
+ StatsLogger statsLogger,
+ Feature rateLimitDisabledFeature) {
+ final StatsLogger limiterStatsLogger = statsLogger.scope("dynamic");
+ this.dynConf = dynConf;
+ this.rateLimitDisabledFeature = rateLimitDisabledFeature;
+ this.listener = new ConfigurationListener() {
+ @Override
+ public void configurationChanged(ConfigurationEvent event) {
+ // Note that this method may be called several times if several config options
+ // are changed. The effect is harmless except that we create and discard more
+ // objects than we need to.
+ LOG.debug("Config changed callback invoked with event {} {} {} {}", new Object[] {
+ event.getPropertyName(), event.getPropertyValue(), event.getType(),
+ event.isBeforeUpdate()});
+ if (!event.isBeforeUpdate()) {
+ limiterStatsLogger.getCounter("config_changed").inc();
+ LOG.debug("Rebuilding limiter");
+ limiter = build();
+ }
+ }
+ };
+ LOG.debug("Registering config changed callback");
+ dynConf.addConfigurationListener(listener);
+ }
+
+ public void initialize() {
+ this.limiter = build();
+ }
+
+ @Override
+ public void apply(Req request) throws OverCapacityException {
+ if (rateLimitDisabledFeature.isAvailable()) {
+ return;
+ }
+ limiter.apply(request);
+ }
+
+ @Override
+ public void close() {
+ boolean success = dynConf.removeConfigurationListener(listener);
+ LOG.debug("Deregistering config changed callback success={}", success);
+ }
+
+ /**
+ * Build the underlying limiter. Called when DynamicRequestLimiter detects config has changed.
+ * This may be called multiple times so the method should be cheap.
+ */
+ protected abstract RequestLimiter<Req> build();
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/RequestLimiterBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/RequestLimiterBuilder.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/RequestLimiterBuilder.java
new file mode 100644
index 0000000..fc30599
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/RequestLimiterBuilder.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream.limiter;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.distributedlog.exceptions.OverCapacityException;
+import org.apache.distributedlog.limiter.ComposableRequestLimiter;
+import org.apache.distributedlog.limiter.ComposableRequestLimiter.CostFunction;
+import org.apache.distributedlog.limiter.ComposableRequestLimiter.OverlimitFunction;
+import org.apache.distributedlog.limiter.GuavaRateLimiter;
+import org.apache.distributedlog.limiter.RateLimiter;
+import org.apache.distributedlog.limiter.RequestLimiter;
+import org.apache.distributedlog.service.stream.StreamOp;
+import org.apache.distributedlog.service.stream.WriteOpWithPayload;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * Request limiter builder.
+ */
+public class RequestLimiterBuilder {
+ private OverlimitFunction<StreamOp> overlimitFunction = NOP_OVERLIMIT_FUNCTION;
+ private RateLimiter limiter;
+ private CostFunction<StreamOp> costFunction;
+ private StatsLogger statsLogger = NullStatsLogger.INSTANCE;
+
+ /**
+ * Function to calculate the `RPS` (Request per second) cost of a given stream operation.
+ */
+ public static final CostFunction<StreamOp> RPS_COST_FUNCTION = new CostFunction<StreamOp>() {
+ @Override
+ public int apply(StreamOp op) {
+ if (op instanceof WriteOpWithPayload) {
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+ };
+
+ /**
+ * Function to calculate the `BPS` (Bytes per second) cost of a given stream operation.
+ */
+ public static final CostFunction<StreamOp> BPS_COST_FUNCTION = new CostFunction<StreamOp>() {
+ @Override
+ public int apply(StreamOp op) {
+ if (op instanceof WriteOpWithPayload) {
+ WriteOpWithPayload writeOp = (WriteOpWithPayload) op;
+ return (int) Math.min(writeOp.getPayloadSize(), Integer.MAX_VALUE);
+ } else {
+ return 0;
+ }
+ }
+ };
+
+ /**
+ * Function to check if a stream operation will cause {@link OverCapacityException}.
+ */
+ public static final OverlimitFunction<StreamOp> NOP_OVERLIMIT_FUNCTION = new OverlimitFunction<StreamOp>() {
+ @Override
+ public void apply(StreamOp op) throws OverCapacityException {
+ return;
+ }
+ };
+
+ public RequestLimiterBuilder limit(int limit) {
+ this.limiter = GuavaRateLimiter.of(limit);
+ return this;
+ }
+
+ public RequestLimiterBuilder overlimit(OverlimitFunction<StreamOp> overlimitFunction) {
+ this.overlimitFunction = overlimitFunction;
+ return this;
+ }
+
+ public RequestLimiterBuilder cost(CostFunction<StreamOp> costFunction) {
+ this.costFunction = costFunction;
+ return this;
+ }
+
+ public RequestLimiterBuilder statsLogger(StatsLogger statsLogger) {
+ this.statsLogger = statsLogger;
+ return this;
+ }
+
+ public static RequestLimiterBuilder newRpsLimiterBuilder() {
+ return new RequestLimiterBuilder().cost(RPS_COST_FUNCTION);
+ }
+
+ public static RequestLimiterBuilder newBpsLimiterBuilder() {
+ return new RequestLimiterBuilder().cost(BPS_COST_FUNCTION);
+ }
+
+ public RequestLimiter<StreamOp> build() {
+ checkNotNull(limiter);
+ checkNotNull(overlimitFunction);
+ checkNotNull(costFunction);
+ return new ComposableRequestLimiter(limiter, overlimitFunction, costFunction, statsLogger);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/ServiceRequestLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/ServiceRequestLimiter.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/ServiceRequestLimiter.java
new file mode 100644
index 0000000..de805aa
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/ServiceRequestLimiter.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream.limiter;
+
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.exceptions.OverCapacityException;
+import org.apache.distributedlog.limiter.ChainedRequestLimiter;
+import org.apache.distributedlog.limiter.ComposableRequestLimiter.OverlimitFunction;
+import org.apache.distributedlog.limiter.RequestLimiter;
+import org.apache.distributedlog.rate.MovingAverageRate;
+import org.apache.distributedlog.service.stream.StreamManager;
+import org.apache.distributedlog.service.stream.StreamOp;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * Request limiter for the service instance (global request limiter).
+ */
+public class ServiceRequestLimiter extends DynamicRequestLimiter<StreamOp> {
+ private final StatsLogger limiterStatLogger;
+ private final MovingAverageRate serviceRps;
+ private final MovingAverageRate serviceBps;
+ private final StreamManager streamManager;
+
+ public ServiceRequestLimiter(DynamicDistributedLogConfiguration dynConf,
+ StatsLogger statsLogger,
+ MovingAverageRate serviceRps,
+ MovingAverageRate serviceBps,
+ StreamManager streamManager,
+ Feature disabledFeature) {
+ super(dynConf, statsLogger, disabledFeature);
+ this.limiterStatLogger = statsLogger;
+ this.streamManager = streamManager;
+ this.serviceRps = serviceRps;
+ this.serviceBps = serviceBps;
+ this.limiter = build();
+ }
+
+ @Override
+ public RequestLimiter<StreamOp> build() {
+ int rpsStreamAcquireLimit = dynConf.getRpsStreamAcquireServiceLimit();
+ int rpsSoftServiceLimit = dynConf.getRpsSoftServiceLimit();
+ int rpsHardServiceLimit = dynConf.getRpsHardServiceLimit();
+ int bpsStreamAcquireLimit = dynConf.getBpsStreamAcquireServiceLimit();
+ int bpsSoftServiceLimit = dynConf.getBpsSoftServiceLimit();
+ int bpsHardServiceLimit = dynConf.getBpsHardServiceLimit();
+
+ RequestLimiterBuilder rpsHardLimiterBuilder = RequestLimiterBuilder.newRpsLimiterBuilder()
+ .statsLogger(limiterStatLogger.scope("rps_hard_limit"))
+ .limit(rpsHardServiceLimit)
+ .overlimit(new OverlimitFunction<StreamOp>() {
+ @Override
+ public void apply(StreamOp request) throws OverCapacityException {
+ throw new OverCapacityException("Being rate limited: RPS limit exceeded for the service instance");
+ }
+ });
+
+ RequestLimiterBuilder rpsSoftLimiterBuilder = RequestLimiterBuilder.newRpsLimiterBuilder()
+ .statsLogger(limiterStatLogger.scope("rps_soft_limit"))
+ .limit(rpsSoftServiceLimit);
+
+ RequestLimiterBuilder bpsHardLimiterBuilder = RequestLimiterBuilder.newBpsLimiterBuilder()
+ .statsLogger(limiterStatLogger.scope("bps_hard_limit"))
+ .limit(bpsHardServiceLimit)
+ .overlimit(new OverlimitFunction<StreamOp>() {
+ @Override
+ public void apply(StreamOp request) throws OverCapacityException {
+ throw new OverCapacityException("Being rate limited: BPS limit exceeded for the service instance");
+ }
+ });
+
+ RequestLimiterBuilder bpsSoftLimiterBuilder = RequestLimiterBuilder.newBpsLimiterBuilder()
+ .statsLogger(limiterStatLogger.scope("bps_soft_limit"))
+ .limit(bpsSoftServiceLimit);
+
+ ChainedRequestLimiter.Builder<StreamOp> builder = new ChainedRequestLimiter.Builder<StreamOp>();
+ builder.addLimiter(new StreamAcquireLimiter(
+ streamManager, serviceRps, rpsStreamAcquireLimit, limiterStatLogger.scope("rps_acquire")));
+ builder.addLimiter(new StreamAcquireLimiter(
+ streamManager, serviceBps, bpsStreamAcquireLimit, limiterStatLogger.scope("bps_acquire")));
+ builder.addLimiter(bpsHardLimiterBuilder.build());
+ builder.addLimiter(bpsSoftLimiterBuilder.build());
+ builder.addLimiter(rpsHardLimiterBuilder.build());
+ builder.addLimiter(rpsSoftLimiterBuilder.build());
+ builder.statsLogger(limiterStatLogger);
+ return builder.build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamAcquireLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamAcquireLimiter.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamAcquireLimiter.java
new file mode 100644
index 0000000..7675d6f
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamAcquireLimiter.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream.limiter;
+
+import org.apache.distributedlog.exceptions.OverCapacityException;
+import org.apache.distributedlog.exceptions.TooManyStreamsException;
+import org.apache.distributedlog.limiter.RequestLimiter;
+import org.apache.distributedlog.rate.MovingAverageRate;
+import org.apache.distributedlog.service.stream.StreamManager;
+import org.apache.distributedlog.service.stream.StreamOp;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * A special limiter on limiting acquiring new streams.
+ */
+public class StreamAcquireLimiter implements RequestLimiter<StreamOp> {
+ private final StreamManager streamManager;
+ private final MovingAverageRate serviceRps;
+ private final double serviceRpsLimit;
+ private final Counter overlimitCounter;
+
+ public StreamAcquireLimiter(StreamManager streamManager,
+ MovingAverageRate serviceRps,
+ double serviceRpsLimit,
+ StatsLogger statsLogger) {
+ this.streamManager = streamManager;
+ this.serviceRps = serviceRps;
+ this.serviceRpsLimit = serviceRpsLimit;
+ this.overlimitCounter = statsLogger.getCounter("overlimit");
+ }
+
+ @Override
+ public void apply(StreamOp op) throws OverCapacityException {
+ String streamName = op.streamName();
+ if (serviceRpsLimit > -1 && serviceRps.get() > serviceRpsLimit && !streamManager.isAcquired(streamName)) {
+ overlimitCounter.inc();
+ throw new TooManyStreamsException("Request rate is too high to accept new stream " + streamName + ".");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamRequestLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamRequestLimiter.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamRequestLimiter.java
new file mode 100644
index 0000000..42b4e1e
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamRequestLimiter.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream.limiter;
+
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.exceptions.OverCapacityException;
+import org.apache.distributedlog.limiter.ChainedRequestLimiter;
+import org.apache.distributedlog.limiter.ComposableRequestLimiter.OverlimitFunction;
+import org.apache.distributedlog.limiter.RequestLimiter;
+import org.apache.distributedlog.service.stream.StreamOp;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * A dynamic request limiter on limiting stream operations.
+ */
+public class StreamRequestLimiter extends DynamicRequestLimiter<StreamOp> {
+ private final DynamicDistributedLogConfiguration dynConf;
+ private final StatsLogger limiterStatLogger;
+ private final String streamName;
+
+ public StreamRequestLimiter(String streamName,
+ DynamicDistributedLogConfiguration dynConf,
+ StatsLogger statsLogger,
+ Feature disabledFeature) {
+ super(dynConf, statsLogger, disabledFeature);
+ this.limiterStatLogger = statsLogger;
+ this.dynConf = dynConf;
+ this.streamName = streamName;
+ this.limiter = build();
+ }
+
+ @Override
+ public RequestLimiter<StreamOp> build() {
+
+ // RPS hard, soft limits
+ RequestLimiterBuilder rpsHardLimiterBuilder = RequestLimiterBuilder.newRpsLimiterBuilder()
+ .statsLogger(limiterStatLogger.scope("rps_hard_limit"))
+ .limit(dynConf.getRpsHardWriteLimit())
+ .overlimit(new OverlimitFunction<StreamOp>() {
+ @Override
+ public void apply(StreamOp op) throws OverCapacityException {
+ throw new OverCapacityException("Being rate limited: RPS limit exceeded for stream " + streamName);
+ }
+ });
+ RequestLimiterBuilder rpsSoftLimiterBuilder = RequestLimiterBuilder.newRpsLimiterBuilder()
+ .statsLogger(limiterStatLogger.scope("rps_soft_limit"))
+ .limit(dynConf.getRpsSoftWriteLimit());
+
+ // BPS hard, soft limits
+ RequestLimiterBuilder bpsHardLimiterBuilder = RequestLimiterBuilder.newBpsLimiterBuilder()
+ .statsLogger(limiterStatLogger.scope("bps_hard_limit"))
+ .limit(dynConf.getBpsHardWriteLimit())
+ .overlimit(new OverlimitFunction<StreamOp>() {
+ @Override
+ public void apply(StreamOp op) throws OverCapacityException {
+ throw new OverCapacityException("Being rate limited: BPS limit exceeded for stream " + streamName);
+ }
+ });
+ RequestLimiterBuilder bpsSoftLimiterBuilder = RequestLimiterBuilder.newBpsLimiterBuilder()
+ .statsLogger(limiterStatLogger.scope("bps_soft_limit"))
+ .limit(dynConf.getBpsSoftWriteLimit());
+
+ ChainedRequestLimiter.Builder<StreamOp> builder = new ChainedRequestLimiter.Builder<StreamOp>();
+ builder.addLimiter(rpsSoftLimiterBuilder.build());
+ builder.addLimiter(rpsHardLimiterBuilder.build());
+ builder.addLimiter(bpsSoftLimiterBuilder.build());
+ builder.addLimiter(bpsHardLimiterBuilder.build());
+ builder.statsLogger(limiterStatLogger);
+ return builder.build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/package-info.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/package-info.java
new file mode 100644
index 0000000..c666b08
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Request Rate Limiting.
+ */
+package org.apache.distributedlog.service.stream.limiter;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/package-info.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/package-info.java
new file mode 100644
index 0000000..7429a85
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Stream Related Operations.
+ */
+package org.apache.distributedlog.service.stream;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/CacheableStreamPartitionConverter.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/CacheableStreamPartitionConverter.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/CacheableStreamPartitionConverter.java
new file mode 100644
index 0000000..72668c2
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/CacheableStreamPartitionConverter.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.streamset;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * A stream-to-partition converter that caches the mapping between stream and partitions.
+ */
+public abstract class CacheableStreamPartitionConverter implements StreamPartitionConverter {
+
+ private final ConcurrentMap<String, Partition> partitions;
+
+ protected CacheableStreamPartitionConverter() {
+ this.partitions = new ConcurrentHashMap<String, Partition>();
+ }
+
+ @Override
+ public Partition convert(String streamName) {
+ Partition p = partitions.get(streamName);
+ if (null != p) {
+ return p;
+ }
+ // not found
+ Partition newPartition = newPartition(streamName);
+ Partition oldPartition = partitions.putIfAbsent(streamName, newPartition);
+ if (null == oldPartition) {
+ return newPartition;
+ } else {
+ return oldPartition;
+ }
+ }
+
+ /**
+ * Create the partition from <code>streamName</code>.
+ *
+ * @param streamName
+ * stream name
+ * @return partition id of the stream
+ */
+ protected abstract Partition newPartition(String streamName);
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/DelimiterStreamPartitionConverter.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/DelimiterStreamPartitionConverter.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/DelimiterStreamPartitionConverter.java
new file mode 100644
index 0000000..30b2896
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/DelimiterStreamPartitionConverter.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.streamset;
+
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Stream Partition Converter that converts the stream name into a stream-to-partition mapping by delimiter.
+ */
+public class DelimiterStreamPartitionConverter extends CacheableStreamPartitionConverter {
+
+ private final String delimiter;
+
+ public DelimiterStreamPartitionConverter() {
+ this("_");
+ }
+
+ public DelimiterStreamPartitionConverter(String delimiter) {
+ this.delimiter = delimiter;
+ }
+
+ @Override
+ protected Partition newPartition(String streamName) {
+ String[] parts = StringUtils.split(streamName, delimiter);
+ if (null != parts && parts.length == 2) {
+ try {
+ int partition = Integer.parseInt(parts[1]);
+ return new Partition(parts[0], partition);
+ } catch (NumberFormatException nfe) {
+ // ignore the exception
+ }
+ }
+ return new Partition(streamName, 0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/IdentityStreamPartitionConverter.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/IdentityStreamPartitionConverter.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/IdentityStreamPartitionConverter.java
new file mode 100644
index 0000000..5be172f
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/IdentityStreamPartitionConverter.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.streamset;
+
+/**
+ * Map stream name to partition of the same name.
+ */
+public class IdentityStreamPartitionConverter extends CacheableStreamPartitionConverter {
+ @Override
+ protected Partition newPartition(String streamName) {
+ return new Partition(streamName, 0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/Partition.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/Partition.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/Partition.java
new file mode 100644
index 0000000..aa69276
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/Partition.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.streamset;
+
+import com.google.common.base.Objects;
+
+/**
+ * `Partition` defines the relationship between a `virtual` stream and a
+ * physical DL stream.
+ *
+ * <p>A `virtual` stream could be partitioned into multiple partitions
+ * and each partition is effectively a DL stream.
+ */
+public class Partition {
+
+ // Name of its parent stream.
+ private final String stream;
+
+ // Unique id of the partition within the stream.
+ // It can be just simply an index id.
+ public final int id;
+
+ public Partition(String stream, int id) {
+ this.stream = stream;
+ this.id = id;
+ }
+
+ /**
+ * Get the `virtual` stream name.
+ *
+ * @return the stream name.
+ */
+ public String getStream() {
+ return stream;
+ }
+
+ /**
+ * Get the partition id of this partition.
+ *
+ * @return partition id
+ */
+ public int getId() {
+ return id;
+ }
+
+ /**
+ * Get the 6 digit 0 padded id of this partition as a String.
+ * @return partition id
+ */
+ public String getPaddedId() {
+ return String.format("%06d", getId());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof Partition)) {
+ return false;
+ }
+ Partition partition = (Partition) o;
+
+ return id == partition.id && Objects.equal(stream, partition.stream);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = stream.hashCode();
+ result = 31 * result + id;
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Partition(")
+ .append(stream)
+ .append(", ")
+ .append(id)
+ .append(")");
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/PartitionMap.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/PartitionMap.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/PartitionMap.java
new file mode 100644
index 0000000..bfcc5db
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/PartitionMap.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.streamset;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A mapping between a logical stream and a set of physical partitions.
+ */
+public class PartitionMap {
+
+ private final Map<String, Set<Partition>> partitionMap;
+
+ public PartitionMap() {
+ partitionMap = new HashMap<String, Set<Partition>>();
+ }
+
+ public synchronized boolean addPartition(Partition partition, int maxPartitions) {
+ if (maxPartitions <= 0) {
+ return true;
+ }
+ Set<Partition> partitions = partitionMap.get(partition.getStream());
+ if (null == partitions) {
+ partitions = new HashSet<Partition>();
+ partitions.add(partition);
+ partitionMap.put(partition.getStream(), partitions);
+ return true;
+ }
+ if (partitions.contains(partition) || partitions.size() < maxPartitions) {
+ partitions.add(partition);
+ return true;
+ }
+ return false;
+ }
+
+ public synchronized boolean removePartition(Partition partition) {
+ Set<Partition> partitions = partitionMap.get(partition.getStream());
+ return null != partitions && partitions.remove(partition);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/StreamPartitionConverter.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/StreamPartitionConverter.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/StreamPartitionConverter.java
new file mode 100644
index 0000000..3ea1337
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/StreamPartitionConverter.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.streamset;
+
+/**
+ * Map stream name to a partition.
+ *
+ * @see Partition
+ */
+public interface StreamPartitionConverter {
+
+ /**
+ * Convert the stream name to partition.
+ *
+ * @param streamName
+ * stream name
+ * @return partition
+ */
+ Partition convert(String streamName);
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/package-info.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/package-info.java
new file mode 100644
index 0000000..d185e88
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * StreamSet - A logical set of streams.
+ */
+package org.apache.distributedlog.service.streamset;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/tools/ProxyTool.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/tools/ProxyTool.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/tools/ProxyTool.java
new file mode 100644
index 0000000..3934eb5
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/tools/ProxyTool.java
@@ -0,0 +1,350 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.tools;
+
+import com.google.common.util.concurrent.RateLimiter;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.client.monitor.MonitorServiceClient;
+import org.apache.distributedlog.client.serverset.DLZkServerSet;
+import org.apache.distributedlog.service.ClientUtils;
+import org.apache.distributedlog.service.DLSocketAddress;
+import org.apache.distributedlog.service.DistributedLogClient;
+import org.apache.distributedlog.service.DistributedLogClientBuilder;
+import org.apache.distributedlog.tools.Tool;
+import com.twitter.finagle.builder.ClientBuilder;
+import com.twitter.finagle.thrift.ClientId$;
+import com.twitter.util.Await;
+import com.twitter.util.Duration;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tools to interact with proxies.
+ */
+public class ProxyTool extends Tool {
+
+ private static final Logger logger = LoggerFactory.getLogger(ProxyTool.class);
+
+ /**
+ * Abstract Cluster level command.
+ */
+ protected abstract static class ClusterCommand extends OptsCommand {
+
+ protected Options options = new Options();
+ protected URI uri;
+ protected final List<String> streams = new ArrayList<String>();
+
+ protected ClusterCommand(String name, String description) {
+ super(name, description);
+ options.addOption("u", "uri", true, "DistributedLog URI");
+ options.addOption("r", "prefix", true, "Prefix of stream name. E.g. 'QuantumLeapTest-'.");
+ options.addOption("e", "expression", true, "Expression to generate stream suffix. "
+ + "Currently we support range '0-9', list '1,2,3' and name '143'");
+ }
+
+ @Override
+ protected int runCmd(CommandLine commandLine) throws Exception {
+ try {
+ parseCommandLine(commandLine);
+ } catch (ParseException pe) {
+ System.err.println("ERROR: failed to parse commandline : '" + pe.getMessage() + "'");
+ printUsage();
+ return -1;
+ }
+
+ DLZkServerSet serverSet = DLZkServerSet.of(uri, 60000);
+ logger.info("Created serverset for {}", uri);
+ try {
+ DistributedLogClient client = DistributedLogClientBuilder.newBuilder()
+ .name("proxy_tool")
+ .clientId(ClientId$.MODULE$.apply("proxy_tool"))
+ .maxRedirects(2)
+ .serverSet(serverSet.getServerSet())
+ .clientBuilder(ClientBuilder.get()
+ .connectionTimeout(Duration.fromSeconds(2))
+ .tcpConnectTimeout(Duration.fromSeconds(2))
+ .requestTimeout(Duration.fromSeconds(10))
+ .hostConnectionLimit(1)
+ .hostConnectionCoresize(1)
+ .keepAlive(true)
+ .failFast(false))
+ .build();
+ try {
+ return runCmd(client);
+ } finally {
+ client.close();
+ }
+ } finally {
+ serverSet.close();
+ }
+ }
+
+ protected abstract int runCmd(DistributedLogClient client) throws Exception;
+
+ @Override
+ protected Options getOptions() {
+ return options;
+ }
+
+ protected void parseCommandLine(CommandLine cmdline) throws ParseException {
+ if (!cmdline.hasOption("u")) {
+ throw new ParseException("No distributedlog uri provided.");
+ }
+ this.uri = URI.create(cmdline.getOptionValue("u"));
+
+ // get stream names
+ String streamPrefix = cmdline.hasOption("r") ? cmdline.getOptionValue("r") : "";
+ String streamExpression = null;
+ if (cmdline.hasOption("e")) {
+ streamExpression = cmdline.getOptionValue("e");
+ }
+ if (null == streamPrefix || null == streamExpression) {
+ throw new ParseException("Please specify stream prefix & expression.");
+ }
+ // parse the stream expression
+ if (streamExpression.contains("-")) {
+ // a range expression
+ String[] parts = streamExpression.split("-");
+ if (parts.length != 2) {
+ throw new ParseException("Invalid stream index range : " + streamExpression);
+ }
+ try {
+ int start = Integer.parseInt(parts[0]);
+ int end = Integer.parseInt(parts[1]);
+ if (start > end) {
+ throw new ParseException("Invalid stream index range : " + streamExpression);
+ }
+ for (int i = start; i <= end; i++) {
+ streams.add(streamPrefix + i);
+ }
+ } catch (NumberFormatException nfe) {
+ throw new ParseException("Invalid stream index range : " + streamExpression);
+ }
+ } else if (streamExpression.contains(",")) {
+ // a list expression
+ String[] parts = streamExpression.split(",");
+ try {
+ for (String part : parts) {
+ streams.add(streamPrefix + part);
+ }
+ } catch (NumberFormatException nfe) {
+ throw new ParseException("Invalid stream suffix list : " + streamExpression);
+ }
+ } else {
+ streams.add(streamPrefix + streamExpression);
+ }
+ }
+ }
+
+ /**
+ * Command to release ownership of a log stream.
+ */
+ static class ReleaseCommand extends ClusterCommand {
+
+ double rate = 100f;
+
+ ReleaseCommand() {
+ super("release", "Release Stream Ownerships");
+ options.addOption("t", "rate", true, "Rate to release streams");
+ }
+
+ @Override
+ protected void parseCommandLine(CommandLine cmdline) throws ParseException {
+ super.parseCommandLine(cmdline);
+ if (cmdline.hasOption("t")) {
+ rate = Double.parseDouble(cmdline.getOptionValue("t", "100"));
+ }
+ }
+
+ @Override
+ protected int runCmd(DistributedLogClient client) throws Exception {
+ RateLimiter rateLimiter = RateLimiter.create(rate);
+ for (String stream : streams) {
+ rateLimiter.acquire();
+ try {
+ Await.result(client.release(stream));
+ System.out.println("Release ownership of stream " + stream);
+ } catch (Exception e) {
+ System.err.println("Failed to release ownership of stream " + stream);
+ throw e;
+ }
+ }
+ return 0;
+ }
+
+ @Override
+ protected String getUsage() {
+ return "release [options]";
+ }
+ }
+
+ /**
+ * Command to truncate a log stream.
+ */
+ static class TruncateCommand extends ClusterCommand {
+
+ DLSN dlsn = DLSN.InitialDLSN;
+
+ TruncateCommand() {
+ super("truncate", "Truncate streams until given dlsn.");
+ options.addOption("d", "dlsn", true, "DLSN to truncate until");
+ }
+
+ @Override
+ protected int runCmd(DistributedLogClient client) throws Exception {
+ System.out.println("Truncating streams : " + streams);
+ for (String stream : streams) {
+ boolean success = Await.result(client.truncate(stream, dlsn));
+ System.out.println("Truncate " + stream + " to " + dlsn + " : " + success);
+ }
+ return 0;
+ }
+
+ @Override
+ protected void parseCommandLine(CommandLine cmdline) throws ParseException {
+ super.parseCommandLine(cmdline);
+ if (!cmdline.hasOption("d")) {
+ throw new ParseException("No DLSN provided");
+ }
+ String[] dlsnStrs = cmdline.getOptionValue("d").split(",");
+ if (dlsnStrs.length != 3) {
+ throw new ParseException("Invalid DLSN : " + cmdline.getOptionValue("d"));
+ }
+ dlsn = new DLSN(Long.parseLong(dlsnStrs[0]), Long.parseLong(dlsnStrs[1]), Long.parseLong(dlsnStrs[2]));
+ }
+
+ @Override
+ protected String getUsage() {
+ return "truncate [options]";
+ }
+ }
+
+ /**
+ * Abstract command to operate on a single proxy server.
+ */
+ protected abstract static class ProxyCommand extends OptsCommand {
+
+ protected Options options = new Options();
+ protected InetSocketAddress address;
+
+ protected ProxyCommand(String name, String description) {
+ super(name, description);
+ options.addOption("H", "host", true, "Single Proxy Address");
+ }
+
+ @Override
+ protected Options getOptions() {
+ return options;
+ }
+
+ protected void parseCommandLine(CommandLine cmdline) throws ParseException {
+ if (!cmdline.hasOption("H")) {
+ throw new ParseException("No proxy address provided");
+ }
+ address = DLSocketAddress.parseSocketAddress(cmdline.getOptionValue("H"));
+ }
+
+ @Override
+ protected int runCmd(CommandLine commandLine) throws Exception {
+ try {
+ parseCommandLine(commandLine);
+ } catch (ParseException pe) {
+ System.err.println("ERROR: failed to parse commandline : '" + pe.getMessage() + "'");
+ printUsage();
+ return -1;
+ }
+
+ DistributedLogClientBuilder clientBuilder = DistributedLogClientBuilder.newBuilder()
+ .name("proxy_tool")
+ .clientId(ClientId$.MODULE$.apply("proxy_tool"))
+ .maxRedirects(2)
+ .host(address)
+ .clientBuilder(ClientBuilder.get()
+ .connectionTimeout(Duration.fromSeconds(2))
+ .tcpConnectTimeout(Duration.fromSeconds(2))
+ .requestTimeout(Duration.fromSeconds(10))
+ .hostConnectionLimit(1)
+ .hostConnectionCoresize(1)
+ .keepAlive(true)
+ .failFast(false));
+ Pair<DistributedLogClient, MonitorServiceClient> clientPair =
+ ClientUtils.buildClient(clientBuilder);
+ try {
+ return runCmd(clientPair);
+ } finally {
+ clientPair.getLeft().close();
+ }
+ }
+
+ protected abstract int runCmd(Pair<DistributedLogClient, MonitorServiceClient> client) throws Exception;
+ }
+
+ /**
+ * Command to enable/disable accepting new streams.
+ */
+ static class AcceptNewStreamCommand extends ProxyCommand {
+
+ boolean enabled = false;
+
+ AcceptNewStreamCommand() {
+ super("accept-new-stream", "Enable/Disable accepting new streams for one proxy");
+ options.addOption("e", "enabled", true, "Enable/Disable accepting new streams");
+ }
+
+ @Override
+ protected void parseCommandLine(CommandLine cmdline) throws ParseException {
+ super.parseCommandLine(cmdline);
+ if (!cmdline.hasOption("e")) {
+ throw new ParseException("No action 'enable/disable' provided");
+ }
+ enabled = Boolean.parseBoolean(cmdline.getOptionValue("e"));
+ }
+
+ @Override
+ protected int runCmd(Pair<DistributedLogClient, MonitorServiceClient> client)
+ throws Exception {
+ Await.result(client.getRight().setAcceptNewStream(enabled));
+ return 0;
+ }
+
+ @Override
+ protected String getUsage() {
+ return "accept-new-stream [options]";
+ }
+ }
+
+ public ProxyTool() {
+ super();
+ addCommand(new ReleaseCommand());
+ addCommand(new TruncateCommand());
+ addCommand(new AcceptNewStreamCommand());
+ }
+
+ @Override
+ protected String getName() {
+ return "proxy_tool";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/tools/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/tools/package-info.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/tools/package-info.java
new file mode 100644
index 0000000..92d0a7d
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/tools/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Service related tools.
+ */
+package org.apache.distributedlog.service.tools;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/utils/ServerUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/utils/ServerUtils.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/utils/ServerUtils.java
new file mode 100644
index 0000000..9ee93b4
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/utils/ServerUtils.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.utils;
+
+import java.io.IOException;
+import java.net.InetAddress;
+
+/**
+ * Utils that used by servers.
+ */
+public class ServerUtils {
+
+ /**
+ * Retrieve the ledger allocator pool name.
+ *
+ * @param serverRegionId region id that that server is running
+ * @param shardId shard id of the server
+ * @param useHostname whether to use hostname as the ledger allocator pool name
+ * @return ledger allocator pool name
+ * @throws IOException
+ */
+ public static String getLedgerAllocatorPoolName(int serverRegionId,
+ int shardId,
+ boolean useHostname)
+ throws IOException {
+ if (useHostname) {
+ return String.format("allocator_%04d_%s", serverRegionId,
+ InetAddress.getLocalHost().getHostAddress());
+ } else {
+ return String.format("allocator_%04d_%010d", serverRegionId, shardId);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/utils/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/utils/package-info.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/utils/package-info.java
new file mode 100644
index 0000000..99cf736
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/utils/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Utilities used by proxy servers.
+ */
+package org.apache.distributedlog.service.utils;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/resources/config/server_decider.conf
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/resources/config/server_decider.conf b/distributedlog-proxy-server/src/main/resources/config/server_decider.conf
new file mode 100644
index 0000000..d2fddf5
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/resources/config/server_decider.conf
@@ -0,0 +1,31 @@
+#/**
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements. See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership. The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License. You may obtain a copy of the License at
+# *
+# * http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+region_stop_accept_new_stream=0
+disable_durability_enforcement=0
+disable_write_limit=0
+bkc.repp_disable_durability_enforcement=0
+bkc.disable_ensemble_change=0
+dl.disable_logsegment_rolling=0
+dl.disable_write_limit=0
+bkc.atla.disallow_bookie_placement=0
+bkc.atlb.disallow_bookie_placement=0
+bkc.smf1.disallow_bookie_placement=0
+service_rate_limit_disabled=0
+service_checksum_disabled=0
+service_global_limiter_disabled=0
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/resources/config/server_decider.yml
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/resources/config/server_decider.yml b/distributedlog-proxy-server/src/main/resources/config/server_decider.yml
new file mode 100644
index 0000000..7df24bb
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/resources/config/server_decider.yml
@@ -0,0 +1,44 @@
+#/**
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements. See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership. The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License. You may obtain a copy of the License at
+# *
+# * http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+region_stop_accept_new_stream:
+ default_availability: 0
+disable_durability_enforcement:
+ default_availability: 0
+disable_write_limit:
+ default_availability: 0
+bkc.repp_disable_durability_enforcement:
+ default_availability: 0
+bkc.disable_ensemble_change:
+ default_availability: 0
+dl.disable_logsegment_rolling:
+ default_availability: 0
+dl.disable_write_limit:
+ default_availability: 0
+bkc.atla.disallow_bookie_placement:
+ default_availability: 0
+bkc.atlb.disallow_bookie_placement:
+ default_availability: 0
+bkc.smf1.disallow_bookie_placement:
+ default_availability: 0
+service_rate_limit_disabled:
+ default_availability: 0
+service_checksum_disabled:
+ default_availability: 0
+service_global_limiter_disabled:
+ default_availability: 0
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/resources/findbugsExclude.xml
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/resources/findbugsExclude.xml b/distributedlog-proxy-server/src/main/resources/findbugsExclude.xml
new file mode 100644
index 0000000..e101a4d
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/resources/findbugsExclude.xml
@@ -0,0 +1,39 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+//-->
+<FindBugsFilter>
+ <Match>
+ <!-- generated code, we can't be held responsible for findbugs in it //-->
+ <Class name="~org\.apache\.distributedlog\.thrift.*" />
+ </Match>
+ <Match>
+ <!-- generated code, we can't be held responsible for findbugs in it //-->
+ <Class name="~org\.apache\.distributedlog\.service\.placement\.thrift.*" />
+ </Match>
+ <Match>
+ <!-- it is safe to cast exception here. //-->
+ <Class name="org.apache.distributedlog.service.DistributedLogServiceImpl$Stream$2" />
+ <Method name="onFailure" />
+ <Bug pattern="BC_UNCONFIRMED_CAST" />
+ </Match>
+ <Match>
+ <!-- it is safe to cast exception here. //-->
+ <Class name="org.apache.distributedlog.service.stream.BulkWriteOp" />
+ <Method name="isDefiniteFailure" />
+ <Bug pattern="BC_IMPOSSIBLE_INSTANCEOF" />
+ </Match>
+</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/thrift/metadata.thrift
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/thrift/metadata.thrift b/distributedlog-proxy-server/src/main/thrift/metadata.thrift
new file mode 100644
index 0000000..9cb3c72
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/thrift/metadata.thrift
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+namespace java org.apache.distributedlog.service.placement.thrift
+
+struct StreamLoad {
+ 1: optional string stream
+ 2: optional i32 load
+}
+
+struct ServerLoad {
+ 1: optional string server
+ 2: optional i64 load
+ 3: optional list<StreamLoad> streams
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/client/routing/LocalRoutingService.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/client/routing/LocalRoutingService.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/client/routing/LocalRoutingService.java
new file mode 100644
index 0000000..a9ddae5
--- /dev/null
+++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/client/routing/LocalRoutingService.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.routing;
+
+import com.google.common.collect.Sets;
+import com.twitter.finagle.NoBrokersAvailableException;
+import com.twitter.finagle.stats.StatsReceiver;
+import java.net.SocketAddress;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * A local routing service that used for testing.
+ */
+public class LocalRoutingService implements RoutingService {
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder to build a local routing service for testing.
+ */
+ public static class Builder implements RoutingService.Builder {
+
+ private Builder() {}
+
+ @Override
+ public RoutingService.Builder statsReceiver(StatsReceiver statsReceiver) {
+ return this;
+ }
+
+ @Override
+ public LocalRoutingService build() {
+ return new LocalRoutingService();
+ }
+ }
+
+ private final Map<String, LinkedHashSet<SocketAddress>> localAddresses =
+ new HashMap<String, LinkedHashSet<SocketAddress>>();
+ private final CopyOnWriteArrayList<RoutingListener> listeners =
+ new CopyOnWriteArrayList<RoutingListener>();
+
+ boolean allowRetrySameHost = true;
+
+ @Override
+ public void startService() {
+ // nop
+ }
+
+ @Override
+ public void stopService() {
+ // nop
+ }
+
+ @Override
+ public synchronized Set<SocketAddress> getHosts() {
+ Set<SocketAddress> hosts = Sets.newHashSet();
+ for (LinkedHashSet<SocketAddress> addresses : localAddresses.values()) {
+ hosts.addAll(addresses);
+ }
+ return hosts;
+ }
+
+ @Override
+ public RoutingService registerListener(RoutingListener listener) {
+ listeners.add(listener);
+ return this;
+ }
+
+ @Override
+ public RoutingService unregisterListener(RoutingListener listener) {
+ listeners.remove(listener);
+ return this;
+ }
+
+ public LocalRoutingService setAllowRetrySameHost(boolean enabled) {
+ allowRetrySameHost = enabled;
+ return this;
+ }
+
+ public LocalRoutingService addHost(String stream, SocketAddress address) {
+ boolean notify = false;
+ synchronized (this) {
+ LinkedHashSet<SocketAddress> addresses = localAddresses.get(stream);
+ if (null == addresses) {
+ addresses = new LinkedHashSet<SocketAddress>();
+ localAddresses.put(stream, addresses);
+ }
+ if (addresses.add(address)) {
+ notify = true;
+ }
+ }
+ if (notify) {
+ for (RoutingListener listener : listeners) {
+ listener.onServerJoin(address);
+ }
+ }
+ return this;
+ }
+
+ @Override
+ public synchronized SocketAddress getHost(String key, RoutingContext rContext)
+ throws NoBrokersAvailableException {
+ LinkedHashSet<SocketAddress> addresses = localAddresses.get(key);
+
+ SocketAddress candidate = null;
+ if (null != addresses) {
+ for (SocketAddress host : addresses) {
+ if (rContext.isTriedHost(host) && !allowRetrySameHost) {
+ continue;
+ } else {
+ candidate = host;
+ break;
+ }
+ }
+ }
+ if (null != candidate) {
+ return candidate;
+ }
+ throw new NoBrokersAvailableException("No host available");
+ }
+
+ @Override
+ public void removeHost(SocketAddress address, Throwable reason) {
+ // nop
+ }
+}