You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/12 15:45:20 UTC

[13/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java
new file mode 100644
index 0000000..c3c5d81
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream.admin;
+
+import com.google.common.base.Stopwatch;
+import org.apache.distributedlog.exceptions.ChecksumFailedException;
+import org.apache.distributedlog.exceptions.DLException;
+import org.apache.distributedlog.protocol.util.ProtocolUtils;
+import org.apache.distributedlog.service.ResponseUtils;
+import org.apache.distributedlog.service.stream.StreamManager;
+import org.apache.distributedlog.thrift.service.WriteResponse;
+import com.twitter.util.Future;
+import com.twitter.util.FutureTransformer;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+
+/**
+ * Stream admin op.
+ */
+public abstract class StreamAdminOp implements AdminOp<WriteResponse> {
+
+    protected final String stream;
+    protected final StreamManager streamManager;
+    protected final OpStatsLogger opStatsLogger;
+    protected final Stopwatch stopwatch = Stopwatch.createUnstarted();
+    protected final Long checksum;
+    protected final Feature checksumDisabledFeature;
+
+    protected StreamAdminOp(String stream,
+                            StreamManager streamManager,
+                            OpStatsLogger statsLogger,
+                            Long checksum,
+                            Feature checksumDisabledFeature) {
+        this.stream = stream;
+        this.streamManager = streamManager;
+        this.opStatsLogger = statsLogger;
+        // start here in case the operation is failed before executing.
+        stopwatch.reset().start();
+        this.checksum = checksum;
+        this.checksumDisabledFeature = checksumDisabledFeature;
+    }
+
+    protected Long computeChecksum() {
+        return ProtocolUtils.streamOpCRC32(stream);
+    }
+
+    @Override
+    public void preExecute() throws DLException {
+        if (!checksumDisabledFeature.isAvailable() && null != checksum) {
+            Long serverChecksum = computeChecksum();
+            if (null != serverChecksum && !checksum.equals(serverChecksum)) {
+                throw new ChecksumFailedException();
+            }
+        }
+    }
+
+    /**
+     * Execute the operation.
+     *
+     * @return execute operation
+     */
+    protected abstract Future<WriteResponse> executeOp();
+
+    @Override
+    public Future<WriteResponse> execute() {
+        return executeOp().transformedBy(new FutureTransformer<WriteResponse, WriteResponse>() {
+
+            @Override
+            public WriteResponse map(WriteResponse response) {
+                opStatsLogger.registerSuccessfulEvent(
+                        stopwatch.elapsed(TimeUnit.MICROSECONDS));
+                return response;
+            }
+
+            @Override
+            public WriteResponse handle(Throwable cause) {
+                opStatsLogger.registerFailedEvent(
+                        stopwatch.elapsed(TimeUnit.MICROSECONDS));
+                return ResponseUtils.write(ResponseUtils.exceptionToHeader(cause));
+            }
+
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/package-info.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/package-info.java
new file mode 100644
index 0000000..5b583e1
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Stream Related Admin Operations.
+ */
+package org.apache.distributedlog.service.stream.admin;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/DynamicRequestLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/DynamicRequestLimiter.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/DynamicRequestLimiter.java
new file mode 100644
index 0000000..5db2037
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/DynamicRequestLimiter.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream.limiter;
+
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.exceptions.OverCapacityException;
+import org.apache.distributedlog.limiter.RequestLimiter;
+import java.io.Closeable;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.commons.configuration.event.ConfigurationEvent;
+import org.apache.commons.configuration.event.ConfigurationListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Dynamically rebuild a rate limiter when the supplied dynamic config changes.
+ *
+ * <p>Subclasses implement build() to build the limiter. DynamicRequestLimiter must be closed to deregister
+ * the config listener.
+ */
+public abstract class DynamicRequestLimiter<Req> implements RequestLimiter<Req>, Closeable {
+    private static final Logger LOG = LoggerFactory.getLogger(DynamicRequestLimiter.class);
+
+    private final ConfigurationListener listener;
+    private final Feature rateLimitDisabledFeature;
+    volatile RequestLimiter<Req> limiter;
+    final DynamicDistributedLogConfiguration dynConf;
+
+    public DynamicRequestLimiter(DynamicDistributedLogConfiguration dynConf,
+                                 StatsLogger statsLogger,
+                                 Feature rateLimitDisabledFeature) {
+        final StatsLogger limiterStatsLogger = statsLogger.scope("dynamic");
+        this.dynConf = dynConf;
+        this.rateLimitDisabledFeature = rateLimitDisabledFeature;
+        this.listener = new ConfigurationListener() {
+            @Override
+            public void configurationChanged(ConfigurationEvent event) {
+                // Note that this method may be called several times if several config options
+                // are changed. The effect is harmless except that we create and discard more
+                // objects than we need to.
+                LOG.debug("Config changed callback invoked with event {} {} {} {}", new Object[] {
+                        event.getPropertyName(), event.getPropertyValue(), event.getType(),
+                        event.isBeforeUpdate()});
+                if (!event.isBeforeUpdate()) {
+                    limiterStatsLogger.getCounter("config_changed").inc();
+                    LOG.debug("Rebuilding limiter");
+                    limiter = build();
+                }
+            }
+        };
+        LOG.debug("Registering config changed callback");
+        dynConf.addConfigurationListener(listener);
+    }
+
+    public void initialize() {
+        this.limiter = build();
+    }
+
+    @Override
+    public void apply(Req request) throws OverCapacityException {
+        if (rateLimitDisabledFeature.isAvailable()) {
+            return;
+        }
+        limiter.apply(request);
+    }
+
+    @Override
+    public void close() {
+        boolean success = dynConf.removeConfigurationListener(listener);
+        LOG.debug("Deregistering config changed callback success={}", success);
+    }
+
+   /**
+    * Build the underlying limiter. Called when DynamicRequestLimiter detects config has changed.
+    * This may be called multiple times so the method should be cheap.
+    */
+    protected abstract RequestLimiter<Req> build();
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/RequestLimiterBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/RequestLimiterBuilder.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/RequestLimiterBuilder.java
new file mode 100644
index 0000000..fc30599
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/RequestLimiterBuilder.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream.limiter;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.distributedlog.exceptions.OverCapacityException;
+import org.apache.distributedlog.limiter.ComposableRequestLimiter;
+import org.apache.distributedlog.limiter.ComposableRequestLimiter.CostFunction;
+import org.apache.distributedlog.limiter.ComposableRequestLimiter.OverlimitFunction;
+import org.apache.distributedlog.limiter.GuavaRateLimiter;
+import org.apache.distributedlog.limiter.RateLimiter;
+import org.apache.distributedlog.limiter.RequestLimiter;
+import org.apache.distributedlog.service.stream.StreamOp;
+import org.apache.distributedlog.service.stream.WriteOpWithPayload;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * Request limiter builder.
+ */
+public class RequestLimiterBuilder {
+    private OverlimitFunction<StreamOp> overlimitFunction = NOP_OVERLIMIT_FUNCTION;
+    private RateLimiter limiter;
+    private CostFunction<StreamOp> costFunction;
+    private StatsLogger statsLogger = NullStatsLogger.INSTANCE;
+
+    /**
+     * Function to calculate the `RPS` (Request per second) cost of a given stream operation.
+     */
+    public static final CostFunction<StreamOp> RPS_COST_FUNCTION = new CostFunction<StreamOp>() {
+        @Override
+        public int apply(StreamOp op) {
+            if (op instanceof WriteOpWithPayload) {
+                return 1;
+            } else {
+                return 0;
+            }
+        }
+    };
+
+    /**
+     * Function to calculate the `BPS` (Bytes per second) cost of a given stream operation.
+     */
+    public static final CostFunction<StreamOp> BPS_COST_FUNCTION = new CostFunction<StreamOp>() {
+        @Override
+        public int apply(StreamOp op) {
+            if (op instanceof WriteOpWithPayload) {
+                WriteOpWithPayload writeOp = (WriteOpWithPayload) op;
+                return (int) Math.min(writeOp.getPayloadSize(), Integer.MAX_VALUE);
+            } else {
+                return 0;
+            }
+        }
+    };
+
+    /**
+     * Function to check if a stream operation will cause {@link OverCapacityException}.
+     */
+    public static final OverlimitFunction<StreamOp> NOP_OVERLIMIT_FUNCTION = new OverlimitFunction<StreamOp>() {
+        @Override
+        public void apply(StreamOp op) throws OverCapacityException {
+            return;
+        }
+    };
+
+    public RequestLimiterBuilder limit(int limit) {
+        this.limiter = GuavaRateLimiter.of(limit);
+        return this;
+    }
+
+    public RequestLimiterBuilder overlimit(OverlimitFunction<StreamOp> overlimitFunction) {
+        this.overlimitFunction = overlimitFunction;
+        return this;
+    }
+
+    public RequestLimiterBuilder cost(CostFunction<StreamOp> costFunction) {
+        this.costFunction = costFunction;
+        return this;
+    }
+
+    public RequestLimiterBuilder statsLogger(StatsLogger statsLogger) {
+        this.statsLogger = statsLogger;
+        return this;
+    }
+
+    public static RequestLimiterBuilder newRpsLimiterBuilder() {
+        return new RequestLimiterBuilder().cost(RPS_COST_FUNCTION);
+    }
+
+    public static RequestLimiterBuilder newBpsLimiterBuilder() {
+        return new RequestLimiterBuilder().cost(BPS_COST_FUNCTION);
+    }
+
+    public RequestLimiter<StreamOp> build() {
+        checkNotNull(limiter);
+        checkNotNull(overlimitFunction);
+        checkNotNull(costFunction);
+        return new ComposableRequestLimiter(limiter, overlimitFunction, costFunction, statsLogger);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/ServiceRequestLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/ServiceRequestLimiter.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/ServiceRequestLimiter.java
new file mode 100644
index 0000000..de805aa
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/ServiceRequestLimiter.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream.limiter;
+
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.exceptions.OverCapacityException;
+import org.apache.distributedlog.limiter.ChainedRequestLimiter;
+import org.apache.distributedlog.limiter.ComposableRequestLimiter.OverlimitFunction;
+import org.apache.distributedlog.limiter.RequestLimiter;
+import org.apache.distributedlog.rate.MovingAverageRate;
+import org.apache.distributedlog.service.stream.StreamManager;
+import org.apache.distributedlog.service.stream.StreamOp;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * Request limiter for the service instance (global request limiter).
+ */
+public class ServiceRequestLimiter extends DynamicRequestLimiter<StreamOp> {
+    private final StatsLogger limiterStatLogger;
+    private final MovingAverageRate serviceRps;
+    private final MovingAverageRate serviceBps;
+    private final StreamManager streamManager;
+
+    public ServiceRequestLimiter(DynamicDistributedLogConfiguration dynConf,
+                                 StatsLogger statsLogger,
+                                 MovingAverageRate serviceRps,
+                                 MovingAverageRate serviceBps,
+                                 StreamManager streamManager,
+                                 Feature disabledFeature) {
+        super(dynConf, statsLogger, disabledFeature);
+        this.limiterStatLogger = statsLogger;
+        this.streamManager = streamManager;
+        this.serviceRps = serviceRps;
+        this.serviceBps = serviceBps;
+        this.limiter = build();
+    }
+
+    @Override
+    public RequestLimiter<StreamOp> build() {
+        int rpsStreamAcquireLimit = dynConf.getRpsStreamAcquireServiceLimit();
+        int rpsSoftServiceLimit = dynConf.getRpsSoftServiceLimit();
+        int rpsHardServiceLimit = dynConf.getRpsHardServiceLimit();
+        int bpsStreamAcquireLimit = dynConf.getBpsStreamAcquireServiceLimit();
+        int bpsSoftServiceLimit = dynConf.getBpsSoftServiceLimit();
+        int bpsHardServiceLimit = dynConf.getBpsHardServiceLimit();
+
+        RequestLimiterBuilder rpsHardLimiterBuilder = RequestLimiterBuilder.newRpsLimiterBuilder()
+            .statsLogger(limiterStatLogger.scope("rps_hard_limit"))
+            .limit(rpsHardServiceLimit)
+            .overlimit(new OverlimitFunction<StreamOp>() {
+                @Override
+                public void apply(StreamOp request) throws OverCapacityException {
+                    throw new OverCapacityException("Being rate limited: RPS limit exceeded for the service instance");
+                }
+            });
+
+        RequestLimiterBuilder rpsSoftLimiterBuilder = RequestLimiterBuilder.newRpsLimiterBuilder()
+            .statsLogger(limiterStatLogger.scope("rps_soft_limit"))
+            .limit(rpsSoftServiceLimit);
+
+        RequestLimiterBuilder bpsHardLimiterBuilder = RequestLimiterBuilder.newBpsLimiterBuilder()
+            .statsLogger(limiterStatLogger.scope("bps_hard_limit"))
+            .limit(bpsHardServiceLimit)
+            .overlimit(new OverlimitFunction<StreamOp>() {
+                @Override
+                public void apply(StreamOp request) throws OverCapacityException {
+                    throw new OverCapacityException("Being rate limited: BPS limit exceeded for the service instance");
+                }
+            });
+
+        RequestLimiterBuilder bpsSoftLimiterBuilder = RequestLimiterBuilder.newBpsLimiterBuilder()
+            .statsLogger(limiterStatLogger.scope("bps_soft_limit"))
+            .limit(bpsSoftServiceLimit);
+
+        ChainedRequestLimiter.Builder<StreamOp> builder = new ChainedRequestLimiter.Builder<StreamOp>();
+        builder.addLimiter(new StreamAcquireLimiter(
+            streamManager, serviceRps, rpsStreamAcquireLimit, limiterStatLogger.scope("rps_acquire")));
+        builder.addLimiter(new StreamAcquireLimiter(
+            streamManager, serviceBps, bpsStreamAcquireLimit, limiterStatLogger.scope("bps_acquire")));
+        builder.addLimiter(bpsHardLimiterBuilder.build());
+        builder.addLimiter(bpsSoftLimiterBuilder.build());
+        builder.addLimiter(rpsHardLimiterBuilder.build());
+        builder.addLimiter(rpsSoftLimiterBuilder.build());
+        builder.statsLogger(limiterStatLogger);
+        return builder.build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamAcquireLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamAcquireLimiter.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamAcquireLimiter.java
new file mode 100644
index 0000000..7675d6f
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamAcquireLimiter.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream.limiter;
+
+import org.apache.distributedlog.exceptions.OverCapacityException;
+import org.apache.distributedlog.exceptions.TooManyStreamsException;
+import org.apache.distributedlog.limiter.RequestLimiter;
+import org.apache.distributedlog.rate.MovingAverageRate;
+import org.apache.distributedlog.service.stream.StreamManager;
+import org.apache.distributedlog.service.stream.StreamOp;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * A special limiter on limiting acquiring new streams.
+ */
+public class StreamAcquireLimiter implements RequestLimiter<StreamOp> {
+    private final StreamManager streamManager;
+    private final MovingAverageRate serviceRps;
+    private final double serviceRpsLimit;
+    private final Counter overlimitCounter;
+
+    public StreamAcquireLimiter(StreamManager streamManager,
+                                MovingAverageRate serviceRps,
+                                double serviceRpsLimit,
+                                StatsLogger statsLogger) {
+        this.streamManager = streamManager;
+        this.serviceRps = serviceRps;
+        this.serviceRpsLimit = serviceRpsLimit;
+        this.overlimitCounter = statsLogger.getCounter("overlimit");
+    }
+
+    @Override
+    public void apply(StreamOp op) throws OverCapacityException {
+        String streamName = op.streamName();
+        if (serviceRpsLimit > -1 && serviceRps.get() > serviceRpsLimit && !streamManager.isAcquired(streamName)) {
+            overlimitCounter.inc();
+            throw new TooManyStreamsException("Request rate is too high to accept new stream " + streamName + ".");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamRequestLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamRequestLimiter.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamRequestLimiter.java
new file mode 100644
index 0000000..42b4e1e
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamRequestLimiter.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream.limiter;
+
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.exceptions.OverCapacityException;
+import org.apache.distributedlog.limiter.ChainedRequestLimiter;
+import org.apache.distributedlog.limiter.ComposableRequestLimiter.OverlimitFunction;
+import org.apache.distributedlog.limiter.RequestLimiter;
+import org.apache.distributedlog.service.stream.StreamOp;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * A dynamic request limiter on limiting stream operations.
+ */
+public class StreamRequestLimiter extends DynamicRequestLimiter<StreamOp> {
+    private final DynamicDistributedLogConfiguration dynConf;
+    private final StatsLogger limiterStatLogger;
+    private final String streamName;
+
+    public StreamRequestLimiter(String streamName,
+                                DynamicDistributedLogConfiguration dynConf,
+                                StatsLogger statsLogger,
+                                Feature disabledFeature) {
+        super(dynConf, statsLogger, disabledFeature);
+        this.limiterStatLogger = statsLogger;
+        this.dynConf = dynConf;
+        this.streamName = streamName;
+        this.limiter = build();
+    }
+
+    @Override
+    public RequestLimiter<StreamOp> build() {
+
+        // RPS hard, soft limits
+        RequestLimiterBuilder rpsHardLimiterBuilder = RequestLimiterBuilder.newRpsLimiterBuilder()
+            .statsLogger(limiterStatLogger.scope("rps_hard_limit"))
+            .limit(dynConf.getRpsHardWriteLimit())
+            .overlimit(new OverlimitFunction<StreamOp>() {
+                @Override
+                public void apply(StreamOp op) throws OverCapacityException {
+                    throw new OverCapacityException("Being rate limited: RPS limit exceeded for stream " + streamName);
+                }
+            });
+        RequestLimiterBuilder rpsSoftLimiterBuilder = RequestLimiterBuilder.newRpsLimiterBuilder()
+            .statsLogger(limiterStatLogger.scope("rps_soft_limit"))
+            .limit(dynConf.getRpsSoftWriteLimit());
+
+        // BPS hard, soft limits
+        RequestLimiterBuilder bpsHardLimiterBuilder = RequestLimiterBuilder.newBpsLimiterBuilder()
+            .statsLogger(limiterStatLogger.scope("bps_hard_limit"))
+            .limit(dynConf.getBpsHardWriteLimit())
+            .overlimit(new OverlimitFunction<StreamOp>() {
+                @Override
+                public void apply(StreamOp op) throws OverCapacityException {
+                    throw new OverCapacityException("Being rate limited: BPS limit exceeded for stream " + streamName);
+                }
+            });
+        RequestLimiterBuilder bpsSoftLimiterBuilder = RequestLimiterBuilder.newBpsLimiterBuilder()
+            .statsLogger(limiterStatLogger.scope("bps_soft_limit"))
+            .limit(dynConf.getBpsSoftWriteLimit());
+
+        ChainedRequestLimiter.Builder<StreamOp> builder = new ChainedRequestLimiter.Builder<StreamOp>();
+        builder.addLimiter(rpsSoftLimiterBuilder.build());
+        builder.addLimiter(rpsHardLimiterBuilder.build());
+        builder.addLimiter(bpsSoftLimiterBuilder.build());
+        builder.addLimiter(bpsHardLimiterBuilder.build());
+        builder.statsLogger(limiterStatLogger);
+        return builder.build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/package-info.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/package-info.java
new file mode 100644
index 0000000..c666b08
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Request Rate Limiting.
+ */
+package org.apache.distributedlog.service.stream.limiter;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/package-info.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/package-info.java
new file mode 100644
index 0000000..7429a85
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Stream Related Operations.
+ */
+package org.apache.distributedlog.service.stream;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/CacheableStreamPartitionConverter.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/CacheableStreamPartitionConverter.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/CacheableStreamPartitionConverter.java
new file mode 100644
index 0000000..72668c2
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/CacheableStreamPartitionConverter.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.streamset;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * A stream-to-partition converter that caches the mapping between stream and partitions.
+ */
+public abstract class CacheableStreamPartitionConverter implements StreamPartitionConverter {
+
+    private final ConcurrentMap<String, Partition> partitions;
+
+    protected CacheableStreamPartitionConverter() {
+        this.partitions = new ConcurrentHashMap<String, Partition>();
+    }
+
+    @Override
+    public Partition convert(String streamName) {
+        Partition p = partitions.get(streamName);
+        if (null != p) {
+            return p;
+        }
+        // not found
+        Partition newPartition = newPartition(streamName);
+        Partition oldPartition = partitions.putIfAbsent(streamName, newPartition);
+        if (null == oldPartition) {
+            return newPartition;
+        } else {
+            return oldPartition;
+        }
+    }
+
+    /**
+     * Create the partition from <code>streamName</code>.
+     *
+     * @param streamName
+     *          stream name
+     * @return partition id of the stream
+     */
+    protected abstract Partition newPartition(String streamName);
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/DelimiterStreamPartitionConverter.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/DelimiterStreamPartitionConverter.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/DelimiterStreamPartitionConverter.java
new file mode 100644
index 0000000..30b2896
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/DelimiterStreamPartitionConverter.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.streamset;
+
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Stream Partition Converter that converts the stream name into a stream-to-partition mapping by delimiter.
+ */
+public class DelimiterStreamPartitionConverter extends CacheableStreamPartitionConverter {
+
+    private final String delimiter;
+
+    public DelimiterStreamPartitionConverter() {
+        this("_");
+    }
+
+    public DelimiterStreamPartitionConverter(String delimiter) {
+        this.delimiter = delimiter;
+    }
+
+    @Override
+    protected Partition newPartition(String streamName) {
+        String[] parts = StringUtils.split(streamName, delimiter);
+        if (null != parts && parts.length == 2) {
+            try {
+                int partition = Integer.parseInt(parts[1]);
+                return new Partition(parts[0], partition);
+            } catch (NumberFormatException nfe) {
+                // ignore the exception
+            }
+        }
+        return new Partition(streamName, 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/IdentityStreamPartitionConverter.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/IdentityStreamPartitionConverter.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/IdentityStreamPartitionConverter.java
new file mode 100644
index 0000000..5be172f
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/IdentityStreamPartitionConverter.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.streamset;
+
+/**
+ * Map stream name to partition of the same name.
+ */
+public class IdentityStreamPartitionConverter extends CacheableStreamPartitionConverter {
+    @Override
+    protected Partition newPartition(String streamName) {
+        return new Partition(streamName, 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/Partition.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/Partition.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/Partition.java
new file mode 100644
index 0000000..aa69276
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/Partition.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.streamset;
+
+import com.google.common.base.Objects;
+
+/**
+ * `Partition` defines the relationship between a `virtual` stream and a
+ * physical DL stream.
+ *
+ * <p>A `virtual` stream could be partitioned into multiple partitions
+ * and each partition is effectively a DL stream.
+ */
+public class Partition {
+
+    // Name of its parent stream.
+    private final String stream;
+
+    // Unique id of the partition within the stream.
+    // It can be just simply an index id.
+    public final int id;
+
+    public Partition(String stream, int id) {
+        this.stream = stream;
+        this.id = id;
+    }
+
+    /**
+     * Get the `virtual` stream name.
+     *
+     * @return the stream name.
+     */
+    public String getStream() {
+        return stream;
+    }
+
+    /**
+     * Get the partition id of this partition.
+     *
+     * @return partition id
+     */
+    public int getId() {
+        return id;
+    }
+
+    /**
+     * Get the 6 digit 0 padded id of this partition as a String.
+     * @return partition id
+     */
+    public String getPaddedId() {
+        return String.format("%06d", getId());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof Partition)) {
+            return false;
+        }
+        Partition partition = (Partition) o;
+
+        return id == partition.id && Objects.equal(stream, partition.stream);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = stream.hashCode();
+        result = 31 * result + id;
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Partition(")
+          .append(stream)
+          .append(", ")
+          .append(id)
+          .append(")");
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/PartitionMap.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/PartitionMap.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/PartitionMap.java
new file mode 100644
index 0000000..bfcc5db
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/PartitionMap.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.streamset;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A mapping between a logical stream and a set of physical partitions.
+ */
+public class PartitionMap {
+
+    private final Map<String, Set<Partition>> partitionMap;
+
+    public PartitionMap() {
+        partitionMap = new HashMap<String, Set<Partition>>();
+    }
+
+    public synchronized boolean addPartition(Partition partition, int maxPartitions) {
+        if (maxPartitions <= 0) {
+            return true;
+        }
+        Set<Partition> partitions = partitionMap.get(partition.getStream());
+        if (null == partitions) {
+            partitions = new HashSet<Partition>();
+            partitions.add(partition);
+            partitionMap.put(partition.getStream(), partitions);
+            return true;
+        }
+        if (partitions.contains(partition) || partitions.size() < maxPartitions) {
+            partitions.add(partition);
+            return true;
+        }
+        return false;
+    }
+
+    public synchronized boolean removePartition(Partition partition) {
+        Set<Partition> partitions = partitionMap.get(partition.getStream());
+        return null != partitions && partitions.remove(partition);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/StreamPartitionConverter.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/StreamPartitionConverter.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/StreamPartitionConverter.java
new file mode 100644
index 0000000..3ea1337
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/StreamPartitionConverter.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.streamset;
+
+/**
+ * Map stream name to a partition.
+ *
+ * @see Partition
+ */
+public interface StreamPartitionConverter {
+
+    /**
+     * Convert the stream name to partition.
+     *
+     * @param streamName
+     *          stream name
+     * @return partition
+     */
+    Partition convert(String streamName);
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/package-info.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/package-info.java
new file mode 100644
index 0000000..d185e88
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/streamset/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * StreamSet - A logical set of streams.
+ */
+package org.apache.distributedlog.service.streamset;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/tools/ProxyTool.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/tools/ProxyTool.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/tools/ProxyTool.java
new file mode 100644
index 0000000..3934eb5
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/tools/ProxyTool.java
@@ -0,0 +1,350 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.tools;
+
+import com.google.common.util.concurrent.RateLimiter;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.client.monitor.MonitorServiceClient;
+import org.apache.distributedlog.client.serverset.DLZkServerSet;
+import org.apache.distributedlog.service.ClientUtils;
+import org.apache.distributedlog.service.DLSocketAddress;
+import org.apache.distributedlog.service.DistributedLogClient;
+import org.apache.distributedlog.service.DistributedLogClientBuilder;
+import org.apache.distributedlog.tools.Tool;
+import com.twitter.finagle.builder.ClientBuilder;
+import com.twitter.finagle.thrift.ClientId$;
+import com.twitter.util.Await;
+import com.twitter.util.Duration;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tools to interact with proxies.
+ */
+public class ProxyTool extends Tool {
+
+    private static final Logger logger = LoggerFactory.getLogger(ProxyTool.class);
+
+    /**
+     * Abstract Cluster level command.
+     */
+    protected abstract static class ClusterCommand extends OptsCommand {
+
+        protected Options options = new Options();
+        protected URI uri;
+        protected final List<String> streams = new ArrayList<String>();
+
+        protected ClusterCommand(String name, String description) {
+            super(name, description);
+            options.addOption("u", "uri", true, "DistributedLog URI");
+            options.addOption("r", "prefix", true, "Prefix of stream name. E.g. 'QuantumLeapTest-'.");
+            options.addOption("e", "expression", true, "Expression to generate stream suffix. "
+                + "Currently we support range '0-9', list '1,2,3' and name '143'");
+        }
+
+        @Override
+        protected int runCmd(CommandLine commandLine) throws Exception {
+            try {
+                parseCommandLine(commandLine);
+            } catch (ParseException pe) {
+                System.err.println("ERROR: failed to parse commandline : '" + pe.getMessage() + "'");
+                printUsage();
+                return -1;
+            }
+
+            DLZkServerSet serverSet = DLZkServerSet.of(uri, 60000);
+            logger.info("Created serverset for {}", uri);
+            try {
+                DistributedLogClient client = DistributedLogClientBuilder.newBuilder()
+                        .name("proxy_tool")
+                        .clientId(ClientId$.MODULE$.apply("proxy_tool"))
+                        .maxRedirects(2)
+                        .serverSet(serverSet.getServerSet())
+                        .clientBuilder(ClientBuilder.get()
+                            .connectionTimeout(Duration.fromSeconds(2))
+                            .tcpConnectTimeout(Duration.fromSeconds(2))
+                            .requestTimeout(Duration.fromSeconds(10))
+                            .hostConnectionLimit(1)
+                            .hostConnectionCoresize(1)
+                            .keepAlive(true)
+                            .failFast(false))
+                        .build();
+                try {
+                    return runCmd(client);
+                } finally {
+                    client.close();
+                }
+            } finally {
+                serverSet.close();
+            }
+        }
+
+        protected abstract int runCmd(DistributedLogClient client) throws Exception;
+
+        @Override
+        protected Options getOptions() {
+            return options;
+        }
+
+        protected void parseCommandLine(CommandLine cmdline) throws ParseException {
+            if (!cmdline.hasOption("u")) {
+                throw new ParseException("No distributedlog uri provided.");
+            }
+            this.uri = URI.create(cmdline.getOptionValue("u"));
+
+            // get stream names
+            String streamPrefix = cmdline.hasOption("r") ? cmdline.getOptionValue("r") : "";
+            String streamExpression = null;
+            if (cmdline.hasOption("e")) {
+                streamExpression = cmdline.getOptionValue("e");
+            }
+            if (null == streamPrefix || null == streamExpression) {
+                throw new ParseException("Please specify stream prefix & expression.");
+            }
+            // parse the stream expression
+            if (streamExpression.contains("-")) {
+                // a range expression
+                String[] parts = streamExpression.split("-");
+                if (parts.length != 2) {
+                    throw new ParseException("Invalid stream index range : " + streamExpression);
+                }
+                try {
+                    int start = Integer.parseInt(parts[0]);
+                    int end = Integer.parseInt(parts[1]);
+                    if (start > end) {
+                        throw new ParseException("Invalid stream index range : " + streamExpression);
+                    }
+                    for (int i = start; i <= end; i++) {
+                        streams.add(streamPrefix + i);
+                    }
+                } catch (NumberFormatException nfe) {
+                    throw new ParseException("Invalid stream index range : " + streamExpression);
+                }
+            } else if (streamExpression.contains(",")) {
+                // a list expression
+                String[] parts = streamExpression.split(",");
+                try {
+                    for (String part : parts) {
+                        streams.add(streamPrefix + part);
+                    }
+                } catch (NumberFormatException nfe) {
+                    throw new ParseException("Invalid stream suffix list : " + streamExpression);
+                }
+            } else {
+                streams.add(streamPrefix + streamExpression);
+            }
+        }
+    }
+
+    /**
+     * Command to release ownership of a log stream.
+     */
+    static class ReleaseCommand extends ClusterCommand {
+
+        double rate = 100f;
+
+        ReleaseCommand() {
+            super("release", "Release Stream Ownerships");
+            options.addOption("t", "rate", true, "Rate to release streams");
+        }
+
+        @Override
+        protected void parseCommandLine(CommandLine cmdline) throws ParseException {
+            super.parseCommandLine(cmdline);
+            if (cmdline.hasOption("t")) {
+                rate = Double.parseDouble(cmdline.getOptionValue("t", "100"));
+            }
+        }
+
+        @Override
+        protected int runCmd(DistributedLogClient client) throws Exception {
+            RateLimiter rateLimiter = RateLimiter.create(rate);
+            for (String stream : streams) {
+                rateLimiter.acquire();
+                try {
+                    Await.result(client.release(stream));
+                    System.out.println("Release ownership of stream " + stream);
+                } catch (Exception e) {
+                    System.err.println("Failed to release ownership of stream " + stream);
+                    throw e;
+                }
+            }
+            return 0;
+        }
+
+        @Override
+        protected String getUsage() {
+            return "release [options]";
+        }
+    }
+
+    /**
+     * Command to truncate a log stream.
+     */
+    static class TruncateCommand extends ClusterCommand {
+
+        DLSN dlsn = DLSN.InitialDLSN;
+
+        TruncateCommand() {
+            super("truncate", "Truncate streams until given dlsn.");
+            options.addOption("d", "dlsn", true, "DLSN to truncate until");
+        }
+
+        @Override
+        protected int runCmd(DistributedLogClient client) throws Exception {
+            System.out.println("Truncating streams : " + streams);
+            for (String stream : streams) {
+                boolean success = Await.result(client.truncate(stream, dlsn));
+                System.out.println("Truncate " + stream + " to " + dlsn + " : " + success);
+            }
+            return 0;
+        }
+
+        @Override
+        protected void parseCommandLine(CommandLine cmdline) throws ParseException {
+            super.parseCommandLine(cmdline);
+            if (!cmdline.hasOption("d")) {
+                throw new ParseException("No DLSN provided");
+            }
+            String[] dlsnStrs = cmdline.getOptionValue("d").split(",");
+            if (dlsnStrs.length != 3) {
+                throw new ParseException("Invalid DLSN : " + cmdline.getOptionValue("d"));
+            }
+            dlsn = new DLSN(Long.parseLong(dlsnStrs[0]), Long.parseLong(dlsnStrs[1]), Long.parseLong(dlsnStrs[2]));
+        }
+
+        @Override
+        protected String getUsage() {
+            return "truncate [options]";
+        }
+    }
+
+    /**
+     * Abstract command to operate on a single proxy server.
+     */
+    protected abstract static class ProxyCommand extends OptsCommand {
+
+        protected Options options = new Options();
+        protected InetSocketAddress address;
+
+        protected ProxyCommand(String name, String description) {
+            super(name, description);
+            options.addOption("H", "host", true, "Single Proxy Address");
+        }
+
+        @Override
+        protected Options getOptions() {
+            return options;
+        }
+
+        protected void parseCommandLine(CommandLine cmdline) throws ParseException {
+            if (!cmdline.hasOption("H")) {
+                throw new ParseException("No proxy address provided");
+            }
+            address = DLSocketAddress.parseSocketAddress(cmdline.getOptionValue("H"));
+        }
+
+        @Override
+        protected int runCmd(CommandLine commandLine) throws Exception {
+            try {
+                parseCommandLine(commandLine);
+            } catch (ParseException pe) {
+                System.err.println("ERROR: failed to parse commandline : '" + pe.getMessage() + "'");
+                printUsage();
+                return -1;
+            }
+
+            DistributedLogClientBuilder clientBuilder = DistributedLogClientBuilder.newBuilder()
+                    .name("proxy_tool")
+                    .clientId(ClientId$.MODULE$.apply("proxy_tool"))
+                    .maxRedirects(2)
+                    .host(address)
+                    .clientBuilder(ClientBuilder.get()
+                            .connectionTimeout(Duration.fromSeconds(2))
+                            .tcpConnectTimeout(Duration.fromSeconds(2))
+                            .requestTimeout(Duration.fromSeconds(10))
+                            .hostConnectionLimit(1)
+                            .hostConnectionCoresize(1)
+                            .keepAlive(true)
+                            .failFast(false));
+            Pair<DistributedLogClient, MonitorServiceClient> clientPair =
+                    ClientUtils.buildClient(clientBuilder);
+            try {
+                return runCmd(clientPair);
+            } finally {
+                clientPair.getLeft().close();
+            }
+        }
+
+        protected abstract int runCmd(Pair<DistributedLogClient, MonitorServiceClient> client) throws Exception;
+    }
+
+    /**
+     * Command to enable/disable accepting new streams.
+     */
+    static class AcceptNewStreamCommand extends ProxyCommand {
+
+        boolean enabled = false;
+
+        AcceptNewStreamCommand() {
+            super("accept-new-stream", "Enable/Disable accepting new streams for one proxy");
+            options.addOption("e", "enabled", true, "Enable/Disable accepting new streams");
+        }
+
+        @Override
+        protected void parseCommandLine(CommandLine cmdline) throws ParseException {
+            super.parseCommandLine(cmdline);
+            if (!cmdline.hasOption("e")) {
+                throw new ParseException("No action 'enable/disable' provided");
+            }
+            enabled = Boolean.parseBoolean(cmdline.getOptionValue("e"));
+        }
+
+        @Override
+        protected int runCmd(Pair<DistributedLogClient, MonitorServiceClient> client)
+                throws Exception {
+            Await.result(client.getRight().setAcceptNewStream(enabled));
+            return 0;
+        }
+
+        @Override
+        protected String getUsage() {
+            return "accept-new-stream [options]";
+        }
+    }
+
+    public ProxyTool() {
+        super();
+        addCommand(new ReleaseCommand());
+        addCommand(new TruncateCommand());
+        addCommand(new AcceptNewStreamCommand());
+    }
+
+    @Override
+    protected String getName() {
+        return "proxy_tool";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/tools/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/tools/package-info.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/tools/package-info.java
new file mode 100644
index 0000000..92d0a7d
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/tools/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Service related tools.
+ */
+package org.apache.distributedlog.service.tools;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/utils/ServerUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/utils/ServerUtils.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/utils/ServerUtils.java
new file mode 100644
index 0000000..9ee93b4
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/utils/ServerUtils.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.utils;
+
+import java.io.IOException;
+import java.net.InetAddress;
+
+/**
+ * Utils that used by servers.
+ */
+public class ServerUtils {
+
+  /**
+   * Retrieve the ledger allocator pool name.
+   *
+   * @param serverRegionId region id that that server is running
+   * @param shardId shard id of the server
+   * @param useHostname whether to use hostname as the ledger allocator pool name
+   * @return ledger allocator pool name
+   * @throws IOException
+   */
+    public static String getLedgerAllocatorPoolName(int serverRegionId,
+                                                    int shardId,
+                                                    boolean useHostname)
+        throws IOException {
+        if (useHostname) {
+            return String.format("allocator_%04d_%s", serverRegionId,
+                InetAddress.getLocalHost().getHostAddress());
+        } else {
+            return String.format("allocator_%04d_%010d", serverRegionId, shardId);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/utils/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/utils/package-info.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/utils/package-info.java
new file mode 100644
index 0000000..99cf736
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/utils/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Utilities used by proxy servers.
+ */
+package org.apache.distributedlog.service.utils;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/resources/config/server_decider.conf
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/resources/config/server_decider.conf b/distributedlog-proxy-server/src/main/resources/config/server_decider.conf
new file mode 100644
index 0000000..d2fddf5
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/resources/config/server_decider.conf
@@ -0,0 +1,31 @@
+#/**
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements.  See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership.  The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License.  You may obtain a copy of the License at
+# *
+# *     http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+region_stop_accept_new_stream=0
+disable_durability_enforcement=0
+disable_write_limit=0
+bkc.repp_disable_durability_enforcement=0
+bkc.disable_ensemble_change=0
+dl.disable_logsegment_rolling=0
+dl.disable_write_limit=0
+bkc.atla.disallow_bookie_placement=0
+bkc.atlb.disallow_bookie_placement=0
+bkc.smf1.disallow_bookie_placement=0
+service_rate_limit_disabled=0
+service_checksum_disabled=0
+service_global_limiter_disabled=0

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/resources/config/server_decider.yml
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/resources/config/server_decider.yml b/distributedlog-proxy-server/src/main/resources/config/server_decider.yml
new file mode 100644
index 0000000..7df24bb
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/resources/config/server_decider.yml
@@ -0,0 +1,44 @@
+#/**
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements.  See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership.  The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License.  You may obtain a copy of the License at
+# *
+# *     http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+region_stop_accept_new_stream:
+  default_availability: 0
+disable_durability_enforcement:
+  default_availability: 0
+disable_write_limit:
+  default_availability: 0
+bkc.repp_disable_durability_enforcement:
+  default_availability: 0
+bkc.disable_ensemble_change:
+  default_availability: 0
+dl.disable_logsegment_rolling:
+  default_availability: 0
+dl.disable_write_limit:
+  default_availability: 0
+bkc.atla.disallow_bookie_placement:
+  default_availability: 0
+bkc.atlb.disallow_bookie_placement:
+  default_availability: 0
+bkc.smf1.disallow_bookie_placement:
+  default_availability: 0
+service_rate_limit_disabled:
+  default_availability: 0
+service_checksum_disabled:
+  default_availability: 0
+service_global_limiter_disabled:
+  default_availability: 0

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/resources/findbugsExclude.xml
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/resources/findbugsExclude.xml b/distributedlog-proxy-server/src/main/resources/findbugsExclude.xml
new file mode 100644
index 0000000..e101a4d
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/resources/findbugsExclude.xml
@@ -0,0 +1,39 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+//-->
+<FindBugsFilter>
+  <Match>
+    <!-- generated code, we can't be held responsible for findbugs in it //-->
+    <Class name="~org\.apache\.distributedlog\.thrift.*" />
+  </Match>
+  <Match>
+    <!-- generated code, we can't be held responsible for findbugs in it //-->
+    <Class name="~org\.apache\.distributedlog\.service\.placement\.thrift.*" />
+  </Match>
+  <Match>
+    <!-- it is safe to cast exception here. //-->
+    <Class name="org.apache.distributedlog.service.DistributedLogServiceImpl$Stream$2" />
+    <Method name="onFailure" />
+    <Bug pattern="BC_UNCONFIRMED_CAST" />
+  </Match>
+  <Match>
+    <!-- it is safe to cast exception here. //-->
+    <Class name="org.apache.distributedlog.service.stream.BulkWriteOp" />
+    <Method name="isDefiniteFailure" />
+    <Bug pattern="BC_IMPOSSIBLE_INSTANCEOF" />
+  </Match>
+</FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/thrift/metadata.thrift
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/thrift/metadata.thrift b/distributedlog-proxy-server/src/main/thrift/metadata.thrift
new file mode 100644
index 0000000..9cb3c72
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/thrift/metadata.thrift
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+namespace java org.apache.distributedlog.service.placement.thrift
+
+struct StreamLoad {
+    1: optional string stream
+    2: optional i32 load
+}
+
+struct ServerLoad {
+    1: optional string server
+    2: optional i64 load
+    3: optional list<StreamLoad> streams
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/client/routing/LocalRoutingService.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/client/routing/LocalRoutingService.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/client/routing/LocalRoutingService.java
new file mode 100644
index 0000000..a9ddae5
--- /dev/null
+++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/client/routing/LocalRoutingService.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.routing;
+
+import com.google.common.collect.Sets;
+import com.twitter.finagle.NoBrokersAvailableException;
+import com.twitter.finagle.stats.StatsReceiver;
+import java.net.SocketAddress;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * A local routing service that used for testing.
+ */
+public class LocalRoutingService implements RoutingService {
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder to build a local routing service for testing.
+     */
+    public static class Builder implements RoutingService.Builder {
+
+        private Builder() {}
+
+        @Override
+        public RoutingService.Builder statsReceiver(StatsReceiver statsReceiver) {
+            return this;
+        }
+
+        @Override
+        public LocalRoutingService build() {
+            return new LocalRoutingService();
+        }
+    }
+
+    private final Map<String, LinkedHashSet<SocketAddress>> localAddresses =
+            new HashMap<String, LinkedHashSet<SocketAddress>>();
+    private final CopyOnWriteArrayList<RoutingListener> listeners =
+            new CopyOnWriteArrayList<RoutingListener>();
+
+    boolean allowRetrySameHost = true;
+
+    @Override
+    public void startService() {
+        // nop
+    }
+
+    @Override
+    public void stopService() {
+        // nop
+    }
+
+    @Override
+    public synchronized Set<SocketAddress> getHosts() {
+        Set<SocketAddress> hosts = Sets.newHashSet();
+        for (LinkedHashSet<SocketAddress> addresses : localAddresses.values()) {
+            hosts.addAll(addresses);
+        }
+        return hosts;
+    }
+
+    @Override
+    public RoutingService registerListener(RoutingListener listener) {
+        listeners.add(listener);
+        return this;
+    }
+
+    @Override
+    public RoutingService unregisterListener(RoutingListener listener) {
+        listeners.remove(listener);
+        return this;
+    }
+
+    public LocalRoutingService setAllowRetrySameHost(boolean enabled) {
+        allowRetrySameHost = enabled;
+        return this;
+    }
+
+    public LocalRoutingService addHost(String stream, SocketAddress address) {
+        boolean notify = false;
+        synchronized (this) {
+            LinkedHashSet<SocketAddress> addresses = localAddresses.get(stream);
+            if (null == addresses) {
+                addresses = new LinkedHashSet<SocketAddress>();
+                localAddresses.put(stream, addresses);
+            }
+            if (addresses.add(address)) {
+                notify = true;
+            }
+        }
+        if (notify) {
+            for (RoutingListener listener : listeners) {
+                listener.onServerJoin(address);
+            }
+        }
+        return this;
+    }
+
+    @Override
+    public synchronized SocketAddress getHost(String key, RoutingContext rContext)
+            throws NoBrokersAvailableException {
+        LinkedHashSet<SocketAddress> addresses = localAddresses.get(key);
+
+        SocketAddress candidate = null;
+        if (null != addresses) {
+            for (SocketAddress host : addresses) {
+                if (rContext.isTriedHost(host) && !allowRetrySameHost) {
+                    continue;
+                } else {
+                    candidate = host;
+                    break;
+                }
+            }
+        }
+        if (null != candidate) {
+            return candidate;
+        }
+        throw new NoBrokersAvailableException("No host available");
+    }
+
+    @Override
+    public void removeHost(SocketAddress address, Throwable reason) {
+        // nop
+    }
+}