You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/12 15:45:21 UTC

[14/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java
new file mode 100644
index 0000000..c0c0972
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java
@@ -0,0 +1,926 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
+import org.apache.distributedlog.AsyncLogWriter;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.DistributedLogManager;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.exceptions.AlreadyClosedException;
+import org.apache.distributedlog.exceptions.DLException;
+import org.apache.distributedlog.exceptions.OverCapacityException;
+import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
+import org.apache.distributedlog.exceptions.StatusCode;
+import org.apache.distributedlog.exceptions.StreamNotReadyException;
+import org.apache.distributedlog.exceptions.StreamUnavailableException;
+import org.apache.distributedlog.exceptions.UnexpectedException;
+import org.apache.distributedlog.io.Abortables;
+import org.apache.distributedlog.namespace.DistributedLogNamespace;
+import org.apache.distributedlog.service.FatalErrorHandler;
+import org.apache.distributedlog.service.ServerFeatureKeys;
+import org.apache.distributedlog.service.config.ServerConfiguration;
+import org.apache.distributedlog.service.config.StreamConfigProvider;
+import org.apache.distributedlog.service.stream.limiter.StreamRequestLimiter;
+import org.apache.distributedlog.service.streamset.Partition;
+import org.apache.distributedlog.stats.BroadCastStatsLogger;
+import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.util.OrderedScheduler;
+import org.apache.distributedlog.util.TimeSequencer;
+import org.apache.distributedlog.util.Utils;
+import com.twitter.util.Duration;
+import com.twitter.util.Function0;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import com.twitter.util.Promise;
+import com.twitter.util.TimeoutException;
+import com.twitter.util.Timer;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timeout;
+import org.jboss.netty.util.TimerTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.AbstractFunction1;
+import scala.runtime.BoxedUnit;
+
+/**
+ * Implementation of {@link Stream}.
+ */
+public class StreamImpl implements Stream {
+
+    private static final Logger logger = LoggerFactory.getLogger(StreamImpl.class);
+
+    /**
+     * The status of the stream.
+     *
+     * <p>The status change of the stream should just go in one direction. If a stream hits
+     * any error, the stream should be put in error state. If a stream is in error state,
+     * it should be removed and not reused anymore.
+     */
+    public enum StreamStatus {
+        UNINITIALIZED(-1),
+        INITIALIZING(0),
+        INITIALIZED(1),
+        CLOSING(-4),
+        CLOSED(-5),
+        // if a stream is in error state, it should be abort during closing.
+        ERROR(-6);
+
+        final int code;
+
+        StreamStatus(int code) {
+            this.code = code;
+        }
+
+        int getCode() {
+            return code;
+        }
+
+        public static boolean isUnavailable(StreamStatus status) {
+            return StreamStatus.ERROR == status || StreamStatus.CLOSING == status || StreamStatus.CLOSED == status;
+        }
+    }
+
+    private final String name;
+    private final Partition partition;
+    private DistributedLogManager manager;
+
+    private volatile AsyncLogWriter writer;
+    private volatile StreamStatus status;
+    private volatile String owner;
+    private volatile Throwable lastException;
+    private volatile Queue<StreamOp> pendingOps = new ArrayDeque<StreamOp>();
+
+    private final Promise<Void> closePromise = new Promise<Void>();
+    private final Object txnLock = new Object();
+    private final TimeSequencer sequencer = new TimeSequencer();
+    private final StreamRequestLimiter limiter;
+    private final DynamicDistributedLogConfiguration dynConf;
+    private final DistributedLogConfiguration dlConfig;
+    private final DistributedLogNamespace dlNamespace;
+    private final String clientId;
+    private final OrderedScheduler scheduler;
+    private final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock();
+    private final Feature featureRateLimitDisabled;
+    private final StreamManager streamManager;
+    private final StreamConfigProvider streamConfigProvider;
+    private final FatalErrorHandler fatalErrorHandler;
+    private final long streamProbationTimeoutMs;
+    private final long serviceTimeoutMs;
+    private final long writerCloseTimeoutMs;
+    private final boolean failFastOnStreamNotReady;
+    private final HashedWheelTimer requestTimer;
+    private final Timer futureTimer;
+
+    // Stats
+    private final StatsLogger streamLogger;
+    private final StatsLogger streamExceptionStatLogger;
+    private final StatsLogger limiterStatLogger;
+    private final Counter serviceTimeout;
+    private final OpStatsLogger streamAcquireStat;
+    private final OpStatsLogger writerCloseStatLogger;
+    private final Counter pendingOpsCounter;
+    private final Counter unexpectedExceptions;
+    private final Counter writerCloseTimeoutCounter;
+    private final StatsLogger exceptionStatLogger;
+    private final ConcurrentHashMap<String, Counter> exceptionCounters =
+        new ConcurrentHashMap<String, Counter>();
+    private final Gauge<Number> streamStatusGauge;
+
+    // Since we may create and discard streams at initialization if there's a race,
+    // must not do any expensive initialization here (particularly any locking or
+    // significant resource allocation etc.).
+    StreamImpl(final String name,
+               final Partition partition,
+               String clientId,
+               StreamManager streamManager,
+               StreamOpStats streamOpStats,
+               ServerConfiguration serverConfig,
+               DistributedLogConfiguration dlConfig,
+               DynamicDistributedLogConfiguration streamConf,
+               FeatureProvider featureProvider,
+               StreamConfigProvider streamConfigProvider,
+               DistributedLogNamespace dlNamespace,
+               OrderedScheduler scheduler,
+               FatalErrorHandler fatalErrorHandler,
+               HashedWheelTimer requestTimer,
+               Timer futureTimer) {
+        this.clientId = clientId;
+        this.dlConfig = dlConfig;
+        this.streamManager = streamManager;
+        this.name = name;
+        this.partition = partition;
+        this.status = StreamStatus.UNINITIALIZED;
+        this.lastException = new IOException("Fail to write record to stream " + name);
+        this.streamConfigProvider = streamConfigProvider;
+        this.dlNamespace = dlNamespace;
+        this.featureRateLimitDisabled = featureProvider.getFeature(
+            ServerFeatureKeys.SERVICE_RATE_LIMIT_DISABLED.name().toLowerCase());
+        this.scheduler = scheduler;
+        this.serviceTimeoutMs = serverConfig.getServiceTimeoutMs();
+        this.streamProbationTimeoutMs = serverConfig.getStreamProbationTimeoutMs();
+        this.writerCloseTimeoutMs = serverConfig.getWriterCloseTimeoutMs();
+        this.failFastOnStreamNotReady = dlConfig.getFailFastOnStreamNotReady();
+        this.fatalErrorHandler = fatalErrorHandler;
+        this.dynConf = streamConf;
+        StatsLogger limiterStatsLogger = BroadCastStatsLogger.two(
+            streamOpStats.baseScope("stream_limiter"),
+            streamOpStats.streamRequestScope(partition, "limiter"));
+        this.limiter = new StreamRequestLimiter(name, dynConf, limiterStatsLogger, featureRateLimitDisabled);
+        this.requestTimer = requestTimer;
+        this.futureTimer = futureTimer;
+
+        // Stats
+        this.streamLogger = streamOpStats.streamRequestStatsLogger(partition);
+        this.limiterStatLogger = streamOpStats.baseScope("request_limiter");
+        this.streamExceptionStatLogger = streamLogger.scope("exceptions");
+        this.serviceTimeout = streamOpStats.baseCounter("serviceTimeout");
+        StatsLogger streamsStatsLogger = streamOpStats.baseScope("streams");
+        this.streamAcquireStat = streamsStatsLogger.getOpStatsLogger("acquire");
+        this.pendingOpsCounter = streamOpStats.baseCounter("pending_ops");
+        this.unexpectedExceptions = streamOpStats.baseCounter("unexpected_exceptions");
+        this.exceptionStatLogger = streamOpStats.requestScope("exceptions");
+        this.writerCloseStatLogger = streamsStatsLogger.getOpStatsLogger("writer_close");
+        this.writerCloseTimeoutCounter = streamsStatsLogger.getCounter("writer_close_timeouts");
+        // Gauges
+        this.streamStatusGauge = new Gauge<Number>() {
+            @Override
+            public Number getDefaultValue() {
+                return StreamStatus.UNINITIALIZED.getCode();
+            }
+            @Override
+            public Number getSample() {
+                return status.getCode();
+            }
+        };
+    }
+
+    @Override
+    public String getOwner() {
+        return owner;
+    }
+
+    @Override
+    public String getStreamName() {
+        return name;
+    }
+
+    @Override
+    public DynamicDistributedLogConfiguration getStreamConfiguration() {
+        return dynConf;
+    }
+
+    @Override
+    public Partition getPartition() {
+        return partition;
+    }
+
+    private DistributedLogManager openLog(String name) throws IOException {
+        Optional<DistributedLogConfiguration> dlConf = Optional.<DistributedLogConfiguration>absent();
+        Optional<DynamicDistributedLogConfiguration> dynDlConf = Optional.of(dynConf);
+        Optional<StatsLogger> perStreamStatsLogger = Optional.of(streamLogger);
+        return dlNamespace.openLog(name, dlConf, dynDlConf, perStreamStatsLogger);
+    }
+
+    // Expensive initialization, only called once per stream.
+    @Override
+    public void initialize() throws IOException {
+        manager = openLog(name);
+
+        // Better to avoid registering the gauge multiple times, so do this in init
+        // which only gets called once.
+        streamLogger.registerGauge("stream_status", this.streamStatusGauge);
+
+        // Signal initialization is complete, should be last in this method.
+        status = StreamStatus.INITIALIZING;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("Stream:%s, %s, %s Status:%s", name, manager, writer, status);
+    }
+
+    @Override
+    public void start() {
+        // acquire the stream
+        acquireStream().addEventListener(new FutureEventListener<Boolean>() {
+                @Override
+                public void onSuccess(Boolean success) {
+                    if (!success) {
+                        // failed to acquire the stream. set the stream in error status and close it.
+                        setStreamInErrorStatus();
+                        requestClose("Failed to acquire the ownership");
+                    }
+                }
+
+                @Override
+                public void onFailure(Throwable cause) {
+                    // unhandled exceptions
+                    logger.error("Stream {} threw unhandled exception : ", name, cause);
+                    // failed to acquire the stream. set the stream in error status and close it.
+                    setStreamInErrorStatus();
+                    requestClose("Unhandled exception");
+                }
+            });
+    }
+
+    //
+    // Stats Operations
+    //
+
+    void countException(Throwable t, StatsLogger streamExceptionLogger) {
+        String exceptionName = null == t ? "null" : t.getClass().getName();
+        Counter counter = exceptionCounters.get(exceptionName);
+        if (null == counter) {
+            counter = exceptionStatLogger.getCounter(exceptionName);
+            Counter oldCounter = exceptionCounters.putIfAbsent(exceptionName, counter);
+            if (null != oldCounter) {
+                counter = oldCounter;
+            }
+        }
+        counter.inc();
+        streamExceptionLogger.getCounter(exceptionName).inc();
+    }
+
+    boolean isCriticalException(Throwable cause) {
+        return !(cause instanceof OwnershipAcquireFailedException);
+    }
+
+    //
+    // Service Timeout:
+    // - schedule a timeout function to handle operation timeouts: {@link #handleServiceTimeout(String)}
+    // - if the operation is completed within timeout period, cancel the timeout.
+    //
+
+    void scheduleTimeout(final StreamOp op) {
+        final Timeout timeout = requestTimer.newTimeout(new TimerTask() {
+            @Override
+            public void run(Timeout timeout) throws Exception {
+                if (!timeout.isCancelled()) {
+                    serviceTimeout.inc();
+                    handleServiceTimeout("Operation " + op.getClass().getName() + " timeout");
+                }
+            }
+        }, serviceTimeoutMs, TimeUnit.MILLISECONDS);
+        op.responseHeader().ensure(new Function0<BoxedUnit>() {
+            @Override
+            public BoxedUnit apply() {
+                timeout.cancel();
+                return null;
+            }
+        });
+    }
+
+    /**
+     * Close the stream and schedule cache eviction at some point in the future.
+     * We delay this as a way to place the stream in a probationary state--cached
+     * in the proxy but unusable.
+     * This mechanism helps the cluster adapt to situations where a proxy has
+     * persistent connectivity/availability issues, because it keeps an affected
+     * stream off the proxy for a period of time, hopefully long enough for the
+     * issues to be resolved, or for whoop to kick in and kill the shard.
+     */
+    void handleServiceTimeout(String reason) {
+        synchronized (this) {
+            if (StreamStatus.isUnavailable(status)) {
+                return;
+            }
+            // Mark stream in error state
+            setStreamInErrorStatus();
+        }
+
+        // Async close request, and schedule eviction when its done.
+        Future<Void> closeFuture = requestClose(reason, false /* dont remove */);
+        closeFuture.onSuccess(new AbstractFunction1<Void, BoxedUnit>() {
+            @Override
+            public BoxedUnit apply(Void result) {
+                streamManager.scheduleRemoval(StreamImpl.this, streamProbationTimeoutMs);
+                return BoxedUnit.UNIT;
+            }
+        });
+    }
+
+    //
+    // Submit the operation to the stream.
+    //
+
+    /**
+     * Execute the StreamOp. If reacquire is needed, this may initiate reacquire and queue the op for
+     * execution once complete.
+     *
+     * @param op
+     *          stream operation to execute.
+     */
+    @Override
+    public void submit(StreamOp op) {
+        try {
+            limiter.apply(op);
+        } catch (OverCapacityException ex) {
+            op.fail(ex);
+            return;
+        }
+
+        // Timeout stream op if requested.
+        if (serviceTimeoutMs > 0) {
+            scheduleTimeout(op);
+        }
+
+        boolean completeOpNow = false;
+        boolean success = true;
+        if (StreamStatus.isUnavailable(status)) {
+            // Stream is closed, fail the op immediately
+            op.fail(new StreamUnavailableException("Stream " + name + " is closed."));
+            return;
+        } else if (StreamStatus.INITIALIZED == status && writer != null) {
+            completeOpNow = true;
+            success = true;
+        } else {
+            synchronized (this) {
+                if (StreamStatus.isUnavailable(status)) {
+                    // Stream is closed, fail the op immediately
+                    op.fail(new StreamUnavailableException("Stream " + name + " is closed."));
+                    return;
+                } else if (StreamStatus.INITIALIZED == status) {
+                    completeOpNow = true;
+                    success = true;
+                } else if (failFastOnStreamNotReady) {
+                    op.fail(new StreamNotReadyException("Stream " + name + " is not ready; status = " + status));
+                    return;
+                } else { // the stream is still initializing
+                    pendingOps.add(op);
+                    pendingOpsCounter.inc();
+                    if (1 == pendingOps.size()) {
+                        if (op instanceof HeartbeatOp) {
+                            ((HeartbeatOp) op).setWriteControlRecord(true);
+                        }
+                    }
+                }
+            }
+        }
+        if (completeOpNow) {
+            executeOp(op, success);
+        }
+    }
+
+    //
+    // Execute operations and handle exceptions on operations
+    //
+
+    /**
+     * Execute the <i>op</i> immediately.
+     *
+     * @param op
+     *          stream operation to execute.
+     * @param success
+     *          whether the operation is success or not.
+     */
+    void executeOp(final StreamOp op, boolean success) {
+        final AsyncLogWriter writer;
+        final Throwable lastException;
+        synchronized (this) {
+            writer = this.writer;
+            lastException = this.lastException;
+        }
+        if (null != writer && success) {
+            op.execute(writer, sequencer, txnLock)
+                    .addEventListener(new FutureEventListener<Void>() {
+                @Override
+                public void onSuccess(Void value) {
+                    // nop
+                }
+                @Override
+                public void onFailure(Throwable cause) {
+                    boolean countAsException = true;
+                    if (cause instanceof DLException) {
+                        final DLException dle = (DLException) cause;
+                        switch (dle.getCode()) {
+                        case StatusCode.FOUND:
+                            assert(cause instanceof OwnershipAcquireFailedException);
+                            countAsException = false;
+                            handleExceptionOnStreamOp(op, cause);
+                            break;
+                        case StatusCode.ALREADY_CLOSED:
+                            assert(cause instanceof AlreadyClosedException);
+                            op.fail(cause);
+                            handleAlreadyClosedException((AlreadyClosedException) cause);
+                            break;
+                        // exceptions that mostly from client (e.g. too large record)
+                        case StatusCode.NOT_IMPLEMENTED:
+                        case StatusCode.METADATA_EXCEPTION:
+                        case StatusCode.LOG_EMPTY:
+                        case StatusCode.LOG_NOT_FOUND:
+                        case StatusCode.TRUNCATED_TRANSACTION:
+                        case StatusCode.END_OF_STREAM:
+                        case StatusCode.TRANSACTION_OUT_OF_ORDER:
+                        case StatusCode.INVALID_STREAM_NAME:
+                        case StatusCode.TOO_LARGE_RECORD:
+                        case StatusCode.STREAM_NOT_READY:
+                        case StatusCode.OVER_CAPACITY:
+                            op.fail(cause);
+                            break;
+                        // the DL writer hits exception, simple set the stream to error status
+                        // and fail the request
+                        default:
+                            handleExceptionOnStreamOp(op, cause);
+                            break;
+                        }
+                    } else {
+                        handleExceptionOnStreamOp(op, cause);
+                    }
+                    if (countAsException) {
+                        countException(cause, streamExceptionStatLogger);
+                    }
+                }
+            });
+        } else {
+            if (null != lastException) {
+                op.fail(lastException);
+            } else {
+                op.fail(new StreamUnavailableException("Stream " + name + " is closed."));
+            }
+        }
+    }
+
+    /**
+     * Handle exception when executing <i>op</i>.
+     *
+     * @param op
+     *          stream operation executing
+     * @param cause
+     *          exception received when executing <i>op</i>
+     */
+    private void handleExceptionOnStreamOp(StreamOp op, final Throwable cause) {
+        AsyncLogWriter oldWriter = null;
+        boolean statusChanged = false;
+        synchronized (this) {
+            if (StreamStatus.INITIALIZED == status) {
+                oldWriter = setStreamStatus(StreamStatus.ERROR, StreamStatus.INITIALIZED, null, cause);
+                statusChanged = true;
+            }
+        }
+        if (statusChanged) {
+            Abortables.asyncAbort(oldWriter, false);
+            if (isCriticalException(cause)) {
+                logger.error("Failed to write data into stream {} : ", name, cause);
+            } else {
+                logger.warn("Failed to write data into stream {} : {}", name, cause.getMessage());
+            }
+            requestClose("Failed to write data into stream " + name + " : " + cause.getMessage());
+        }
+        op.fail(cause);
+    }
+
+    /**
+     * Handling already closed exception.
+     */
+    private void handleAlreadyClosedException(AlreadyClosedException ace) {
+        unexpectedExceptions.inc();
+        logger.error("Encountered unexpected exception when writing data into stream {} : ", name, ace);
+        fatalErrorHandler.notifyFatalError();
+    }
+
+    //
+    // Acquire streams
+    //
+
+    Future<Boolean> acquireStream() {
+        final Stopwatch stopwatch = Stopwatch.createStarted();
+        final Promise<Boolean> acquirePromise = new Promise<Boolean>();
+        manager.openAsyncLogWriter().addEventListener(
+            FutureUtils.OrderedFutureEventListener.of(new FutureEventListener<AsyncLogWriter>() {
+
+            @Override
+            public void onSuccess(AsyncLogWriter w) {
+                onAcquireStreamSuccess(w, stopwatch, acquirePromise);
+            }
+
+            @Override
+            public void onFailure(Throwable cause) {
+                onAcquireStreamFailure(cause, stopwatch, acquirePromise);
+            }
+
+        }, scheduler, getStreamName()));
+        return acquirePromise;
+    }
+
+    private void onAcquireStreamSuccess(AsyncLogWriter w,
+                                        Stopwatch stopwatch,
+                                        Promise<Boolean> acquirePromise) {
+        synchronized (txnLock) {
+            sequencer.setLastId(w.getLastTxId());
+        }
+        AsyncLogWriter oldWriter;
+        Queue<StreamOp> oldPendingOps;
+        boolean success;
+        synchronized (StreamImpl.this) {
+            oldWriter = setStreamStatus(StreamStatus.INITIALIZED,
+                    StreamStatus.INITIALIZING, w, null);
+            oldPendingOps = pendingOps;
+            pendingOps = new ArrayDeque<StreamOp>();
+            success = true;
+        }
+        // check if the stream is allowed to be acquired
+        if (!streamManager.allowAcquire(StreamImpl.this)) {
+            if (null != oldWriter) {
+                Abortables.asyncAbort(oldWriter, true);
+            }
+            int maxAcquiredPartitions = dynConf.getMaxAcquiredPartitionsPerProxy();
+            StreamUnavailableException sue = new StreamUnavailableException("Stream " + partition.getStream()
+                    + " is not allowed to acquire more than " + maxAcquiredPartitions + " partitions");
+            countException(sue, exceptionStatLogger);
+            logger.error("Failed to acquire stream {} because it is unavailable : {}",
+                    name, sue.getMessage());
+            synchronized (this) {
+                oldWriter = setStreamStatus(StreamStatus.ERROR,
+                        StreamStatus.INITIALIZED, null, sue);
+                // we don't switch the pending ops since they are already switched
+                // when setting the status to initialized
+                success = false;
+            }
+        }
+        processPendingRequestsAfterAcquire(success, oldWriter, oldPendingOps, stopwatch, acquirePromise);
+    }
+
+    private void onAcquireStreamFailure(Throwable cause,
+                                        Stopwatch stopwatch,
+                                        Promise<Boolean> acquirePromise) {
+        AsyncLogWriter oldWriter;
+        Queue<StreamOp> oldPendingOps;
+        boolean success;
+        if (cause instanceof AlreadyClosedException) {
+            countException(cause, streamExceptionStatLogger);
+            handleAlreadyClosedException((AlreadyClosedException) cause);
+            return;
+        } else {
+            if (isCriticalException(cause)) {
+                countException(cause, streamExceptionStatLogger);
+                logger.error("Failed to acquire stream {} : ", name, cause);
+            } else {
+                logger.warn("Failed to acquire stream {} : {}", name, cause.getMessage());
+            }
+            synchronized (StreamImpl.this) {
+                oldWriter = setStreamStatus(StreamStatus.ERROR,
+                        StreamStatus.INITIALIZING, null, cause);
+                oldPendingOps = pendingOps;
+                pendingOps = new ArrayDeque<StreamOp>();
+                success = false;
+            }
+        }
+        processPendingRequestsAfterAcquire(success, oldWriter, oldPendingOps, stopwatch, acquirePromise);
+    }
+
+    /**
+     * Process the pending request after acquired stream.
+     *
+     * @param success whether the acquisition succeed or not
+     * @param oldWriter the old writer to abort
+     * @param oldPendingOps the old pending ops to execute
+     * @param stopwatch stopwatch to measure the time spent on acquisition
+     * @param acquirePromise the promise to complete the acquire operation
+     */
+    void processPendingRequestsAfterAcquire(boolean success,
+                                            AsyncLogWriter oldWriter,
+                                            Queue<StreamOp> oldPendingOps,
+                                            Stopwatch stopwatch,
+                                            Promise<Boolean> acquirePromise) {
+        if (success) {
+            streamAcquireStat.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+        } else {
+            streamAcquireStat.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+        }
+        for (StreamOp op : oldPendingOps) {
+            executeOp(op, success);
+            pendingOpsCounter.dec();
+        }
+        Abortables.asyncAbort(oldWriter, true);
+        FutureUtils.setValue(acquirePromise, success);
+    }
+
+    //
+    // Stream Status Changes
+    //
+
+    synchronized void setStreamInErrorStatus() {
+        if (StreamStatus.CLOSING == status || StreamStatus.CLOSED == status) {
+            return;
+        }
+        this.status = StreamStatus.ERROR;
+    }
+
+    /**
+     * Update the stream status. The changes are only applied when there isn't status changed.
+     *
+     * @param newStatus
+     *          new status
+     * @param oldStatus
+     *          old status
+     * @param writer
+     *          new log writer
+     * @param t
+     *          new exception
+     * @return old writer if it exists
+     */
+    synchronized AsyncLogWriter setStreamStatus(StreamStatus newStatus,
+                                                StreamStatus oldStatus,
+                                                AsyncLogWriter writer,
+                                                Throwable t) {
+        if (oldStatus != this.status) {
+            logger.info("Stream {} status already changed from {} -> {} when trying to change it to {}",
+                    new Object[] { name, oldStatus, this.status, newStatus });
+            return null;
+        }
+
+        String owner = null;
+        if (t instanceof OwnershipAcquireFailedException) {
+            owner = ((OwnershipAcquireFailedException) t).getCurrentOwner();
+        }
+
+        AsyncLogWriter oldWriter = this.writer;
+        this.writer = writer;
+        if (null != owner && owner.equals(clientId)) {
+            unexpectedExceptions.inc();
+            logger.error("I am waiting myself {} to release lock on stream {}, so have to shut myself down :",
+                         new Object[] { owner, name, t });
+            // I lost the ownership but left a lock over zookeeper
+            // I should not ask client to redirect to myself again as I can't handle it :(
+            // shutdown myself
+            fatalErrorHandler.notifyFatalError();
+            this.owner = null;
+        } else {
+            this.owner = owner;
+        }
+        this.lastException = t;
+        this.status = newStatus;
+        if (StreamStatus.INITIALIZED == newStatus) {
+            streamManager.notifyAcquired(this);
+            logger.info("Inserted acquired stream {} -> writer {}", name, this);
+        } else {
+            streamManager.notifyReleased(this);
+            logger.info("Removed acquired stream {} -> writer {}", name, this);
+        }
+        return oldWriter;
+    }
+
+    //
+    // Stream Close Functions
+    //
+
+    void close(DistributedLogManager dlm) {
+        if (null != dlm) {
+            try {
+                dlm.close();
+            } catch (IOException ioe) {
+                logger.warn("Failed to close dlm for {} : ", name, ioe);
+            }
+        }
+    }
+
+    @Override
+    public Future<Void> requestClose(String reason) {
+        return requestClose(reason, true);
+    }
+
+    Future<Void> requestClose(String reason, boolean uncache) {
+        final boolean abort;
+        closeLock.writeLock().lock();
+        try {
+            if (StreamStatus.CLOSING == status
+                || StreamStatus.CLOSED == status) {
+                return closePromise;
+            }
+            logger.info("Request to close stream {} : {}", getStreamName(), reason);
+            // if the stream isn't closed from INITIALIZED state, we abort the stream instead of closing it.
+            abort = StreamStatus.INITIALIZED != status;
+            status = StreamStatus.CLOSING;
+            streamManager.notifyReleased(this);
+        } finally {
+            closeLock.writeLock().unlock();
+        }
+        // we will fail the requests that are coming in between closing and closed only
+        // after the async writer is closed. so we could clear up the lock before redirect
+        // them.
+        close(abort, uncache);
+        return closePromise;
+    }
+
+    @Override
+    public void delete() throws IOException {
+        if (null != writer) {
+            Utils.close(writer);
+            synchronized (this) {
+                writer = null;
+                lastException = new StreamUnavailableException("Stream was deleted");
+            }
+        }
+        if (null == manager) {
+            throw new UnexpectedException("No stream " + name + " to delete");
+        }
+        manager.delete();
+    }
+
+    /**
+     * Post action executed after closing.
+     */
+    private void postClose(boolean uncache) {
+        closeManagerAndErrorOutPendingRequests();
+        unregisterGauge();
+        if (uncache) {
+            if (null != owner) {
+                long probationTimeoutMs = 2 * dlConfig.getZKSessionTimeoutMilliseconds() / 3;
+                streamManager.scheduleRemoval(this, probationTimeoutMs);
+            } else {
+                streamManager.notifyRemoved(this);
+                logger.info("Removed cached stream {}.", getStreamName());
+            }
+        }
+        FutureUtils.setValue(closePromise, null);
+    }
+
+    /**
+     * Shouldn't call close directly. The callers should call #requestClose instead
+     *
+     * @param shouldAbort shall we abort the stream instead of closing
+     */
+    private Future<Void> close(boolean shouldAbort, final boolean uncache) {
+        boolean abort;
+        closeLock.writeLock().lock();
+        try {
+            if (StreamStatus.CLOSED == status) {
+                return closePromise;
+            }
+            abort = shouldAbort || (StreamStatus.INITIALIZED != status && StreamStatus.CLOSING != status);
+            status = StreamStatus.CLOSED;
+            streamManager.notifyReleased(this);
+        } finally {
+            closeLock.writeLock().unlock();
+        }
+        logger.info("Closing stream {} ...", name);
+        // Close the writers to release the locks before failing the requests
+        Future<Void> closeWriterFuture;
+        if (abort) {
+            closeWriterFuture = Abortables.asyncAbort(writer, true);
+        } else {
+            closeWriterFuture = Utils.asyncClose(writer, true);
+        }
+        // close the manager and error out pending requests after close writer
+        Duration closeWaitDuration;
+        if (writerCloseTimeoutMs <= 0) {
+            closeWaitDuration = Duration.Top();
+        } else {
+            closeWaitDuration = Duration.fromMilliseconds(writerCloseTimeoutMs);
+        }
+
+        FutureUtils.stats(
+                closeWriterFuture,
+                writerCloseStatLogger,
+                Stopwatch.createStarted()
+        ).masked().within(futureTimer, closeWaitDuration)
+                .addEventListener(FutureUtils.OrderedFutureEventListener.of(
+                new FutureEventListener<Void>() {
+                    @Override
+                    public void onSuccess(Void value) {
+                        postClose(uncache);
+                    }
+                    @Override
+                    public void onFailure(Throwable cause) {
+                        if (cause instanceof TimeoutException) {
+                            writerCloseTimeoutCounter.inc();
+                        }
+                        postClose(uncache);
+                    }
+                }, scheduler, name));
+        return closePromise;
+    }
+
+    private void closeManagerAndErrorOutPendingRequests() {
+        close(manager);
+        // Failed the pending requests.
+        Queue<StreamOp> oldPendingOps;
+        synchronized (this) {
+            oldPendingOps = pendingOps;
+            pendingOps = new ArrayDeque<StreamOp>();
+        }
+        StreamUnavailableException closingException =
+                new StreamUnavailableException("Stream " + name + " is closed.");
+        for (StreamOp op : oldPendingOps) {
+            op.fail(closingException);
+            pendingOpsCounter.dec();
+        }
+        limiter.close();
+        logger.info("Closed stream {}.", name);
+    }
+
+    /**
+     * clean up the gauge to help GC.
+     */
+    private void unregisterGauge(){
+        streamLogger.unregisterGauge("stream_status", this.streamStatusGauge);
+    }
+
+    // Test-only apis
+
+    @VisibleForTesting
+    public int numPendingOps() {
+        Queue<StreamOp> queue = pendingOps;
+        return null == queue ? 0 : queue.size();
+    }
+
+    @VisibleForTesting
+    public StreamStatus getStatus() {
+        return status;
+    }
+
+    @VisibleForTesting
+    public void setStatus(StreamStatus status) {
+        this.status = status;
+    }
+
+    @VisibleForTesting
+    public AsyncLogWriter getWriter() {
+        return writer;
+    }
+
+    @VisibleForTesting
+    public DistributedLogManager getManager() {
+        return manager;
+    }
+
+    @VisibleForTesting
+    public Throwable getLastException() {
+        return lastException;
+    }
+
+    @VisibleForTesting
+    public Future<Void> getCloseFuture() {
+        return closePromise;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamManager.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamManager.java
new file mode 100644
index 0000000..d86c538
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamManager.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream;
+
+import com.google.common.base.Optional;
+import com.twitter.util.Future;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Manage lifecycle of streams.
+ *
+ * <p>StreamManager is responsible for creating, destroying, and keeping track of Stream objects.
+ *
+ * <p>Stream objects, which are managed by StreamManager and created by StreamFactory, are essentially the
+ * per stream request handlers, responsible fo dispatching ex. write requests to an underlying AsyncLogWriter,
+ * managing stream lock, interpreting exceptions, error conditions, and etc.
+ */
+public interface StreamManager {
+
+    /**
+     * Get a cached stream, returning null if it doesnt exist.
+     * @param stream name
+     * @return the cached stream
+     */
+    Stream getStream(String stream);
+
+    /**
+     * Get a cached stream and create a new one if it doesnt exist.
+     * @param streamName stream name
+     * @param start whether to start the stream after it is created.
+     * @return future satisfied once close complete
+     */
+    Stream getOrCreateStream(String streamName, boolean start) throws IOException;
+
+    /**
+     * Asynchronously create a new stream.
+     * @param stream
+     * @return Future satisfied once the stream is created
+     */
+    Future<Void> createStreamAsync(String stream);
+
+    /**
+     * Is acquiring stream allowed?
+     *
+     * @param stream
+     *          stream instance
+     * @return true if it is allowed to acquire this stream, otherwise false.
+     */
+    boolean allowAcquire(Stream stream);
+
+    /**
+     * Notify the manager that a stream was acquired.
+     * @param stream being acquired
+     */
+    void notifyAcquired(Stream stream);
+
+    /**
+     * Notify the manager that a stream was released.
+     * @param stream being released
+     */
+    void notifyReleased(Stream stream);
+
+    /**
+     * Notify the manager that a stream was completely removed.
+     * @param stream being uncached
+     * @return whether the stream existed or not
+     */
+    boolean notifyRemoved(Stream stream);
+
+    /**
+     * Asynchronous delete method.
+     * @param streamName stream name
+     * @return future satisfied once delete complete
+     */
+    Future<Void> deleteAndRemoveAsync(String streamName);
+
+    /**
+     * Asynchronous close and uncache method.
+     * @param streamName stream name
+     * @return future satisfied once close and uncache complete
+     */
+    Future<Void> closeAndRemoveAsync(String streamName);
+
+    /**
+     * Close and uncache after delayMs.
+     * @param stream to remove
+     */
+    void scheduleRemoval(Stream stream, long delayMs);
+
+    /**
+     * Close all stream.
+     * @return future satisfied all streams closed
+     */
+    Future<List<Void>> closeStreams();
+
+    /**
+     * Return map with stream ownership info.
+     * @param regex for filtering streams
+     * @return map containing ownership info
+     */
+    Map<String, String> getStreamOwnershipMap(Optional<String> regex);
+
+    /**
+     * Number of acquired streams.
+     * @return number of acquired streams
+     */
+    int numAcquired();
+
+    /**
+     * Number of cached streams.
+     * @return number of cached streams
+     */
+    int numCached();
+
+    /**
+     * Is the stream denoted by streamName in the acquired state.
+     * @return true if the stream is in the acquired state
+     */
+    boolean isAcquired(String streamName);
+
+    /**
+     * Close manager and disallow further activity.
+     */
+    void close();
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamManagerImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamManagerImpl.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamManagerImpl.java
new file mode 100644
index 0000000..5d54738
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamManagerImpl.java
@@ -0,0 +1,413 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.RateLimiter;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.exceptions.ServiceUnavailableException;
+import org.apache.distributedlog.exceptions.StreamUnavailableException;
+import org.apache.distributedlog.exceptions.UnexpectedException;
+import org.apache.distributedlog.namespace.DistributedLogNamespace;
+import org.apache.distributedlog.service.config.StreamConfigProvider;
+import org.apache.distributedlog.service.streamset.Partition;
+import org.apache.distributedlog.service.streamset.PartitionMap;
+import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
+import org.apache.distributedlog.util.ConfUtils;
+import com.twitter.util.Future;
+import com.twitter.util.Promise;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * StreamManagerImpl is the default implementation responsible for creating, destroying, and keeping track
+ * of Streams.
+ *
+ * <p>StreamFactory, supplied to StreamManagerImpl in the constructor below, is reposible simply for creating
+ * a stream object in isolation from the rest of the system. We pass a StreamFactory in instead of simply
+ * creating StreamImpl's ourselves in order to inject dependencies without bloating the StreamManagerImpl
+ * constructor.
+ */
+public class StreamManagerImpl implements StreamManager {
+
+    private static final Logger logger = LoggerFactory.getLogger(StreamManagerImpl.class);
+
+    private final ConcurrentHashMap<String, Stream> streams =
+        new ConcurrentHashMap<String, Stream>();
+    private final AtomicInteger numCached = new AtomicInteger(0);
+
+    private final ConcurrentHashMap<String, Stream> acquiredStreams =
+        new ConcurrentHashMap<String, Stream>();
+    private final AtomicInteger numAcquired = new AtomicInteger(0);
+
+    //
+    // Partitions
+    //
+    private final StreamPartitionConverter partitionConverter;
+    private final PartitionMap cachedPartitions = new PartitionMap();
+    private final PartitionMap acquiredPartitions = new PartitionMap();
+
+    final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock();
+    private final ScheduledExecutorService executorService;
+    private final DistributedLogConfiguration dlConfig;
+    private final StreamConfigProvider streamConfigProvider;
+    private final String clientId;
+    private boolean closed = false;
+    private final StreamFactory streamFactory;
+    private final DistributedLogNamespace dlNamespace;
+
+    public StreamManagerImpl(String clientId,
+                             DistributedLogConfiguration dlConfig,
+                             ScheduledExecutorService executorService,
+                             StreamFactory streamFactory,
+                             StreamPartitionConverter partitionConverter,
+                             StreamConfigProvider streamConfigProvider,
+                             DistributedLogNamespace dlNamespace) {
+        this.clientId = clientId;
+        this.executorService = executorService;
+        this.streamFactory = streamFactory;
+        this.partitionConverter = partitionConverter;
+        this.dlConfig = dlConfig;
+        this.streamConfigProvider = streamConfigProvider;
+        this.dlNamespace = dlNamespace;
+    }
+
+    private DynamicDistributedLogConfiguration getDynConf(String streamName) {
+        Optional<DynamicDistributedLogConfiguration> dynDlConf =
+                streamConfigProvider.getDynamicStreamConfig(streamName);
+        if (dynDlConf.isPresent()) {
+            return dynDlConf.get();
+        } else {
+            return ConfUtils.getConstDynConf(dlConfig);
+        }
+    }
+
+    @Override
+    public boolean allowAcquire(Stream stream) {
+        return acquiredPartitions.addPartition(
+                stream.getPartition(),
+                stream.getStreamConfiguration().getMaxAcquiredPartitionsPerProxy());
+    }
+
+    /**
+     * Must be enqueued to an executor to avoid deadlocks (close and execute-op both
+     * try to acquire the same read-write lock).
+     */
+    @Override
+    public Future<Void> deleteAndRemoveAsync(final String stream) {
+        final Promise<Void> result = new Promise<Void>();
+        java.util.concurrent.Future<?> scheduleFuture = schedule(new Runnable() {
+            @Override
+            public void run() {
+                result.become(doDeleteAndRemoveAsync(stream));
+            }
+        }, 0);
+        if (null == scheduleFuture) {
+            return Future.exception(
+                new ServiceUnavailableException("Couldn't schedule a delete task."));
+        }
+        return result;
+    }
+
+    /**
+     * Must be enqueued to an executor to avoid deadlocks (close and execute-op both
+     * try to acquire the same read-write lock).
+     */
+    @Override
+    public Future<Void> closeAndRemoveAsync(final String streamName) {
+        final Promise<Void> releasePromise = new Promise<Void>();
+        java.util.concurrent.Future<?> scheduleFuture = schedule(new Runnable() {
+            @Override
+            public void run() {
+                releasePromise.become(doCloseAndRemoveAsync(streamName));
+            }
+        }, 0);
+        if (null == scheduleFuture) {
+            return Future.exception(
+                new ServiceUnavailableException("Couldn't schedule a release task."));
+        }
+        return releasePromise;
+    }
+
+    @Override
+    public Future<Void> createStreamAsync(final String stream) {
+        final Promise<Void> createPromise = new Promise<Void>();
+        java.util.concurrent.Future<?> scheduleFuture = schedule(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    dlNamespace.createLog(stream);
+                    createPromise.setValue(null);
+                } catch (Exception e) {
+                    createPromise.setException(e);
+                }
+            }
+        }, 0);
+        if (null == scheduleFuture) {
+            return Future.exception(
+                new ServiceUnavailableException("Couldn't schedule a create task."));
+        }
+        return createPromise;
+    }
+
+    @Override
+    public void notifyReleased(Stream stream) {
+        acquiredPartitions.removePartition(stream.getPartition());
+        if (acquiredStreams.remove(stream.getStreamName(), stream)) {
+            numAcquired.getAndDecrement();
+        }
+    }
+
+    @Override
+    public void notifyAcquired(Stream stream) {
+        if (null == acquiredStreams.put(stream.getStreamName(), stream)) {
+            numAcquired.getAndIncrement();
+        }
+    }
+
+    @Override
+    public boolean notifyRemoved(Stream stream) {
+        cachedPartitions.removePartition(stream.getPartition());
+        if (streams.remove(stream.getStreamName(), stream)) {
+            numCached.getAndDecrement();
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public Map<String, String> getStreamOwnershipMap(Optional<String> regex) {
+        Map<String, String> ownershipMap = new HashMap<String, String>();
+        for (Map.Entry<String, Stream> entry : acquiredStreams.entrySet()) {
+            String name = entry.getKey();
+            if (regex.isPresent() && !name.matches(regex.get())) {
+                continue;
+            }
+            Stream stream = entry.getValue();
+            if (null == stream) {
+                continue;
+            }
+            String owner = stream.getOwner();
+            if (null == owner) {
+                ownershipMap.put(name, clientId);
+            }
+        }
+        return ownershipMap;
+    }
+
+    @Override
+    public Stream getStream(String stream) {
+        return streams.get(stream);
+    }
+
+    @Override
+    public Stream getOrCreateStream(String streamName, boolean start) throws IOException {
+        Stream stream = streams.get(streamName);
+        if (null == stream) {
+            closeLock.readLock().lock();
+            try {
+                if (closed) {
+                    return null;
+                }
+                DynamicDistributedLogConfiguration dynConf = getDynConf(streamName);
+                int maxCachedPartitions = dynConf.getMaxCachedPartitionsPerProxy();
+
+                // get partition from the stream name
+                Partition partition = partitionConverter.convert(streamName);
+
+                // add partition to cached map
+                if (!cachedPartitions.addPartition(partition, maxCachedPartitions)) {
+                    throw new StreamUnavailableException("Stream " + streamName
+                            + " is not allowed to cache more than " + maxCachedPartitions + " partitions");
+                }
+
+                stream = newStream(streamName, dynConf);
+                Stream oldWriter = streams.putIfAbsent(streamName, stream);
+                if (null != oldWriter) {
+                    stream = oldWriter;
+                } else {
+                    numCached.getAndIncrement();
+                    logger.info("Inserted mapping stream name {} -> stream {}", streamName, stream);
+                    stream.initialize();
+                    if (start) {
+                        stream.start();
+                    }
+                }
+            } finally {
+                closeLock.readLock().unlock();
+            }
+        }
+        return stream;
+    }
+
+    @Override
+    public Future<List<Void>> closeStreams() {
+        int numAcquired = acquiredStreams.size();
+        int numCached = streams.size();
+        logger.info("Closing all acquired streams : acquired = {}, cached = {}.",
+            numAcquired, numCached);
+        Set<Stream> streamsToClose = new HashSet<Stream>();
+        streamsToClose.addAll(streams.values());
+        return closeStreams(streamsToClose, Optional.<RateLimiter>absent());
+    }
+
+    @Override
+    public void scheduleRemoval(final Stream stream, long delayMs) {
+        if (delayMs > 0) {
+            logger.info("Scheduling removal of stream {} from cache after {} sec.",
+                    stream.getStreamName(), delayMs);
+        }
+        schedule(new Runnable() {
+            @Override
+            public void run() {
+                if (notifyRemoved(stream)) {
+                    logger.info("Removed cached stream {} after probation.", stream.getStreamName());
+                } else {
+                    logger.info("Cached stream {} already removed.", stream.getStreamName());
+                }
+            }
+        }, delayMs);
+    }
+
+    @Override
+    public int numAcquired() {
+        return numAcquired.get();
+    }
+
+    @Override
+    public int numCached() {
+        return numCached.get();
+    }
+
+    @Override
+    public boolean isAcquired(String streamName) {
+        return acquiredStreams.containsKey(streamName);
+    }
+
+    @Override
+    public void close() {
+        closeLock.writeLock().lock();
+        try {
+            if (closed) {
+                return;
+            }
+            closed = true;
+        } finally {
+            closeLock.writeLock().unlock();
+        }
+    }
+
+    private Future<List<Void>> closeStreams(Set<Stream> streamsToClose, Optional<RateLimiter> rateLimiter) {
+        if (streamsToClose.isEmpty()) {
+            logger.info("No streams to close.");
+            List<Void> emptyList = new ArrayList<Void>();
+            return Future.value(emptyList);
+        }
+        List<Future<Void>> futures = new ArrayList<Future<Void>>(streamsToClose.size());
+        for (Stream stream : streamsToClose) {
+            if (rateLimiter.isPresent()) {
+                rateLimiter.get().acquire();
+            }
+            futures.add(stream.requestClose("Close Streams"));
+        }
+        return Future.collect(futures);
+    }
+
+    private Stream newStream(String name, DynamicDistributedLogConfiguration streamConf) {
+        return streamFactory.create(name, streamConf, this);
+    }
+
+    public Future<Void> doCloseAndRemoveAsync(final String streamName) {
+        Stream stream = streams.get(streamName);
+        if (null == stream) {
+            logger.info("No stream {} to release.", streamName);
+            return Future.value(null);
+        } else {
+            return stream.requestClose("release ownership");
+        }
+    }
+
+    /**
+     * Dont schedule if we're closed - closeLock is acquired to close, so if we acquire the
+     * lock and discover we're not closed, we won't schedule.
+     */
+    private java.util.concurrent.Future<?> schedule(Runnable runnable, long delayMs) {
+        closeLock.readLock().lock();
+        try {
+            if (closed) {
+                return null;
+            } else if (delayMs > 0) {
+                return executorService.schedule(runnable, delayMs, TimeUnit.MILLISECONDS);
+            } else {
+                return executorService.submit(runnable);
+            }
+        } catch (RejectedExecutionException ree) {
+            logger.error("Failed to schedule task {} in {} ms : ",
+                    new Object[] { runnable, delayMs, ree });
+            return null;
+        } finally {
+            closeLock.readLock().unlock();
+        }
+    }
+
+    private Future<Void> doDeleteAndRemoveAsync(final String streamName) {
+        Stream stream = streams.get(streamName);
+        if (null == stream) {
+            logger.warn("No stream {} to delete.", streamName);
+            return Future.exception(new UnexpectedException("No stream " + streamName + " to delete."));
+        } else {
+            Future<Void> result;
+            logger.info("Deleting stream {}, {}", streamName, stream);
+            try {
+                stream.delete();
+                result = stream.requestClose("Stream Deleted");
+            } catch (IOException e) {
+                logger.error("Failed on removing stream {} : ", streamName, e);
+                result = Future.exception(e);
+            }
+            return result;
+        }
+    }
+
+    @VisibleForTesting
+    public ConcurrentHashMap<String, Stream> getCachedStreams() {
+        return streams;
+    }
+
+    @VisibleForTesting
+    public ConcurrentHashMap<String, Stream> getAcquiredStreams() {
+        return acquiredStreams;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOp.java
new file mode 100644
index 0000000..d0b8de4
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOp.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream;
+
+import com.google.common.base.Stopwatch;
+import org.apache.distributedlog.AsyncLogWriter;
+import org.apache.distributedlog.exceptions.DLException;
+import org.apache.distributedlog.thrift.service.ResponseHeader;
+import org.apache.distributedlog.util.Sequencer;
+import com.twitter.util.Future;
+
+/**
+ * An operation applied to a stream.
+ */
+public interface StreamOp {
+    /**
+     * Execute a stream op with the supplied writer.
+     *
+     * @param writer active writer for applying the change
+     * @param sequencer sequencer used for generating transaction id for stream operations
+     * @param txnLock transaction lock to guarantee ordering of transaction id
+     * @return a future satisfied when the operation completes execution
+     */
+    Future<Void> execute(AsyncLogWriter writer,
+                         Sequencer sequencer,
+                         Object txnLock);
+
+    /**
+     * Invoked before the stream op is executed.
+     */
+    void preExecute() throws DLException;
+
+    /**
+     * Return the response header (containing the status code etc.).
+     *
+     * @return A future containing the response header or the exception
+     *      encountered by the op if it failed.
+     */
+    Future<ResponseHeader> responseHeader();
+
+    /**
+     * Abort the operation with the givem exception.
+     */
+    void fail(Throwable t);
+
+    /**
+     * Return the stream name.
+     */
+    String streamName();
+
+    /**
+     * Stopwatch gives the start time of the operation.
+     */
+    Stopwatch stopwatch();
+
+    /**
+     * Compute checksum from arguments.
+     */
+    Long computeChecksum();
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOpStats.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOpStats.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOpStats.java
new file mode 100644
index 0000000..f3fc610
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOpStats.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream;
+
+import org.apache.distributedlog.service.streamset.Partition;
+import org.apache.distributedlog.stats.BroadCastStatsLogger;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * Encapsulate stream op stats construction to make it easier to access stream
+ * op stats consistently from different scopes.
+ */
+public class StreamOpStats {
+    private final StatsLogger baseStatsLogger;
+    private final StatsLogger requestStatsLogger;
+    private final StatsLogger recordsStatsLogger;
+    private final StatsLogger requestDeniedStatsLogger;
+    private final StatsLogger streamStatsLogger;
+
+    public StreamOpStats(StatsLogger statsLogger,
+                         StatsLogger perStreamStatsLogger) {
+        this.baseStatsLogger = statsLogger;
+        this.requestStatsLogger = statsLogger.scope("request");
+        this.recordsStatsLogger = statsLogger.scope("records");
+        this.requestDeniedStatsLogger = statsLogger.scope("denied");
+        this.streamStatsLogger = perStreamStatsLogger;
+    }
+
+    public StatsLogger baseStatsLogger(String opName) {
+        return baseStatsLogger;
+    }
+
+    public Counter baseCounter(String opName) {
+        return baseStatsLogger.getCounter(opName);
+    }
+
+    public StatsLogger baseScope(String opName) {
+        return baseStatsLogger.scope(opName);
+    }
+
+    public OpStatsLogger requestLatencyStat(String opName) {
+        return requestStatsLogger.getOpStatsLogger(opName);
+    }
+
+    public StatsLogger requestScope(String scopeName) {
+        return requestStatsLogger.scope(scopeName);
+    }
+
+    public Counter scopedRequestCounter(String opName, String counterName) {
+        return requestScope(opName).getCounter(counterName);
+    }
+
+    public Counter requestCounter(String counterName) {
+        return requestStatsLogger.getCounter(counterName);
+    }
+
+    public Counter requestPendingCounter(String counterName) {
+        return requestCounter(counterName);
+    }
+
+    public Counter requestDeniedCounter(String counterName) {
+        return requestDeniedStatsLogger.getCounter(counterName);
+    }
+
+    public Counter recordsCounter(String counterName) {
+        return recordsStatsLogger.getCounter(counterName);
+    }
+
+    public StatsLogger streamRequestStatsLogger(Partition partition) {
+        return BroadCastStatsLogger.masterslave(
+            streamStatsLogger.scope(partition.getStream()).scope("partition")
+                .scope(partition.getPaddedId()), streamStatsLogger.scope(partition.getStream())
+                .scope("aggregate"));
+    }
+
+    public StatsLogger streamRequestScope(Partition partition, String scopeName) {
+        return streamRequestStatsLogger(partition).scope(scopeName);
+    }
+
+    public OpStatsLogger streamRequestLatencyStat(Partition partition, String opName) {
+        return streamRequestStatsLogger(partition).getOpStatsLogger(opName);
+    }
+
+    public Counter streamRequestCounter(Partition partition, String opName, String counterName) {
+        return streamRequestScope(partition, opName).getCounter(counterName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/TruncateOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/TruncateOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/TruncateOp.java
new file mode 100644
index 0000000..0036a5c
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/TruncateOp.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream;
+
+import org.apache.distributedlog.AsyncLogWriter;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.acl.AccessControlManager;
+import org.apache.distributedlog.exceptions.DLException;
+import org.apache.distributedlog.exceptions.RequestDeniedException;
+import org.apache.distributedlog.protocol.util.ProtocolUtils;
+import org.apache.distributedlog.service.ResponseUtils;
+import org.apache.distributedlog.thrift.service.WriteResponse;
+import org.apache.distributedlog.util.Sequencer;
+import com.twitter.util.Future;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.AbstractFunction1;
+
+/**
+ * Operation to truncate a log stream.
+ */
+public class TruncateOp extends AbstractWriteOp {
+
+    private static final Logger logger = LoggerFactory.getLogger(TruncateOp.class);
+
+    private final Counter deniedTruncateCounter;
+    private final DLSN dlsn;
+    private final AccessControlManager accessControlManager;
+
+    public TruncateOp(String stream,
+                      DLSN dlsn,
+                      StatsLogger statsLogger,
+                      StatsLogger perStreamStatsLogger,
+                      Long checksum,
+                      Feature checksumDisabledFeature,
+                      AccessControlManager accessControlManager) {
+        super(stream, requestStat(statsLogger, "truncate"), checksum, checksumDisabledFeature);
+        StreamOpStats streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger);
+        this.deniedTruncateCounter = streamOpStats.requestDeniedCounter("truncate");
+        this.accessControlManager = accessControlManager;
+        this.dlsn = dlsn;
+    }
+
+    @Override
+    public Long computeChecksum() {
+        return ProtocolUtils.truncateOpCRC32(stream, dlsn);
+    }
+
+    @Override
+    protected Future<WriteResponse> executeOp(AsyncLogWriter writer,
+                                              Sequencer sequencer,
+                                              Object txnLock) {
+        if (!stream.equals(writer.getStreamName())) {
+            logger.error("Truncate: Stream Name Mismatch in the Stream Map {}, {}", stream, writer.getStreamName());
+            return Future.exception(new IllegalStateException("The stream mapping is incorrect, fail the request"));
+        }
+        return writer.truncate(dlsn).map(new AbstractFunction1<Boolean, WriteResponse>() {
+            @Override
+            public WriteResponse apply(Boolean v1) {
+                return ResponseUtils.writeSuccess();
+            }
+        });
+    }
+
+    @Override
+    public void preExecute() throws DLException {
+        if (!accessControlManager.allowTruncate(stream)) {
+            deniedTruncateCounter.inc();
+            throw new RequestDeniedException(stream, "truncate");
+        }
+        super.preExecute();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java
new file mode 100644
index 0000000..2e7ffb8
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream;
+
+import org.apache.distributedlog.AsyncLogWriter;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.LogRecord;
+import org.apache.distributedlog.acl.AccessControlManager;
+import org.apache.distributedlog.exceptions.DLException;
+import org.apache.distributedlog.exceptions.RequestDeniedException;
+import org.apache.distributedlog.protocol.util.ProtocolUtils;
+import org.apache.distributedlog.service.ResponseUtils;
+import org.apache.distributedlog.service.config.ServerConfiguration;
+import org.apache.distributedlog.service.streamset.Partition;
+import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
+import org.apache.distributedlog.thrift.service.ResponseHeader;
+import org.apache.distributedlog.thrift.service.StatusCode;
+import org.apache.distributedlog.thrift.service.WriteResponse;
+import org.apache.distributedlog.util.Sequencer;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.AbstractFunction1;
+
+/**
+ * Operation to write a single record to a log stream.
+ */
+public class WriteOp extends AbstractWriteOp implements WriteOpWithPayload {
+
+    private static final Logger logger = LoggerFactory.getLogger(WriteOp.class);
+
+    private final byte[] payload;
+    private final boolean isRecordSet;
+
+    // Stats
+    private final Counter deniedWriteCounter;
+    private final Counter successRecordCounter;
+    private final Counter failureRecordCounter;
+    private final Counter redirectRecordCounter;
+    private final OpStatsLogger latencyStat;
+    private final Counter bytes;
+    private final Counter writeBytes;
+
+    private final byte dlsnVersion;
+    private final AccessControlManager accessControlManager;
+
+    public WriteOp(String stream,
+                   ByteBuffer data,
+                   StatsLogger statsLogger,
+                   StatsLogger perStreamStatsLogger,
+                   StreamPartitionConverter streamPartitionConverter,
+                   ServerConfiguration conf,
+                   byte dlsnVersion,
+                   Long checksum,
+                   boolean isRecordSet,
+                   Feature checksumDisabledFeature,
+                   AccessControlManager accessControlManager) {
+        super(stream, requestStat(statsLogger, "write"), checksum, checksumDisabledFeature);
+        payload = new byte[data.remaining()];
+        data.get(payload);
+        this.isRecordSet = isRecordSet;
+
+        final Partition partition = streamPartitionConverter.convert(stream);
+        StreamOpStats streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger);
+        this.successRecordCounter = streamOpStats.recordsCounter("success");
+        this.failureRecordCounter = streamOpStats.recordsCounter("failure");
+        this.redirectRecordCounter = streamOpStats.recordsCounter("redirect");
+        this.deniedWriteCounter = streamOpStats.requestDeniedCounter("write");
+        this.writeBytes = streamOpStats.scopedRequestCounter("write", "bytes");
+        this.latencyStat = streamOpStats.streamRequestLatencyStat(partition, "write");
+        this.bytes = streamOpStats.streamRequestCounter(partition, "write", "bytes");
+
+        this.dlsnVersion = dlsnVersion;
+        this.accessControlManager = accessControlManager;
+
+        final long size = getPayloadSize();
+        result().addEventListener(new FutureEventListener<WriteResponse>() {
+            @Override
+            public void onSuccess(WriteResponse response) {
+                if (response.getHeader().getCode() == StatusCode.SUCCESS) {
+                    latencyStat.registerSuccessfulEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS));
+                    bytes.add(size);
+                    writeBytes.add(size);
+                } else {
+                    latencyStat.registerFailedEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS));
+                }
+            }
+            @Override
+            public void onFailure(Throwable cause) {
+                latencyStat.registerFailedEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS));
+            }
+        });
+    }
+
+    @Override
+    public long getPayloadSize() {
+      return payload.length;
+    }
+
+    @Override
+    public Long computeChecksum() {
+        return ProtocolUtils.writeOpCRC32(stream, payload);
+    }
+
+    @Override
+    public void preExecute() throws DLException {
+        if (!accessControlManager.allowWrite(stream)) {
+            deniedWriteCounter.inc();
+            throw new RequestDeniedException(stream, "write");
+        }
+        super.preExecute();
+    }
+
+    @Override
+    protected Future<WriteResponse> executeOp(AsyncLogWriter writer,
+                                              Sequencer sequencer,
+                                              Object txnLock) {
+        if (!stream.equals(writer.getStreamName())) {
+            logger.error("Write: Stream Name Mismatch in the Stream Map {}, {}", stream, writer.getStreamName());
+            return Future.exception(new IllegalStateException("The stream mapping is incorrect, fail the request"));
+        }
+
+        long txnId;
+        Future<DLSN> writeResult;
+        synchronized (txnLock) {
+            txnId = sequencer.nextId();
+            LogRecord record = new LogRecord(txnId, payload);
+            if (isRecordSet) {
+                record.setRecordSet();
+            }
+            writeResult = writer.write(record);
+        }
+        return writeResult.map(new AbstractFunction1<DLSN, WriteResponse>() {
+            @Override
+            public WriteResponse apply(DLSN value) {
+                successRecordCounter.inc();
+                return ResponseUtils.writeSuccess().setDlsn(value.serialize(dlsnVersion));
+            }
+        });
+    }
+
+    @Override
+    protected void fail(ResponseHeader header) {
+        if (StatusCode.FOUND == header.getCode()) {
+            redirectRecordCounter.inc();
+        } else {
+            failureRecordCounter.inc();
+        }
+        super.fail(header);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOpWithPayload.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOpWithPayload.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOpWithPayload.java
new file mode 100644
index 0000000..e411b420
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOpWithPayload.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream;
+
+/**
+ * A write operation with payload.
+ */
+public interface WriteOpWithPayload {
+
+    // Return the payload size in bytes
+    long getPayloadSize();
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/AdminOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/AdminOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/AdminOp.java
new file mode 100644
index 0000000..fcaee35
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/AdminOp.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream.admin;
+
+import org.apache.distributedlog.exceptions.DLException;
+import com.twitter.util.Future;
+
+/**
+ * Admin operation interface.
+ */
+public interface AdminOp<RespT> {
+
+    /**
+     * Invoked before the stream op is executed.
+     */
+    void preExecute() throws DLException;
+
+    /**
+     * Execute the operation.
+     *
+     * @return the future represents the response of the operation
+     */
+    Future<RespT> execute();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/CreateOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/CreateOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/CreateOp.java
new file mode 100644
index 0000000..89a2566
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/CreateOp.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream.admin;
+
+import static org.apache.distributedlog.service.stream.AbstractStreamOp.requestStat;
+
+import org.apache.distributedlog.service.ResponseUtils;
+import org.apache.distributedlog.service.stream.StreamManager;
+import org.apache.distributedlog.thrift.service.WriteResponse;
+import com.twitter.util.Future;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.stats.StatsLogger;
+import scala.runtime.AbstractFunction1;
+
+/**
+ * Operation to create log stream.
+ */
+public class CreateOp extends StreamAdminOp {
+
+  public CreateOp(String stream,
+                  StatsLogger statsLogger,
+                  StreamManager streamManager,
+                  Long checksum,
+                  Feature checksumEnabledFeature) {
+    super(stream,
+            streamManager,
+            requestStat(statsLogger, "create"),
+            checksum,
+            checksumEnabledFeature);
+  }
+
+  @Override
+  protected Future<WriteResponse> executeOp() {
+    Future<Void> result = streamManager.createStreamAsync(stream);
+    return result.map(new AbstractFunction1<Void, WriteResponse>() {
+      @Override
+      public WriteResponse apply(Void value) {
+        return ResponseUtils.writeSuccess();
+      }
+    });
+  }
+}