You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/12 15:45:21 UTC
[14/30] incubator-distributedlog git commit: DL-205: Remove
StatusCode dependency on DLException
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java
new file mode 100644
index 0000000..c0c0972
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java
@@ -0,0 +1,926 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
+import org.apache.distributedlog.AsyncLogWriter;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.DistributedLogManager;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.exceptions.AlreadyClosedException;
+import org.apache.distributedlog.exceptions.DLException;
+import org.apache.distributedlog.exceptions.OverCapacityException;
+import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
+import org.apache.distributedlog.exceptions.StatusCode;
+import org.apache.distributedlog.exceptions.StreamNotReadyException;
+import org.apache.distributedlog.exceptions.StreamUnavailableException;
+import org.apache.distributedlog.exceptions.UnexpectedException;
+import org.apache.distributedlog.io.Abortables;
+import org.apache.distributedlog.namespace.DistributedLogNamespace;
+import org.apache.distributedlog.service.FatalErrorHandler;
+import org.apache.distributedlog.service.ServerFeatureKeys;
+import org.apache.distributedlog.service.config.ServerConfiguration;
+import org.apache.distributedlog.service.config.StreamConfigProvider;
+import org.apache.distributedlog.service.stream.limiter.StreamRequestLimiter;
+import org.apache.distributedlog.service.streamset.Partition;
+import org.apache.distributedlog.stats.BroadCastStatsLogger;
+import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.util.OrderedScheduler;
+import org.apache.distributedlog.util.TimeSequencer;
+import org.apache.distributedlog.util.Utils;
+import com.twitter.util.Duration;
+import com.twitter.util.Function0;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import com.twitter.util.Promise;
+import com.twitter.util.TimeoutException;
+import com.twitter.util.Timer;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timeout;
+import org.jboss.netty.util.TimerTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.AbstractFunction1;
+import scala.runtime.BoxedUnit;
+
+/**
+ * Implementation of {@link Stream}.
+ */
+public class StreamImpl implements Stream {
+
+ private static final Logger logger = LoggerFactory.getLogger(StreamImpl.class); // class-wide SLF4J logger
+
+ /**
+ * The status of the stream.
+ *
+ * <p>The status change of the stream should just go in one direction. If a stream hits
+ * any error, the stream should be put in error state. If a stream is in error state,
+ * it should be removed and not reused anymore.
+ */
+ public enum StreamStatus {
+ UNINITIALIZED(-1),
+ INITIALIZING(0),
+ INITIALIZED(1),
+ CLOSING(-4),
+ CLOSED(-5),
+ // if a stream is in error state, it should be abort during closing.
+ ERROR(-6);
+
+ final int code;
+
+ StreamStatus(int code) {
+ this.code = code;
+ }
+
+ int getCode() {
+ return code;
+ }
+
+ public static boolean isUnavailable(StreamStatus status) {
+ return StreamStatus.ERROR == status || StreamStatus.CLOSING == status || StreamStatus.CLOSED == status;
+ }
+ }
+
+ private final String name;
+ private final Partition partition;
+ private DistributedLogManager manager; // assigned in initialize() from openLog(name)
+
+ private volatile AsyncLogWriter writer; // replaced in setStreamStatus(); set to null in delete()
+ private volatile StreamStatus status;
+ private volatile String owner; // current owner taken from an OwnershipAcquireFailedException in setStreamStatus(); null otherwise
+ private volatile Throwable lastException; // used by executeOp() to fail ops when no writer is available
+ private volatile Queue<StreamOp> pendingOps = new ArrayDeque<StreamOp>(); // ops queued by submit() while the stream is not yet INITIALIZED
+
+ private final Promise<Void> closePromise = new Promise<Void>(); // returned by requestClose(); completed in postClose()
+ private final Object txnLock = new Object(); // held while updating the sequencer; also handed to StreamOp.execute()
+ private final TimeSequencer sequencer = new TimeSequencer();
+ private final StreamRequestLimiter limiter;
+ private final DynamicDistributedLogConfiguration dynConf;
+ private final DistributedLogConfiguration dlConfig;
+ private final DistributedLogNamespace dlNamespace;
+ private final String clientId;
+ private final OrderedScheduler scheduler;
+ private final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock(); // its write lock guards the CLOSING/CLOSED transitions
+ private final Feature featureRateLimitDisabled;
+ private final StreamManager streamManager;
+ private final StreamConfigProvider streamConfigProvider;
+ private final FatalErrorHandler fatalErrorHandler;
+ private final long streamProbationTimeoutMs;
+ private final long serviceTimeoutMs; // submit() only arms a timeout when this is > 0
+ private final long writerCloseTimeoutMs; // <= 0 makes close() wait with Duration.Top()
+ private final boolean failFastOnStreamNotReady;
+ private final HashedWheelTimer requestTimer;
+ private final Timer futureTimer;
+
+ // Stats
+ private final StatsLogger streamLogger;
+ private final StatsLogger streamExceptionStatLogger;
+ private final StatsLogger limiterStatLogger;
+ private final Counter serviceTimeout;
+ private final OpStatsLogger streamAcquireStat;
+ private final OpStatsLogger writerCloseStatLogger;
+ private final Counter pendingOpsCounter;
+ private final Counter unexpectedExceptions;
+ private final Counter writerCloseTimeoutCounter;
+ private final StatsLogger exceptionStatLogger;
+ private final ConcurrentHashMap<String, Counter> exceptionCounters = // exception class name -> counter, filled lazily by countException()
+ new ConcurrentHashMap<String, Counter>();
+ private final Gauge<Number> streamStatusGauge;
+
+ /**
+  * Creates the stream handle without touching any external resource.
+  *
+  * <p>Since streams may be created and discarded at initialization if there's a race, nothing
+  * expensive (particularly any locking or significant resource allocation) may happen here;
+  * the expensive part is done in {@link #initialize()}.
+  *
+  * <p>The stream starts in {@link StreamStatus#UNINITIALIZED} with a generic
+  * {@link IOException} as its last exception.
+  */
+ StreamImpl(final String name,
+            final Partition partition,
+            String clientId,
+            StreamManager streamManager,
+            StreamOpStats streamOpStats,
+            ServerConfiguration serverConfig,
+            DistributedLogConfiguration dlConfig,
+            DynamicDistributedLogConfiguration streamConf,
+            FeatureProvider featureProvider,
+            StreamConfigProvider streamConfigProvider,
+            DistributedLogNamespace dlNamespace,
+            OrderedScheduler scheduler,
+            FatalErrorHandler fatalErrorHandler,
+            HashedWheelTimer requestTimer,
+            Timer futureTimer) {
+     this.clientId = clientId;
+     this.dlConfig = dlConfig;
+     this.streamManager = streamManager;
+     this.name = name;
+     this.partition = partition;
+     this.status = StreamStatus.UNINITIALIZED;
+     this.lastException = new IOException("Fail to write record to stream " + name);
+     this.streamConfigProvider = streamConfigProvider;
+     this.dlNamespace = dlNamespace;
+     // Lower-case the feature key with a fixed locale: the default-locale variant would turn
+     // 'I' into a dotless 'ı' under e.g. a Turkish locale and look up the wrong feature name.
+     this.featureRateLimitDisabled = featureProvider.getFeature(
+         ServerFeatureKeys.SERVICE_RATE_LIMIT_DISABLED.name().toLowerCase(java.util.Locale.ROOT));
+     this.scheduler = scheduler;
+     this.serviceTimeoutMs = serverConfig.getServiceTimeoutMs();
+     this.streamProbationTimeoutMs = serverConfig.getStreamProbationTimeoutMs();
+     this.writerCloseTimeoutMs = serverConfig.getWriterCloseTimeoutMs();
+     this.failFastOnStreamNotReady = dlConfig.getFailFastOnStreamNotReady();
+     this.fatalErrorHandler = fatalErrorHandler;
+     this.dynConf = streamConf;
+     StatsLogger limiterStatsLogger = BroadCastStatsLogger.two(
+         streamOpStats.baseScope("stream_limiter"),
+         streamOpStats.streamRequestScope(partition, "limiter"));
+     // the limiter needs dynConf and featureRateLimitDisabled, both assigned above
+     this.limiter = new StreamRequestLimiter(name, dynConf, limiterStatsLogger, featureRateLimitDisabled);
+     this.requestTimer = requestTimer;
+     this.futureTimer = futureTimer;
+
+     // Stats
+     this.streamLogger = streamOpStats.streamRequestStatsLogger(partition);
+     this.limiterStatLogger = streamOpStats.baseScope("request_limiter");
+     this.streamExceptionStatLogger = streamLogger.scope("exceptions");
+     this.serviceTimeout = streamOpStats.baseCounter("serviceTimeout");
+     StatsLogger streamsStatsLogger = streamOpStats.baseScope("streams");
+     this.streamAcquireStat = streamsStatsLogger.getOpStatsLogger("acquire");
+     this.pendingOpsCounter = streamOpStats.baseCounter("pending_ops");
+     this.unexpectedExceptions = streamOpStats.baseCounter("unexpected_exceptions");
+     this.exceptionStatLogger = streamOpStats.requestScope("exceptions");
+     this.writerCloseStatLogger = streamsStatsLogger.getOpStatsLogger("writer_close");
+     this.writerCloseTimeoutCounter = streamsStatsLogger.getCounter("writer_close_timeouts");
+     // Gauges: only created here; registration happens in initialize()
+     this.streamStatusGauge = new Gauge<Number>() {
+         @Override
+         public Number getDefaultValue() {
+             return StreamStatus.UNINITIALIZED.getCode();
+         }
+         @Override
+         public Number getSample() {
+             return status.getCode();
+         }
+     };
+ }
+
+ @Override
+ public String getOwner() {
+ return owner; // owner recorded by setStreamStatus() from an OwnershipAcquireFailedException; null otherwise
+ }
+
+ @Override
+ public String getStreamName() {
+ return name; // the stream name passed to the constructor
+ }
+
+ @Override
+ public DynamicDistributedLogConfiguration getStreamConfiguration() {
+ return dynConf; // the dynamic configuration passed to the constructor as streamConf
+ }
+
+ @Override
+ public Partition getPartition() {
+ return partition; // the partition passed to the constructor
+ }
+
+ /**
+  * Opens the log <i>name</i> in the namespace, using this stream's dynamic configuration and
+  * per-stream stats logger. No static configuration override is supplied.
+  *
+  * @param name name of the log to open
+  * @return the manager returned by the namespace
+  * @throws IOException if the namespace fails to open the log
+  */
+ private DistributedLogManager openLog(String name) throws IOException {
+     return dlNamespace.openLog(
+         name,
+         Optional.<DistributedLogConfiguration>absent(),
+         Optional.of(dynConf),
+         Optional.of(streamLogger));
+ }
+
+ // Expensive initialization, only called once per stream: opens the log manager, registers the status gauge and sets the status to INITIALIZING.
+ @Override
+ public void initialize() throws IOException {
+ manager = openLog(name); // if this throws, the gauge is not registered and the status is left unchanged
+
+ // Better to avoid registering the gauge multiple times, so do this in init
+ // which only gets called once. The gauge is unregistered again in unregisterGauge().
+ streamLogger.registerGauge("stream_status", this.streamStatusGauge);
+
+ // Signal initialization is complete, should be last in this method.
+ status = StreamStatus.INITIALIZING;
+ }
+
+ /**
+  * Renders the stream name, its log manager, its current writer and its status.
+  */
+ @Override
+ public String toString() {
+     final String pattern = "Stream:%s, %s, %s Status:%s";
+     return String.format(pattern, name, manager, writer, status);
+ }
+
+ @Override
+ public void start() {
+ // acquire the stream
+ acquireStream().addEventListener(new FutureEventListener<Boolean>() {
+ @Override
+ public void onSuccess(Boolean success) {
+ if (!success) {
+ // failed to acquire the stream. set the stream in error status and close it.
+ setStreamInErrorStatus();
+ requestClose("Failed to acquire the ownership");
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable cause) {
+ // unhandled exceptions
+ logger.error("Stream {} threw unhandled exception : ", name, cause);
+ // failed to acquire the stream. set the stream in error status and close it.
+ setStreamInErrorStatus();
+ requestClose("Unhandled exception");
+ }
+ });
+ }
+
+ //
+ // Stats Operations
+ //
+
+ void countException(Throwable t, StatsLogger streamExceptionLogger) {
+ String exceptionName = null == t ? "null" : t.getClass().getName();
+ Counter counter = exceptionCounters.get(exceptionName);
+ if (null == counter) {
+ counter = exceptionStatLogger.getCounter(exceptionName);
+ Counter oldCounter = exceptionCounters.putIfAbsent(exceptionName, counter);
+ if (null != oldCounter) {
+ counter = oldCounter;
+ }
+ }
+ counter.inc();
+ streamExceptionLogger.getCounter(exceptionName).inc();
+ }
+
+ /**
+  * An exception is critical unless it is an {@link OwnershipAcquireFailedException}.
+  *
+  * @param cause the exception to classify
+  * @return false for ownership-acquire failures, true for everything else (including null)
+  */
+ boolean isCriticalException(Throwable cause) {
+     if (cause instanceof OwnershipAcquireFailedException) {
+         return false;
+     }
+     return true;
+ }
+
+ //
+ // Service Timeout:
+ // - schedule a timeout function to handle operation timeouts: {@link #handleServiceTimeout(String)}
+ // - if the operation is completed within timeout period, cancel the timeout.
+ //
+
+ /**
+  * Arms a service timeout for <i>op</i>.
+  *
+  * <p>A task is scheduled on the request timer to fire after {@code serviceTimeoutMs}
+  * milliseconds; unless it was cancelled, it bumps the service-timeout counter and calls
+  * {@link #handleServiceTimeout(String)}. The task is cancelled as soon as the op's response
+  * header future completes.
+  *
+  * @param op
+  *          stream operation to watch
+  */
+ void scheduleTimeout(final StreamOp op) {
+     final Timeout timeout = requestTimer.newTimeout(new TimerTask() {
+         @Override
+         public void run(Timeout timeout) throws Exception {
+             if (!timeout.isCancelled()) {
+                 serviceTimeout.inc();
+                 handleServiceTimeout("Operation " + op.getClass().getName() + " timeout");
+             }
+         }
+     }, serviceTimeoutMs, TimeUnit.MILLISECONDS);
+     op.responseHeader().ensure(new Function0<BoxedUnit>() {
+         @Override
+         public BoxedUnit apply() {
+             timeout.cancel();
+             // return the unit value rather than null, as the other Scala callbacks in this class do
+             return BoxedUnit.UNIT;
+         }
+     });
+ }
+
+ /**
+  * Reacts to a service timeout: marks the stream as errored, closes it without removing it
+  * from the cache, and schedules its removal once the close has completed.
+  *
+  * <p>Delaying the eviction places the stream in a probationary state: cached in the proxy
+  * but unusable. This helps the cluster adapt when a proxy has persistent
+  * connectivity/availability issues, because the affected stream is kept off the proxy for a
+  * while, hopefully long enough for the issues to be resolved, or for whoop to kick in and
+  * kill the shard.
+  *
+  * <p>Nothing happens if the stream is already unavailable.
+  *
+  * @param reason
+  *          reason handed to the close request
+  */
+ void handleServiceTimeout(String reason) {
+     final boolean alreadyUnavailable;
+     synchronized (this) {
+         alreadyUnavailable = StreamStatus.isUnavailable(status);
+         if (!alreadyUnavailable) {
+             // flag the stream as errored
+             setStreamInErrorStatus();
+         }
+     }
+     if (alreadyUnavailable) {
+         return;
+     }
+
+     // close asynchronously (keeping the stream cached) and evict it when the close is done.
+     requestClose(reason, false /* dont remove */).onSuccess(new AbstractFunction1<Void, BoxedUnit>() {
+         @Override
+         public BoxedUnit apply(Void ignored) {
+             streamManager.scheduleRemoval(StreamImpl.this, streamProbationTimeoutMs);
+             return BoxedUnit.UNIT;
+         }
+     });
+ }
+
+ //
+ // Submit the operation to the stream.
+ //
+
+ /**
+  * Submit <i>op</i> to the stream.
+  *
+  * <p>The op is first checked against the request limiter and, when a service timeout is
+  * configured, gets a timeout armed. Then, depending on the stream status, it is failed
+  * (stream unavailable, or not ready with fail-fast enabled), executed right away (stream
+  * initialized), or queued until the stream has been acquired.
+  *
+  * @param op
+  *          stream operation to execute.
+  */
+ @Override
+ public void submit(StreamOp op) {
+     try {
+         limiter.apply(op);
+     } catch (OverCapacityException overCapacity) {
+         op.fail(overCapacity);
+         return;
+     }
+
+     // arm the service timeout when one is configured.
+     if (serviceTimeoutMs > 0) {
+         scheduleTimeout(op);
+     }
+
+     if (StreamStatus.isUnavailable(status)) {
+         // the stream is gone, fail right away
+         op.fail(new StreamUnavailableException("Stream " + name + " is closed."));
+         return;
+     }
+
+     // lock-free fast path: the stream is initialized and has a writer
+     boolean runNow = StreamStatus.INITIALIZED == status && writer != null;
+     if (!runNow) {
+         synchronized (this) {
+             if (StreamStatus.isUnavailable(status)) {
+                 // the stream is gone, fail right away
+                 op.fail(new StreamUnavailableException("Stream " + name + " is closed."));
+                 return;
+             }
+             if (StreamStatus.INITIALIZED == status) {
+                 runNow = true;
+             } else if (failFastOnStreamNotReady) {
+                 op.fail(new StreamNotReadyException("Stream " + name + " is not ready; status = " + status));
+                 return;
+             } else {
+                 // still initializing: park the op until the acquisition finishes
+                 pendingOps.add(op);
+                 pendingOpsCounter.inc();
+                 if (1 == pendingOps.size() && op instanceof HeartbeatOp) {
+                     ((HeartbeatOp) op).setWriteControlRecord(true);
+                 }
+             }
+         }
+     }
+     if (runNow) {
+         executeOp(op, true);
+     }
+ }
+
+ //
+ // Execute operations and handle exceptions on operations
+ //
+
+ /**
+ * Execute the <i>op</i> immediately.
+ *
+ * <p>When a writer is present and <i>success</i> is true, the op is executed against the writer
+ * and a failure of the returned future is dispatched on the exception's status code. Otherwise
+ * the op is failed with the last recorded exception, or with a StreamUnavailableException when
+ * there is none.
+ *
+ * @param op
+ * stream operation to execute.
+ * @param success
+ * whether the stream is usable; when false the op is failed without touching the writer.
+ */
+ void executeOp(final StreamOp op, boolean success) {
+ final AsyncLogWriter writer;
+ final Throwable lastException;
+ synchronized (this) { // snapshot both fields together
+ writer = this.writer;
+ lastException = this.lastException;
+ }
+ if (null != writer && success) {
+ op.execute(writer, sequencer, txnLock)
+ .addEventListener(new FutureEventListener<Void>() {
+ @Override
+ public void onSuccess(Void value) {
+ // nop
+ }
+ @Override
+ public void onFailure(Throwable cause) {
+ boolean countAsException = true;
+ if (cause instanceof DLException) {
+ final DLException dle = (DLException) cause;
+ switch (dle.getCode()) {
+ case StatusCode.FOUND: // ownership is held elsewhere; handled but not counted as an exception
+ assert(cause instanceof OwnershipAcquireFailedException);
+ countAsException = false;
+ handleExceptionOnStreamOp(op, cause);
+ break;
+ case StatusCode.ALREADY_CLOSED: // fails the op, then escalates to the fatal error handler
+ assert(cause instanceof AlreadyClosedException);
+ op.fail(cause);
+ handleAlreadyClosedException((AlreadyClosedException) cause);
+ break;
+ // exceptions that mostly from client (e.g. too large record): only the op is failed, the stream status is untouched
+ case StatusCode.NOT_IMPLEMENTED:
+ case StatusCode.METADATA_EXCEPTION:
+ case StatusCode.LOG_EMPTY:
+ case StatusCode.LOG_NOT_FOUND:
+ case StatusCode.TRUNCATED_TRANSACTION:
+ case StatusCode.END_OF_STREAM:
+ case StatusCode.TRANSACTION_OUT_OF_ORDER:
+ case StatusCode.INVALID_STREAM_NAME:
+ case StatusCode.TOO_LARGE_RECORD:
+ case StatusCode.STREAM_NOT_READY:
+ case StatusCode.OVER_CAPACITY:
+ op.fail(cause);
+ break;
+ // the DL writer hits exception, simple set the stream to error status
+ // and fail the request
+ default:
+ handleExceptionOnStreamOp(op, cause);
+ break;
+ }
+ } else { // not a DLException: treated like the default case above
+ handleExceptionOnStreamOp(op, cause);
+ }
+ if (countAsException) {
+ countException(cause, streamExceptionStatLogger);
+ }
+ }
+ });
+ } else {
+ if (null != lastException) {
+ op.fail(lastException);
+ } else {
+ op.fail(new StreamUnavailableException("Stream " + name + " is closed."));
+ }
+ }
+ }
+
+ /**
+  * React to <i>cause</i> raised while executing <i>op</i>.
+  *
+  * <p>If the stream is still initialized, it is moved to the error status, its writer is
+  * aborted, the failure is logged (as an error for critical exceptions, as a warning
+  * otherwise) and a close is requested. In every case the op itself is failed with
+  * <i>cause</i> at the end.
+  *
+  * @param op
+  *          stream operation that was executing
+  * @param cause
+  *          exception received when executing <i>op</i>
+  */
+ private void handleExceptionOnStreamOp(StreamOp op, final Throwable cause) {
+     AsyncLogWriter writerToAbort = null;
+     boolean movedToError = false;
+     synchronized (this) {
+         if (StreamStatus.INITIALIZED == status) {
+             writerToAbort = setStreamStatus(StreamStatus.ERROR, StreamStatus.INITIALIZED, null, cause);
+             movedToError = true;
+         }
+     }
+     if (!movedToError) {
+         // the transition was not attempted here: only the op needs to be failed
+         op.fail(cause);
+         return;
+     }
+     Abortables.asyncAbort(writerToAbort, false);
+     if (!isCriticalException(cause)) {
+         logger.warn("Failed to write data into stream {} : {}", name, cause.getMessage());
+     } else {
+         logger.error("Failed to write data into stream {} : ", name, cause);
+     }
+     requestClose("Failed to write data into stream " + name + " : " + cause.getMessage());
+     op.fail(cause);
+ }
+
+ /**
+ * Handling already closed exception.
+ *
+ * <p>Increments the unexpected-exceptions counter, logs the exception as an error and then
+ * notifies the fatal error handler.
+ *
+ * @param ace the exception that was encountered
+ */
+ private void handleAlreadyClosedException(AlreadyClosedException ace) {
+ unexpectedExceptions.inc();
+ logger.error("Encountered unexpected exception when writing data into stream {} : ", name, ace);
+ fatalErrorHandler.notifyFatalError();
+ }
+
+ //
+ // Acquire streams
+ //
+
+ Future<Boolean> acquireStream() {
+ final Stopwatch stopwatch = Stopwatch.createStarted();
+ final Promise<Boolean> acquirePromise = new Promise<Boolean>();
+ manager.openAsyncLogWriter().addEventListener(
+ FutureUtils.OrderedFutureEventListener.of(new FutureEventListener<AsyncLogWriter>() {
+
+ @Override
+ public void onSuccess(AsyncLogWriter w) {
+ onAcquireStreamSuccess(w, stopwatch, acquirePromise);
+ }
+
+ @Override
+ public void onFailure(Throwable cause) {
+ onAcquireStreamFailure(cause, stopwatch, acquirePromise);
+ }
+
+ }, scheduler, getStreamName()));
+ return acquirePromise;
+ }
+
+ private void onAcquireStreamSuccess(AsyncLogWriter w,
+ Stopwatch stopwatch,
+ Promise<Boolean> acquirePromise) { // called by acquireStream() once the writer w has been opened
+ synchronized (txnLock) {
+ sequencer.setLastId(w.getLastTxId()); // seed the sequencer from the writer's last transaction id
+ }
+ AsyncLogWriter oldWriter;
+ Queue<StreamOp> oldPendingOps;
+ boolean success;
+ synchronized (StreamImpl.this) {
+ oldWriter = setStreamStatus(StreamStatus.INITIALIZED, // NOTE(review): returns null without storing w when the status is no longer INITIALIZING; nothing visible here closes w in that case — confirm
+ StreamStatus.INITIALIZING, w, null);
+ oldPendingOps = pendingOps; // take over the queued ops and start a fresh queue
+ pendingOps = new ArrayDeque<StreamOp>();
+ success = true;
+ }
+ // check if the stream is allowed to be acquired
+ if (!streamManager.allowAcquire(StreamImpl.this)) {
+ if (null != oldWriter) {
+ Abortables.asyncAbort(oldWriter, true);
+ }
+ int maxAcquiredPartitions = dynConf.getMaxAcquiredPartitionsPerProxy();
+ StreamUnavailableException sue = new StreamUnavailableException("Stream " + partition.getStream()
+ + " is not allowed to acquire more than " + maxAcquiredPartitions + " partitions");
+ countException(sue, exceptionStatLogger);
+ logger.error("Failed to acquire stream {} because it is unavailable : {}",
+ name, sue.getMessage());
+ synchronized (this) {
+ oldWriter = setStreamStatus(StreamStatus.ERROR, // hands back the writer installed above so that it gets aborted below
+ StreamStatus.INITIALIZED, null, sue);
+ // we don't switch the pending ops since they are already switched
+ // when setting the status to initialized
+ success = false;
+ }
+ }
+ processPendingRequestsAfterAcquire(success, oldWriter, oldPendingOps, stopwatch, acquirePromise); // executes or fails the queued ops and completes acquirePromise
+ }
+
+ private void onAcquireStreamFailure(Throwable cause,
+ Stopwatch stopwatch,
+ Promise<Boolean> acquirePromise) {
+ AsyncLogWriter oldWriter;
+ Queue<StreamOp> oldPendingOps;
+ boolean success;
+ if (cause instanceof AlreadyClosedException) {
+ countException(cause, streamExceptionStatLogger);
+ handleAlreadyClosedException((AlreadyClosedException) cause);
+ return;
+ } else {
+ if (isCriticalException(cause)) {
+ countException(cause, streamExceptionStatLogger);
+ logger.error("Failed to acquire stream {} : ", name, cause);
+ } else {
+ logger.warn("Failed to acquire stream {} : {}", name, cause.getMessage());
+ }
+ synchronized (StreamImpl.this) {
+ oldWriter = setStreamStatus(StreamStatus.ERROR,
+ StreamStatus.INITIALIZING, null, cause);
+ oldPendingOps = pendingOps;
+ pendingOps = new ArrayDeque<StreamOp>();
+ success = false;
+ }
+ }
+ processPendingRequestsAfterAcquire(success, oldWriter, oldPendingOps, stopwatch, acquirePromise);
+ }
+
+ /**
+ * Process the pending request after acquired stream.
+ *
+ * @param success whether the acquisition succeed or not
+ * @param oldWriter the old writer to abort
+ * @param oldPendingOps the old pending ops to execute
+ * @param stopwatch stopwatch to measure the time spent on acquisition
+ * @param acquirePromise the promise to complete the acquire operation
+ */
+ void processPendingRequestsAfterAcquire(boolean success,
+ AsyncLogWriter oldWriter,
+ Queue<StreamOp> oldPendingOps,
+ Stopwatch stopwatch,
+ Promise<Boolean> acquirePromise) {
+ if (success) {
+ streamAcquireStat.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+ } else {
+ streamAcquireStat.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+ }
+ for (StreamOp op : oldPendingOps) {
+ executeOp(op, success);
+ pendingOpsCounter.dec();
+ }
+ Abortables.asyncAbort(oldWriter, true);
+ FutureUtils.setValue(acquirePromise, success);
+ }
+
+ //
+ // Stream Status Changes
+ //
+
+ synchronized void setStreamInErrorStatus() {
+ if (StreamStatus.CLOSING == status || StreamStatus.CLOSED == status) {
+ return;
+ }
+ this.status = StreamStatus.ERROR;
+ }
+
+ /**
+ * Update the stream status. The change is only applied when the current status still equals
+ * <i>oldStatus</i>; otherwise the mismatch is logged, nothing is modified and null is returned.
+ *
+ * <p>When applied, the writer, owner, last exception and status are replaced, and the stream
+ * manager is notified (acquired when the new status is INITIALIZED, released otherwise).
+ * If <i>t</i> is an OwnershipAcquireFailedException naming this proxy's own client id as the
+ * current owner, the fatal error handler is notified and the owner is cleared.
+ *
+ * @param newStatus
+ * new status
+ * @param oldStatus
+ * the status the stream is expected to be in
+ * @param writer
+ * new log writer
+ * @param t
+ * new exception
+ * @return old writer if it exists
+ */
+ synchronized AsyncLogWriter setStreamStatus(StreamStatus newStatus,
+ StreamStatus oldStatus,
+ AsyncLogWriter writer,
+ Throwable t) {
+ if (oldStatus != this.status) {
+ logger.info("Stream {} status already changed from {} -> {} when trying to change it to {}",
+ new Object[] { name, oldStatus, this.status, newStatus });
+ return null; // the writer argument is not stored on this path
+ }
+
+ String owner = null;
+ if (t instanceof OwnershipAcquireFailedException) {
+ owner = ((OwnershipAcquireFailedException) t).getCurrentOwner();
+ }
+
+ AsyncLogWriter oldWriter = this.writer;
+ this.writer = writer;
+ if (null != owner && owner.equals(clientId)) {
+ unexpectedExceptions.inc();
+ logger.error("I am waiting myself {} to release lock on stream {}, so have to shut myself down :",
+ new Object[] { owner, name, t });
+ // I lost the ownership but left a lock over zookeeper
+ // I should not ask client to redirect to myself again as I can't handle it :(
+ // shutdown myself
+ fatalErrorHandler.notifyFatalError();
+ this.owner = null;
+ } else {
+ this.owner = owner;
+ }
+ this.lastException = t;
+ this.status = newStatus;
+ if (StreamStatus.INITIALIZED == newStatus) {
+ streamManager.notifyAcquired(this);
+ logger.info("Inserted acquired stream {} -> writer {}", name, this);
+ } else {
+ streamManager.notifyReleased(this);
+ logger.info("Removed acquired stream {} -> writer {}", name, this);
+ }
+ return oldWriter;
+ }
+
+ //
+ // Stream Close Functions
+ //
+
+ /**
+  * Closes the given log manager, if any. An {@link IOException} raised by the close is
+  * logged as a warning and not propagated.
+  *
+  * @param dlm the manager to close, may be null
+  */
+ void close(DistributedLogManager dlm) {
+     if (null == dlm) {
+         return;
+     }
+     try {
+         dlm.close();
+     } catch (IOException closeFailure) {
+         logger.warn("Failed to close dlm for {} : ", name, closeFailure);
+     }
+ }
+
+ @Override
+ public Future<Void> requestClose(String reason) {
+ return requestClose(reason, true); // true: also remove the stream from the cache after closing (see postClose)
+ }
+
+ Future<Void> requestClose(String reason, boolean uncache) { // returns closePromise, which postClose() completes
+ final boolean abort;
+ closeLock.writeLock().lock();
+ try {
+ if (StreamStatus.CLOSING == status
+ || StreamStatus.CLOSED == status) {
+ return closePromise; // a close is already in progress or done; do not start another one
+ }
+ logger.info("Request to close stream {} : {}", getStreamName(), reason);
+ // if the stream isn't closed from INITIALIZED state, we abort the stream instead of closing it.
+ abort = StreamStatus.INITIALIZED != status;
+ status = StreamStatus.CLOSING;
+ streamManager.notifyReleased(this);
+ } finally {
+ closeLock.writeLock().unlock();
+ }
+ // we will fail the requests that are coming in between closing and closed only
+ // after the async writer is closed. so we could clear up the lock before redirect
+ // them.
+ close(abort, uncache); // runs outside the lock; close() takes the write lock again itself
+ return closePromise;
+ }
+
+ @Override
+ public void delete() throws IOException { // closes the writer (if any), then deletes the log through the manager
+ if (null != writer) { // NOTE(review): writer is volatile and is read again on the next line — confirm it cannot be swapped in between
+ Utils.close(writer);
+ synchronized (this) {
+ writer = null;
+ lastException = new StreamUnavailableException("Stream was deleted"); // executeOp() fails later ops with this once the writer is gone
+ }
+ }
+ if (null == manager) { // manager is only assigned by initialize()
+ throw new UnexpectedException("No stream " + name + " to delete");
+ }
+ manager.delete();
+ }
+
+ /**
+  * Final step of closing, run once the writer close has finished (or failed).
+  *
+  * <p>Closes the manager, fails the pending requests and unregisters the gauge. When
+  * <i>uncache</i> is set, the stream is either removed from the cache right away (no known
+  * owner) or scheduled for removal after two thirds of the ZooKeeper session timeout (an
+  * owner is known). Finally the close promise is completed.
+  *
+  * @param uncache whether the stream should be removed from the cache
+  */
+ private void postClose(boolean uncache) {
+     closeManagerAndErrorOutPendingRequests();
+     unregisterGauge();
+     if (uncache && null == owner) {
+         streamManager.notifyRemoved(this);
+         logger.info("Removed cached stream {}.", getStreamName());
+     } else if (uncache) {
+         long probationTimeoutMs = 2 * dlConfig.getZKSessionTimeoutMilliseconds() / 3;
+         streamManager.scheduleRemoval(this, probationTimeoutMs);
+     }
+     FutureUtils.setValue(closePromise, null);
+ }
+
+ /**
+ * Shouldn't call close directly. The callers should call #requestClose instead
+ *
+ * <p>Marks the stream CLOSED, then aborts or closes the writer asynchronously. postClose is run
+ * when that finishes, whether it succeeded, failed or timed out.
+ *
+ * @param shouldAbort shall we abort the stream instead of closing
+ * @param uncache passed on to postClose
+ * @return the close promise, which postClose completes
+ */
+ private Future<Void> close(boolean shouldAbort, final boolean uncache) {
+ boolean abort;
+ closeLock.writeLock().lock();
+ try {
+ if (StreamStatus.CLOSED == status) {
+ return closePromise; // already closed, nothing more to do
+ }
+ abort = shouldAbort || (StreamStatus.INITIALIZED != status && StreamStatus.CLOSING != status);
+ status = StreamStatus.CLOSED;
+ streamManager.notifyReleased(this);
+ } finally {
+ closeLock.writeLock().unlock();
+ }
+ logger.info("Closing stream {} ...", name);
+ // Close the writers to release the locks before failing the requests
+ Future<Void> closeWriterFuture;
+ if (abort) {
+ closeWriterFuture = Abortables.asyncAbort(writer, true);
+ } else {
+ closeWriterFuture = Utils.asyncClose(writer, true);
+ }
+ // close the manager and error out pending requests after close writer
+ Duration closeWaitDuration;
+ if (writerCloseTimeoutMs <= 0) {
+ closeWaitDuration = Duration.Top(); // no timeout configured
+ } else {
+ closeWaitDuration = Duration.fromMilliseconds(writerCloseTimeoutMs);
+ }
+
+ FutureUtils.stats(
+ closeWriterFuture,
+ writerCloseStatLogger,
+ Stopwatch.createStarted()
+ ).masked().within(futureTimer, closeWaitDuration)
+ .addEventListener(FutureUtils.OrderedFutureEventListener.of(
+ new FutureEventListener<Void>() {
+ @Override
+ public void onSuccess(Void value) {
+ postClose(uncache);
+ }
+ @Override
+ public void onFailure(Throwable cause) {
+ if (cause instanceof TimeoutException) { // only timeouts are counted; postClose runs either way
+ writerCloseTimeoutCounter.inc();
+ }
+ postClose(uncache);
+ }
+ }, scheduler, name));
+ return closePromise;
+ }
+
+ private void closeManagerAndErrorOutPendingRequests() {
+ close(manager);
+ // Failed the pending requests.
+ Queue<StreamOp> oldPendingOps;
+ synchronized (this) {
+ oldPendingOps = pendingOps;
+ pendingOps = new ArrayDeque<StreamOp>();
+ }
+ StreamUnavailableException closingException =
+ new StreamUnavailableException("Stream " + name + " is closed.");
+ for (StreamOp op : oldPendingOps) {
+ op.fail(closingException);
+ pendingOpsCounter.dec();
+ }
+ limiter.close();
+ logger.info("Closed stream {}.", name);
+ }
+
+ /**
+ * clean up the gauge to help GC. This unregisters the "stream_status" gauge that
+ * initialize() registered on the per-stream stats logger.
+ */
+ private void unregisterGauge(){
+ streamLogger.unregisterGauge("stream_status", this.streamStatusGauge);
+ }
+
+ // Test-only apis
+
+ /**
+  * Test-only: number of ops currently waiting in the pending queue (0 when there is no queue).
+  */
+ @VisibleForTesting
+ public int numPendingOps() {
+     final Queue<StreamOp> snapshot = pendingOps;
+     if (null == snapshot) {
+         return 0;
+     }
+     return snapshot.size();
+ }
+
+ @VisibleForTesting
+ public StreamStatus getStatus() {
+ return status; // current value of the volatile status field
+ }
+
+ @VisibleForTesting
+ public void setStatus(StreamStatus status) {
+ this.status = status; // plain assignment: no lock is taken and the stream manager is not notified
+ }
+
+ @VisibleForTesting
+ public AsyncLogWriter getWriter() {
+ return writer; // current writer, may be null
+ }
+
+ @VisibleForTesting
+ public DistributedLogManager getManager() {
+ return manager; // null until initialize() has opened the log
+ }
+
+ @VisibleForTesting
+ public Throwable getLastException() {
+ return lastException; // last exception recorded by the constructor, setStreamStatus() or delete()
+ }
+
+ @VisibleForTesting
+ public Future<Void> getCloseFuture() {
+ return closePromise; // the same promise that requestClose() returns
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamManager.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamManager.java
new file mode 100644
index 0000000..d86c538
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamManager.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream;
+
+import com.google.common.base.Optional;
+import com.twitter.util.Future;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Manage lifecycle of streams.
+ *
+ * <p>StreamManager is responsible for creating, destroying, and keeping track of Stream objects.
+ *
+ * <p>Stream objects, which are managed by StreamManager and created by StreamFactory, are essentially the
+ * per stream request handlers, responsible for dispatching e.g. write requests to an underlying AsyncLogWriter,
+ * managing stream lock, interpreting exceptions, error conditions, etc.
+ */
+public interface StreamManager {
+
+ /**
+ * Get a cached stream, returning null if it doesn't exist.
+ * @param stream stream name
+ * @return the cached stream, or null if no stream is cached under that name
+ */
+ Stream getStream(String stream);
+
+ /**
+ * Get a cached stream and create a new one if it doesn't exist.
+ * @param streamName stream name
+ * @param start whether to start the stream after it is created.
+ * @return the cached stream, or the newly created one
+ * @throws IOException if the stream cannot be created
+ */
+ Stream getOrCreateStream(String streamName, boolean start) throws IOException;
+
+ /**
+ * Asynchronously create a new stream.
+ * @param stream name of the stream to create
+ * @return Future satisfied once the stream is created
+ */
+ Future<Void> createStreamAsync(String stream);
+
+ /**
+ * Is acquiring stream allowed?
+ *
+ * @param stream
+ * stream instance
+ * @return true if it is allowed to acquire this stream, otherwise false.
+ */
+ boolean allowAcquire(Stream stream);
+
+ /**
+ * Notify the manager that a stream was acquired.
+ * @param stream being acquired
+ */
+ void notifyAcquired(Stream stream);
+
+ /**
+ * Notify the manager that a stream was released.
+ * @param stream being released
+ */
+ void notifyReleased(Stream stream);
+
+ /**
+ * Notify the manager that a stream was completely removed.
+ * @param stream being uncached
+ * @return whether the stream existed or not
+ */
+ boolean notifyRemoved(Stream stream);
+
+ /**
+ * Asynchronous delete method.
+ * @param streamName stream name
+ * @return future satisfied once delete complete
+ */
+ Future<Void> deleteAndRemoveAsync(String streamName);
+
+ /**
+ * Asynchronous close and uncache method.
+ * @param streamName stream name
+ * @return future satisfied once close and uncache complete
+ */
+ Future<Void> closeAndRemoveAsync(String streamName);
+
+ /**
+ * Close and uncache after delayMs.
+ * @param stream to remove
+ * @param delayMs delay, in milliseconds, before the stream is removed
+ */
+ void scheduleRemoval(Stream stream, long delayMs);
+
+ /**
+ * Close all streams.
+ * @return future satisfied once all streams are closed
+ */
+ Future<List<Void>> closeStreams();
+
+ /**
+ * Return map with stream ownership info.
+ * @param regex for filtering streams
+ * @return map containing ownership info
+ */
+ Map<String, String> getStreamOwnershipMap(Optional<String> regex);
+
+ /**
+ * Number of acquired streams.
+ * @return number of acquired streams
+ */
+ int numAcquired();
+
+ /**
+ * Number of cached streams.
+ * @return number of cached streams
+ */
+ int numCached();
+
+ /**
+ * Is the stream denoted by streamName in the acquired state.
+ * @param streamName stream name
+ * @return true if the stream is in the acquired state
+ */
+ boolean isAcquired(String streamName);
+
+ /**
+ * Close manager and disallow further activity.
+ */
+ void close();
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamManagerImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamManagerImpl.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamManagerImpl.java
new file mode 100644
index 0000000..5d54738
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamManagerImpl.java
@@ -0,0 +1,413 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.RateLimiter;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.exceptions.ServiceUnavailableException;
+import org.apache.distributedlog.exceptions.StreamUnavailableException;
+import org.apache.distributedlog.exceptions.UnexpectedException;
+import org.apache.distributedlog.namespace.DistributedLogNamespace;
+import org.apache.distributedlog.service.config.StreamConfigProvider;
+import org.apache.distributedlog.service.streamset.Partition;
+import org.apache.distributedlog.service.streamset.PartitionMap;
+import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
+import org.apache.distributedlog.util.ConfUtils;
+import com.twitter.util.Future;
+import com.twitter.util.Promise;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * StreamManagerImpl is the default implementation responsible for creating, destroying, and keeping track
+ * of Streams.
+ *
+ * <p>StreamFactory, supplied to StreamManagerImpl in the constructor below, is responsible simply for creating
+ * a stream object in isolation from the rest of the system. We pass a StreamFactory in instead of simply
+ * creating StreamImpl's ourselves in order to inject dependencies without bloating the StreamManagerImpl
+ * constructor.
+ */
+public class StreamManagerImpl implements StreamManager {
+
+ private static final Logger logger = LoggerFactory.getLogger(StreamManagerImpl.class);
+
+ private final ConcurrentHashMap<String, Stream> streams =
+ new ConcurrentHashMap<String, Stream>();
+ private final AtomicInteger numCached = new AtomicInteger(0);
+
+ private final ConcurrentHashMap<String, Stream> acquiredStreams =
+ new ConcurrentHashMap<String, Stream>();
+ private final AtomicInteger numAcquired = new AtomicInteger(0);
+
+ //
+ // Partitions
+ //
+ private final StreamPartitionConverter partitionConverter;
+ private final PartitionMap cachedPartitions = new PartitionMap();
+ private final PartitionMap acquiredPartitions = new PartitionMap();
+
+ final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock();
+ private final ScheduledExecutorService executorService;
+ private final DistributedLogConfiguration dlConfig;
+ private final StreamConfigProvider streamConfigProvider;
+ private final String clientId;
+ private boolean closed = false;
+ private final StreamFactory streamFactory;
+ private final DistributedLogNamespace dlNamespace;
+
+    /**
+     * Wire up the manager with its collaborators; nothing is started or scheduled here.
+     */
+    public StreamManagerImpl(String clientId,
+                             DistributedLogConfiguration dlConfig,
+                             ScheduledExecutorService executorService,
+                             StreamFactory streamFactory,
+                             StreamPartitionConverter partitionConverter,
+                             StreamConfigProvider streamConfigProvider,
+                             DistributedLogNamespace dlNamespace) {
+        // identity and configuration
+        this.clientId = clientId;
+        this.dlConfig = dlConfig;
+        this.streamConfigProvider = streamConfigProvider;
+        // collaborators used to build and run streams
+        this.dlNamespace = dlNamespace;
+        this.streamFactory = streamFactory;
+        this.partitionConverter = partitionConverter;
+        this.executorService = executorService;
+    }
+
+ private DynamicDistributedLogConfiguration getDynConf(String streamName) {
+ Optional<DynamicDistributedLogConfiguration> dynDlConf =
+ streamConfigProvider.getDynamicStreamConfig(streamName);
+ if (dynDlConf.isPresent()) {
+ return dynDlConf.get();
+ } else {
+ return ConfUtils.getConstDynConf(dlConfig);
+ }
+ }
+
+    @Override
+    public boolean allowAcquire(Stream stream) {
+        // Try to register the stream's partition under the per-proxy limit from its configuration.
+        final Partition partition = stream.getPartition();
+        final int maxAcquiredPartitions =
+            stream.getStreamConfiguration().getMaxAcquiredPartitionsPerProxy();
+        return acquiredPartitions.addPartition(partition, maxAcquiredPartitions);
+    }
+
+ /**
+ * Must be enqueued to an executor to avoid deadlocks (close and execute-op both
+ * try to acquire the same read-write lock).
+ */
+    @Override
+    public Future<Void> deleteAndRemoveAsync(final String stream) {
+        final Promise<Void> deletePromise = new Promise<Void>();
+        final Runnable deleteTask = new Runnable() {
+            @Override
+            public void run() {
+                deletePromise.become(doDeleteAndRemoveAsync(stream));
+            }
+        };
+        // schedule() yields null when the manager is closed or the executor rejects the task.
+        if (null != schedule(deleteTask, 0)) {
+            return deletePromise;
+        }
+        return Future.exception(
+            new ServiceUnavailableException("Couldn't schedule a delete task."));
+    }
+
+ /**
+ * Must be enqueued to an executor to avoid deadlocks (close and execute-op both
+ * try to acquire the same read-write lock).
+ */
+    @Override
+    public Future<Void> closeAndRemoveAsync(final String streamName) {
+        final Promise<Void> closePromise = new Promise<Void>();
+        final Runnable releaseTask = new Runnable() {
+            @Override
+            public void run() {
+                closePromise.become(doCloseAndRemoveAsync(streamName));
+            }
+        };
+        // schedule() yields null when the manager is closed or the executor rejects the task.
+        if (null != schedule(releaseTask, 0)) {
+            return closePromise;
+        }
+        return Future.exception(
+            new ServiceUnavailableException("Couldn't schedule a release task."));
+    }
+
+    @Override
+    public Future<Void> createStreamAsync(final String stream) {
+        final Promise<Void> created = new Promise<Void>();
+        final Runnable createTask = new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    dlNamespace.createLog(stream);
+                    created.setValue(null);
+                } catch (Exception e) {
+                    // Any failure is reported through the promise rather than the executor thread.
+                    created.setException(e);
+                }
+            }
+        };
+        // schedule() yields null when the manager is closed or the executor rejects the task.
+        if (null != schedule(createTask, 0)) {
+            return created;
+        }
+        return Future.exception(
+            new ServiceUnavailableException("Couldn't schedule a create task."));
+    }
+
+    @Override
+    public void notifyReleased(Stream stream) {
+        acquiredPartitions.removePartition(stream.getPartition());
+        // Only decrement when this exact stream instance was the one registered under its name.
+        final boolean wasAcquired = acquiredStreams.remove(stream.getStreamName(), stream);
+        if (wasAcquired) {
+            numAcquired.getAndDecrement();
+        }
+    }
+
+    @Override
+    public void notifyAcquired(Stream stream) {
+        final Stream previous = acquiredStreams.put(stream.getStreamName(), stream);
+        if (null != previous) {
+            // The name was already registered; the counter already accounts for it.
+            return;
+        }
+        numAcquired.getAndIncrement();
+    }
+
+    @Override
+    public boolean notifyRemoved(Stream stream) {
+        cachedPartitions.removePartition(stream.getPartition());
+        // Remove only if the cache still maps the name to this exact instance.
+        final boolean removed = streams.remove(stream.getStreamName(), stream);
+        if (removed) {
+            numCached.getAndDecrement();
+        }
+        return removed;
+    }
+
+    /**
+     * Build a map of acquired stream name to this proxy's client id.
+     *
+     * <p>Only streams whose {@code getOwner()} is null are reported; streams that record
+     * an owner are omitted.
+     *
+     * @param regex optional regular expression; when present, only stream names that fully
+     *              match it are considered
+     * @return map from stream name to client id
+     * @throws java.util.regex.PatternSyntaxException if the supplied regex is invalid
+     */
+    @Override
+    public Map<String, String> getStreamOwnershipMap(Optional<String> regex) {
+        // Compile the filter once instead of letting String.matches recompile it per entry.
+        final java.util.regex.Pattern nameFilter =
+            regex.isPresent() ? java.util.regex.Pattern.compile(regex.get()) : null;
+        Map<String, String> ownershipMap = new HashMap<String, String>();
+        for (Map.Entry<String, Stream> entry : acquiredStreams.entrySet()) {
+            String name = entry.getKey();
+            if (null != nameFilter && !nameFilter.matcher(name).matches()) {
+                continue;
+            }
+            Stream stream = entry.getValue();
+            if (null == stream) {
+                continue;
+            }
+            String owner = stream.getOwner();
+            if (null == owner) {
+                ownershipMap.put(name, clientId);
+            }
+        }
+        return ownershipMap;
+    }
+
+ @Override
+ public Stream getStream(String stream) { // plain cache lookup; null when nothing is cached under this name
+ return streams.get(stream);
+ }
+
+ @Override
+ public Stream getOrCreateStream(String streamName, boolean start) throws IOException {
+ Stream stream = streams.get(streamName); // fast path: cache lookup without taking closeLock
+ if (null == stream) {
+ closeLock.readLock().lock(); // close() takes the write lock, so 'closed' cannot change while this is held
+ try {
+ if (closed) {
+ return null; // manager already closed: nothing is created
+ }
+ DynamicDistributedLogConfiguration dynConf = getDynConf(streamName);
+ int maxCachedPartitions = dynConf.getMaxCachedPartitionsPerProxy();
+
+ // get partition from the stream name
+ Partition partition = partitionConverter.convert(streamName);
+
+ // add partition to cached map; refused when the per-proxy cached partition limit is reached
+ if (!cachedPartitions.addPartition(partition, maxCachedPartitions)) {
+ throw new StreamUnavailableException("Stream " + streamName
+ + " is not allowed to cache more than " + maxCachedPartitions + " partitions");
+ }
+
+ stream = newStream(streamName, dynConf);
+ Stream oldWriter = streams.putIfAbsent(streamName, stream); // non-null means another caller inserted first
+ if (null != oldWriter) {
+ stream = oldWriter; // lost the race: return the instance already cached
+ } else {
+ numCached.getAndIncrement();
+ logger.info("Inserted mapping stream name {} -> stream {}", streamName, stream);
+ stream.initialize(); // NOTE(review): if this throws, the stream stays cached and counted - confirm intended
+ if (start) {
+ stream.start();
+ }
+ }
+ } finally {
+ closeLock.readLock().unlock();
+ }
+ }
+ return stream;
+ }
+
+    @Override
+    public Future<List<Void>> closeStreams() {
+        // Sizes are sampled from the maps for logging only.
+        final int acquiredCount = acquiredStreams.size();
+        final int cachedCount = streams.size();
+        logger.info("Closing all acquired streams : acquired = {}, cached = {}.",
+            acquiredCount, cachedCount);
+        // Snapshot every cached stream and request close on each.
+        final Set<Stream> snapshot = new HashSet<Stream>(streams.values());
+        return closeStreams(snapshot, Optional.<RateLimiter>absent());
+    }
+
+    /**
+     * Remove a stream from the cache after a delay.
+     *
+     * <p>The removal runs on the executor via {@code schedule}; if the manager is already
+     * closed (or the executor rejects the task) nothing is scheduled.
+     *
+     * @param stream stream to remove from the cache
+     * @param delayMs delay in milliseconds; values not greater than 0 run the removal immediately
+     */
+    @Override
+    public void scheduleRemoval(final Stream stream, long delayMs) {
+        if (delayMs > 0) {
+            // delayMs is in milliseconds (see schedule), so report it with the matching unit.
+            logger.info("Scheduling removal of stream {} from cache after {} ms.",
+                stream.getStreamName(), delayMs);
+        }
+        schedule(new Runnable() {
+            @Override
+            public void run() {
+                if (notifyRemoved(stream)) {
+                    logger.info("Removed cached stream {} after probation.", stream.getStreamName());
+                } else {
+                    logger.info("Cached stream {} already removed.", stream.getStreamName());
+                }
+            }
+        }, delayMs);
+    }
+
+ @Override
+ public int numAcquired() { // counter maintained by notifyAcquired/notifyReleased
+ return numAcquired.get();
+ }
+
+ @Override
+ public int numCached() { // counter maintained by getOrCreateStream/notifyRemoved
+ return numCached.get();
+ }
+
+ @Override
+ public boolean isAcquired(String streamName) { // true when the name is present in the acquired map
+ return acquiredStreams.containsKey(streamName);
+ }
+
+    @Override
+    public void close() {
+        // The write lock excludes getOrCreateStream() and schedule(), which hold the read lock.
+        closeLock.writeLock().lock();
+        try {
+            if (!closed) {
+                closed = true;
+            }
+        } finally {
+            closeLock.writeLock().unlock();
+        }
+    }
+
+ private Future<List<Void>> closeStreams(Set<Stream> streamsToClose, Optional<RateLimiter> rateLimiter) {
+ if (streamsToClose.isEmpty()) {
+ logger.info("No streams to close.");
+ List<Void> emptyList = new ArrayList<Void>();
+ return Future.value(emptyList);
+ }
+ List<Future<Void>> futures = new ArrayList<Future<Void>>(streamsToClose.size());
+ for (Stream stream : streamsToClose) {
+ if (rateLimiter.isPresent()) {
+ rateLimiter.get().acquire();
+ }
+ futures.add(stream.requestClose("Close Streams"));
+ }
+ return Future.collect(futures);
+ }
+
+ private Stream newStream(String name, DynamicDistributedLogConfiguration streamConf) { // delegates to the factory, passing this manager
+ return streamFactory.create(name, streamConf, this);
+ }
+
+    /**
+     * Request close of the cached stream with the given name, if there is one.
+     *
+     * @return the stream's close future, or an already-satisfied future when nothing is cached
+     */
+    public Future<Void> doCloseAndRemoveAsync(final String streamName) {
+        final Stream cached = streams.get(streamName);
+        if (null != cached) {
+            return cached.requestClose("release ownership");
+        }
+        logger.info("No stream {} to release.", streamName);
+        return Future.value(null);
+    }
+
+ /**
+ * Submit a task to the executor unless the manager is closed.
+ *
+ * <p>Don't schedule if we're closed - closeLock is acquired to close, so if we acquire the
+ * lock and discover we're closed, we won't schedule.
+ *
+ * @param runnable task to run
+ * @param delayMs delay in milliseconds; values not greater than 0 submit the task immediately
+ * @return the executor's future, or null if the manager is closed or the executor rejected the task
+ */
+ private java.util.concurrent.Future<?> schedule(Runnable runnable, long delayMs) {
+ closeLock.readLock().lock();
+ try {
+ if (closed) {
+ return null;
+ } else if (delayMs > 0) {
+ return executorService.schedule(runnable, delayMs, TimeUnit.MILLISECONDS);
+ } else {
+ return executorService.submit(runnable);
+ }
+ } catch (RejectedExecutionException ree) {
+ logger.error("Failed to schedule task {} in {} ms : ",
+ new Object[] { runnable, delayMs, ree }); // trailing exception is passed as the last array element
+ return null;
+ } finally {
+ closeLock.readLock().unlock();
+ }
+ }
+
+ private Future<Void> doDeleteAndRemoveAsync(final String streamName) {
+ Stream stream = streams.get(streamName);
+ if (null == stream) {
+ logger.warn("No stream {} to delete.", streamName);
+ return Future.exception(new UnexpectedException("No stream " + streamName + " to delete."));
+ } else {
+ Future<Void> result;
+ logger.info("Deleting stream {}, {}", streamName, stream);
+ try {
+ stream.delete();
+ result = stream.requestClose("Stream Deleted");
+ } catch (IOException e) {
+ logger.error("Failed on removing stream {} : ", streamName, e);
+ result = Future.exception(e);
+ }
+ return result;
+ }
+ }
+
+ @VisibleForTesting
+ public ConcurrentHashMap<String, Stream> getCachedStreams() { // test hook: returns the live cache map, not a copy
+ return streams;
+ }
+
+ @VisibleForTesting
+ public ConcurrentHashMap<String, Stream> getAcquiredStreams() { // test hook: returns the live acquired map, not a copy
+ return acquiredStreams;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOp.java
new file mode 100644
index 0000000..d0b8de4
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOp.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream;
+
+import com.google.common.base.Stopwatch;
+import org.apache.distributedlog.AsyncLogWriter;
+import org.apache.distributedlog.exceptions.DLException;
+import org.apache.distributedlog.thrift.service.ResponseHeader;
+import org.apache.distributedlog.util.Sequencer;
+import com.twitter.util.Future;
+
+/**
+ * An operation applied to a stream.
+ */
+public interface StreamOp {
+ /**
+ * Execute a stream op with the supplied writer.
+ *
+ * @param writer active writer for applying the change
+ * @param sequencer sequencer used for generating transaction id for stream operations
+ * @param txnLock transaction lock to guarantee ordering of transaction id
+ * @return a future satisfied when the operation completes execution
+ */
+ Future<Void> execute(AsyncLogWriter writer,
+ Sequencer sequencer,
+ Object txnLock);
+
+ /**
+ * Invoked before the stream op is executed.
+ *
+ * @throws DLException if the op must not be executed (e.g. the request is denied)
+ */
+ void preExecute() throws DLException;
+
+ /**
+ * Return the response header (containing the status code etc.).
+ *
+ * @return A future containing the response header or the exception
+ * encountered by the op if it failed.
+ */
+ Future<ResponseHeader> responseHeader();
+
+ /**
+ * Abort the operation with the given exception.
+ *
+ * @param t the exception to fail the operation with
+ */
+ void fail(Throwable t);
+
+ /**
+ * Return the stream name.
+ *
+ * @return the stream name
+ */
+ String streamName();
+
+ /**
+ * Stopwatch gives the start time of the operation.
+ *
+ * @return the stopwatch of this operation
+ */
+ Stopwatch stopwatch();
+
+ /**
+ * Compute checksum from arguments.
+ *
+ * @return the computed checksum
+ */
+ Long computeChecksum();
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOpStats.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOpStats.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOpStats.java
new file mode 100644
index 0000000..f3fc610
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOpStats.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream;
+
+import org.apache.distributedlog.service.streamset.Partition;
+import org.apache.distributedlog.stats.BroadCastStatsLogger;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * Encapsulate stream op stats construction to make it easier to access stream
+ * op stats consistently from different scopes.
+ */
+public class StreamOpStats {
+    // Root logger and the fixed sub-scopes derived from it in the constructor.
+    private final StatsLogger baseStatsLogger;
+    private final StatsLogger requestStatsLogger;
+    private final StatsLogger recordsStatsLogger;
+    private final StatsLogger requestDeniedStatsLogger;
+    // Separate logger under which per-stream stats are scoped.
+    private final StatsLogger streamStatsLogger;
+
+    /**
+     * @param statsLogger root logger; the "request", "records" and "denied" scopes hang off it
+     * @param perStreamStatsLogger logger used for per-stream / per-partition stats
+     */
+    public StreamOpStats(StatsLogger statsLogger,
+                         StatsLogger perStreamStatsLogger) {
+        this.baseStatsLogger = statsLogger;
+        this.requestStatsLogger = statsLogger.scope("request");
+        this.recordsStatsLogger = statsLogger.scope("records");
+        this.requestDeniedStatsLogger = statsLogger.scope("denied");
+        this.streamStatsLogger = perStreamStatsLogger;
+    }
+
+    /** Root stats logger; the {@code opName} argument is not used. */
+    public StatsLogger baseStatsLogger(String opName) {
+        return baseStatsLogger;
+    }
+
+    /** Counter named {@code opName} directly on the root logger. */
+    public Counter baseCounter(String opName) {
+        return baseStatsLogger.getCounter(opName);
+    }
+
+    /** Sub-scope {@code opName} of the root logger. */
+    public StatsLogger baseScope(String opName) {
+        return baseStatsLogger.scope(opName);
+    }
+
+    /** Op stats logger named {@code opName} under the "request" scope. */
+    public OpStatsLogger requestLatencyStat(String opName) {
+        return requestStatsLogger.getOpStatsLogger(opName);
+    }
+
+    /** Sub-scope {@code scopeName} of the "request" scope. */
+    public StatsLogger requestScope(String scopeName) {
+        return requestStatsLogger.scope(scopeName);
+    }
+
+    /** Counter {@code counterName} under request scope {@code opName}. */
+    public Counter scopedRequestCounter(String opName, String counterName) {
+        final StatsLogger opScope = requestScope(opName);
+        return opScope.getCounter(counterName);
+    }
+
+    /** Counter {@code counterName} directly under the "request" scope. */
+    public Counter requestCounter(String counterName) {
+        return requestStatsLogger.getCounter(counterName);
+    }
+
+    /** Same counter as {@link #requestCounter(String)}; kept as a separately named accessor. */
+    public Counter requestPendingCounter(String counterName) {
+        return requestCounter(counterName);
+    }
+
+    /** Counter {@code counterName} under the "denied" scope. */
+    public Counter requestDeniedCounter(String counterName) {
+        return requestDeniedStatsLogger.getCounter(counterName);
+    }
+
+    /** Counter {@code counterName} under the "records" scope. */
+    public Counter recordsCounter(String counterName) {
+        return recordsStatsLogger.getCounter(counterName);
+    }
+
+    /**
+     * Broadcasting logger for a partition: combines the partition's own scope
+     * (stream / "partition" / padded id) with the stream's "aggregate" scope.
+     */
+    public StatsLogger streamRequestStatsLogger(Partition partition) {
+        final StatsLogger perPartition = streamStatsLogger
+            .scope(partition.getStream())
+            .scope("partition")
+            .scope(partition.getPaddedId());
+        final StatsLogger aggregate = streamStatsLogger
+            .scope(partition.getStream())
+            .scope("aggregate");
+        return BroadCastStatsLogger.masterslave(perPartition, aggregate);
+    }
+
+    /** Sub-scope {@code scopeName} of the partition's broadcasting logger. */
+    public StatsLogger streamRequestScope(Partition partition, String scopeName) {
+        final StatsLogger partitionLogger = streamRequestStatsLogger(partition);
+        return partitionLogger.scope(scopeName);
+    }
+
+    /** Op stats logger {@code opName} on the partition's broadcasting logger. */
+    public OpStatsLogger streamRequestLatencyStat(Partition partition, String opName) {
+        final StatsLogger partitionLogger = streamRequestStatsLogger(partition);
+        return partitionLogger.getOpStatsLogger(opName);
+    }
+
+    /** Counter {@code counterName} under scope {@code opName} of the partition's broadcasting logger. */
+    public Counter streamRequestCounter(Partition partition, String opName, String counterName) {
+        final StatsLogger opScope = streamRequestScope(partition, opName);
+        return opScope.getCounter(counterName);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/TruncateOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/TruncateOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/TruncateOp.java
new file mode 100644
index 0000000..0036a5c
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/TruncateOp.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream;
+
+import org.apache.distributedlog.AsyncLogWriter;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.acl.AccessControlManager;
+import org.apache.distributedlog.exceptions.DLException;
+import org.apache.distributedlog.exceptions.RequestDeniedException;
+import org.apache.distributedlog.protocol.util.ProtocolUtils;
+import org.apache.distributedlog.service.ResponseUtils;
+import org.apache.distributedlog.thrift.service.WriteResponse;
+import org.apache.distributedlog.util.Sequencer;
+import com.twitter.util.Future;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.AbstractFunction1;
+
+/**
+ * Operation to truncate a log stream.
+ */
+public class TruncateOp extends AbstractWriteOp {
+
+    private static final Logger logger = LoggerFactory.getLogger(TruncateOp.class);
+
+    // Incremented each time the access control manager refuses a truncate.
+    private final Counter deniedTruncateCounter;
+    // Position passed to the writer's truncate call.
+    private final DLSN dlsn;
+    private final AccessControlManager accessControlManager;
+
+    /**
+     * Build a truncate op for {@code stream} at {@code dlsn}; request stats are recorded
+     * under the "truncate" name.
+     */
+    public TruncateOp(String stream,
+                      DLSN dlsn,
+                      StatsLogger statsLogger,
+                      StatsLogger perStreamStatsLogger,
+                      Long checksum,
+                      Feature checksumDisabledFeature,
+                      AccessControlManager accessControlManager) {
+        super(stream, requestStat(statsLogger, "truncate"), checksum, checksumDisabledFeature);
+        this.dlsn = dlsn;
+        this.accessControlManager = accessControlManager;
+        final StreamOpStats opStats = new StreamOpStats(statsLogger, perStreamStatsLogger);
+        this.deniedTruncateCounter = opStats.requestDeniedCounter("truncate");
+    }
+
+    /** Checksum over the stream name and the truncation DLSN. */
+    @Override
+    public Long computeChecksum() {
+        return ProtocolUtils.truncateOpCRC32(stream, dlsn);
+    }
+
+    /**
+     * Truncate through the writer after checking that the writer belongs to this op's stream;
+     * a completed truncate is mapped to a success response.
+     */
+    @Override
+    protected Future<WriteResponse> executeOp(AsyncLogWriter writer,
+                                              Sequencer sequencer,
+                                              Object txnLock) {
+        final boolean sameStream = stream.equals(writer.getStreamName());
+        if (sameStream) {
+            return writer.truncate(dlsn).map(new AbstractFunction1<Boolean, WriteResponse>() {
+                @Override
+                public WriteResponse apply(Boolean v1) {
+                    return ResponseUtils.writeSuccess();
+                }
+            });
+        }
+        logger.error("Truncate: Stream Name Mismatch in the Stream Map {}, {}", stream, writer.getStreamName());
+        return Future.exception(new IllegalStateException("The stream mapping is incorrect, fail the request"));
+    }
+
+    /**
+     * Reject the op when truncation of the stream is not allowed, otherwise run the parent's checks.
+     *
+     * @throws DLException a RequestDeniedException on denial, or whatever the parent check throws
+     */
+    @Override
+    public void preExecute() throws DLException {
+        if (accessControlManager.allowTruncate(stream)) {
+            super.preExecute();
+            return;
+        }
+        deniedTruncateCounter.inc();
+        throw new RequestDeniedException(stream, "truncate");
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java
new file mode 100644
index 0000000..2e7ffb8
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream;
+
+import org.apache.distributedlog.AsyncLogWriter;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.LogRecord;
+import org.apache.distributedlog.acl.AccessControlManager;
+import org.apache.distributedlog.exceptions.DLException;
+import org.apache.distributedlog.exceptions.RequestDeniedException;
+import org.apache.distributedlog.protocol.util.ProtocolUtils;
+import org.apache.distributedlog.service.ResponseUtils;
+import org.apache.distributedlog.service.config.ServerConfiguration;
+import org.apache.distributedlog.service.streamset.Partition;
+import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
+import org.apache.distributedlog.thrift.service.ResponseHeader;
+import org.apache.distributedlog.thrift.service.StatusCode;
+import org.apache.distributedlog.thrift.service.WriteResponse;
+import org.apache.distributedlog.util.Sequencer;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.AbstractFunction1;
+
+/**
+ * Stream operation that writes a single record to a log stream.
+ *
+ * <p>The payload is copied at construction time; record, byte and latency stats track the outcome.
+ */
+public class WriteOp extends AbstractWriteOp implements WriteOpWithPayload {
+
+    private static final Logger logger = LoggerFactory.getLogger(WriteOp.class);
+
+    private final byte[] payload;
+    private final boolean isRecordSet;
+    private final byte dlsnVersion;
+    private final AccessControlManager accessControlManager;
+
+    // Stats
+    private final Counter deniedWriteCounter;
+    private final Counter successRecordCounter;
+    private final Counter failureRecordCounter;
+    private final Counter redirectRecordCounter;
+    private final OpStatsLogger latencyStat;
+    private final Counter bytes;
+    private final Counter writeBytes;
+
+    /** Creates a write op for {@code stream}, copying the remaining bytes of {@code data}. */
+    public WriteOp(String stream, ByteBuffer data,
+                   StatsLogger statsLogger, StatsLogger perStreamStatsLogger,
+                   StreamPartitionConverter streamPartitionConverter, ServerConfiguration conf,
+                   byte dlsnVersion, Long checksum, boolean isRecordSet,
+                   Feature checksumDisabledFeature, AccessControlManager accessControlManager) {
+        super(stream, requestStat(statsLogger, "write"), checksum, checksumDisabledFeature);
+        this.isRecordSet = isRecordSet;
+        this.dlsnVersion = dlsnVersion;
+        this.accessControlManager = accessControlManager;
+        // Reading the bytes advances the position of the caller's buffer.
+        final byte[] copied = new byte[data.remaining()];
+        data.get(copied);
+        this.payload = copied;
+
+        final Partition partition = streamPartitionConverter.convert(stream);
+        final StreamOpStats stats = new StreamOpStats(statsLogger, perStreamStatsLogger);
+        this.successRecordCounter = stats.recordsCounter("success");
+        this.failureRecordCounter = stats.recordsCounter("failure");
+        this.redirectRecordCounter = stats.recordsCounter("redirect");
+        this.deniedWriteCounter = stats.requestDeniedCounter("write");
+        this.writeBytes = stats.scopedRequestCounter("write", "bytes");
+        this.latencyStat = stats.streamRequestLatencyStat(partition, "write");
+        this.bytes = stats.streamRequestCounter(partition, "write", "bytes");
+
+        // Latency is recorded for every outcome; the byte counters only move on StatusCode.SUCCESS.
+        final long size = getPayloadSize();
+        result().addEventListener(new FutureEventListener<WriteResponse>() {
+            @Override
+            public void onSuccess(WriteResponse response) {
+                final boolean succeeded = response.getHeader().getCode() == StatusCode.SUCCESS;
+                final long elapsedMicros = stopwatch().elapsed(TimeUnit.MICROSECONDS);
+                if (!succeeded) {
+                    latencyStat.registerFailedEvent(elapsedMicros);
+                    return;
+                }
+                latencyStat.registerSuccessfulEvent(elapsedMicros);
+                bytes.add(size);
+                writeBytes.add(size);
+            }
+            @Override
+            public void onFailure(Throwable cause) {
+                latencyStat.registerFailedEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS));
+            }
+        });
+    }
+
+    /** Returns the length of the copied payload, in bytes. */
+    @Override
+    public long getPayloadSize() {
+        return payload.length;
+    }
+
+    /** Delegates to {@code ProtocolUtils.writeOpCRC32} with the stream name and the payload. */
+    @Override
+    public Long computeChecksum() {
+        return ProtocolUtils.writeOpCRC32(stream, payload);
+    }
+
+    /** Counts and rejects the request when the access control manager disallows the write. */
+    @Override
+    public void preExecute() throws DLException {
+        final boolean writable = accessControlManager.allowWrite(stream);
+        if (!writable) {
+            deniedWriteCounter.inc();
+            throw new RequestDeniedException(stream, "write");
+        }
+        super.preExecute();
+    }
+
+    /** Wraps the payload in a record carrying the next transaction id and hands it to {@code writer}. */
+    @Override
+    protected Future<WriteResponse> executeOp(AsyncLogWriter writer, Sequencer sequencer, Object txnLock) {
+        if (!stream.equals(writer.getStreamName())) {
+            logger.error("Write: Stream Name Mismatch in the Stream Map {}, {}", stream, writer.getStreamName());
+            return Future.exception(new IllegalStateException("The stream mapping is incorrect, fail the request"));
+        }
+        final Future<DLSN> pendingWrite;
+        // The id is drawn and the record submitted under one lock (presumably to keep id order).
+        synchronized (txnLock) {
+            final LogRecord record = new LogRecord(sequencer.nextId(), payload);
+            if (isRecordSet) {
+                record.setRecordSet();
+            }
+            pendingWrite = writer.write(record);
+        }
+        return pendingWrite.map(new AbstractFunction1<DLSN, WriteResponse>() {
+            @Override
+            public WriteResponse apply(DLSN dlsn) {
+                successRecordCounter.inc();
+                return ResponseUtils.writeSuccess().setDlsn(dlsn.serialize(dlsnVersion));
+            }
+        });
+    }
+
+    /** Counts the record as redirected on StatusCode.FOUND and as failed otherwise, then delegates. */
+    @Override
+    protected void fail(ResponseHeader header) {
+        final boolean redirected = StatusCode.FOUND == header.getCode();
+        final Counter outcome = redirected ? redirectRecordCounter : failureRecordCounter;
+        outcome.inc();
+        super.fail(header);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOpWithPayload.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOpWithPayload.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOpWithPayload.java
new file mode 100644
index 0000000..e411b420
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOpWithPayload.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream;
+
+/**
+ * A write operation that carries a payload whose size can be queried.
+ */
+public interface WriteOpWithPayload {
+
+ /** Returns the payload size in bytes. */
+ long getPayloadSize();
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/AdminOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/AdminOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/AdminOp.java
new file mode 100644
index 0000000..fcaee35
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/AdminOp.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream.admin;
+
+import org.apache.distributedlog.exceptions.DLException;
+import com.twitter.util.Future;
+
+/**
+ * Admin operation interface; {@code RespT} is the type of the response the operation yields.
+ */
+public interface AdminOp<RespT> {
+
+ /**
+ * Invoked before the operation is executed; may throw a {@link DLException}.
+ */
+ void preExecute() throws DLException;
+
+ /**
+ * Executes the operation.
+ *
+ * @return a future that represents the response of the operation
+ */
+ Future<RespT> execute();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/CreateOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/CreateOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/CreateOp.java
new file mode 100644
index 0000000..89a2566
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/CreateOp.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream.admin;
+
+import static org.apache.distributedlog.service.stream.AbstractStreamOp.requestStat;
+
+import org.apache.distributedlog.service.ResponseUtils;
+import org.apache.distributedlog.service.stream.StreamManager;
+import org.apache.distributedlog.thrift.service.WriteResponse;
+import com.twitter.util.Future;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.stats.StatsLogger;
+import scala.runtime.AbstractFunction1;
+
+/**
+ * Admin operation that creates a log stream through the {@link StreamManager}.
+ */
+public class CreateOp extends StreamAdminOp {
+
+    /**
+     * Creates the op for {@code stream}; its request stat is looked up under the name "create".
+     *
+     * <p>NOTE(review): sibling ops name the last argument checksumDisabledFeature; confirm which
+     * polarity StreamAdminOp expects before relying on the "enabled" name used here.
+     */
+    public CreateOp(String stream, StatsLogger statsLogger, StreamManager streamManager,
+                    Long checksum, Feature checksumEnabledFeature) {
+        super(stream, streamManager, requestStat(statsLogger, "create"), checksum, checksumEnabledFeature);
+    }
+
+    /** Asks the stream manager to create the stream and answers with a success response. */
+    @Override
+    protected Future<WriteResponse> executeOp() {
+        // The Void result carries no data, so completion is simply mapped to a success response.
+        return streamManager.createStreamAsync(stream).map(new AbstractFunction1<Void, WriteResponse>() {
+            @Override
+            public WriteResponse apply(Void ignored) {
+                return ResponseUtils.writeSuccess();
+            }
+        });
+    }
+}