You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/12 15:45:13 UTC

[06/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/ZKPlacementStateManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/ZKPlacementStateManager.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/ZKPlacementStateManager.java
deleted file mode 100644
index 862f05a..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/ZKPlacementStateManager.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.placement;
-
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.ZooKeeperClient;
-import org.apache.distributedlog.impl.BKNamespaceDriver;
-import org.apache.distributedlog.util.Utils;
-import java.io.IOException;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.HashSet;
-import java.util.List;
-import java.util.TreeSet;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Transaction;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * An implementation of the PlacementStateManager that saves data to and loads from Zookeeper to
- * avoid necessitating an additional system for the resource placement.
- */
-public class ZKPlacementStateManager implements PlacementStateManager {
-
-    private static final Logger logger = LoggerFactory.getLogger(ZKPlacementStateManager.class);
-
-    private static final String SERVER_LOAD_DIR = "/.server-load";
-
-    private final String serverLoadPath;
-    private final ZooKeeperClient zkClient;
-
-    private boolean watching = false;
-
-    public ZKPlacementStateManager(URI uri, DistributedLogConfiguration conf, StatsLogger statsLogger) {
-        String zkServers = BKNamespaceDriver.getZKServersFromDLUri(uri);
-        zkClient = BKNamespaceDriver.createZKClientBuilder(
-            String.format("ZKPlacementStateManager-%s", zkServers),
-            conf,
-            zkServers,
-            statsLogger.scope("placement_state_manager")).build();
-        serverLoadPath = uri.getPath() + SERVER_LOAD_DIR;
-    }
-
-    private void createServerLoadPathIfNoExists(byte[] data)
-        throws ZooKeeperClient.ZooKeeperConnectionException, KeeperException, InterruptedException {
-        try {
-            Utils.zkCreateFullPathOptimistic(
-                zkClient, serverLoadPath, data, zkClient.getDefaultACL(), CreateMode.PERSISTENT);
-        } catch (KeeperException.NodeExistsException nee) {
-            logger.debug("the server load path {} is already created by others", serverLoadPath, nee);
-        }
-    }
-
-    @Override
-    public void saveOwnership(TreeSet<ServerLoad> serverLoads) throws StateManagerSaveException {
-        logger.info("saving ownership");
-        try {
-            ZooKeeper zk = zkClient.get();
-            // use timestamp as data so watchers will see any changes
-            byte[] timestamp = ByteBuffer.allocate(8).putLong(System.currentTimeMillis()).array();
-
-            if (zk.exists(serverLoadPath, false) == null) { //create path to rootnode if it does not yet exist
-                createServerLoadPathIfNoExists(timestamp);
-            }
-
-            Transaction tx = zk.transaction();
-            List<String> children = zk.getChildren(serverLoadPath, false);
-            HashSet<String> servers = new HashSet<String>(children);
-            tx.setData(serverLoadPath, timestamp, -1); // trigger the watcher that data has been updated
-            for (ServerLoad serverLoad : serverLoads) {
-                String server = serverToZkFormat(serverLoad.getServer());
-                String serverPath = serverPath(server);
-                if (servers.contains(server)) {
-                    servers.remove(server);
-                    tx.setData(serverPath, serverLoad.serialize(), -1);
-                } else {
-                    tx.create(serverPath, serverLoad.serialize(), zkClient.getDefaultACL(), CreateMode.PERSISTENT);
-                }
-            }
-            for (String server : servers) {
-                tx.delete(serverPath(server), -1);
-            }
-            tx.commit();
-        } catch (InterruptedException | IOException | KeeperException e) {
-            throw new StateManagerSaveException(e);
-        }
-    }
-
-    @Override
-    public TreeSet<ServerLoad> loadOwnership() throws StateManagerLoadException {
-        TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>();
-        try {
-            ZooKeeper zk = zkClient.get();
-            List<String> children = zk.getChildren(serverLoadPath, false);
-            for (String server : children) {
-                ownerships.add(ServerLoad.deserialize(zk.getData(serverPath(server), false, new Stat())));
-            }
-            return ownerships;
-        } catch (InterruptedException | IOException | KeeperException e) {
-            throw new StateManagerLoadException(e);
-        }
-    }
-
-    @Override
-    public synchronized void watch(final PlacementCallback callback) {
-        if (watching) {
-            return; // do not double watch
-        }
-        watching = true;
-
-        try {
-            ZooKeeper zk = zkClient.get();
-            try {
-                zk.getData(serverLoadPath, new Watcher() {
-                    @Override
-                    public void process(WatchedEvent watchedEvent) {
-                        try {
-                            callback.callback(loadOwnership());
-                        } catch (StateManagerLoadException e) {
-                            logger.error("Watch of Ownership failed", e);
-                        } finally {
-                            watching = false;
-                            watch(callback);
-                        }
-                    }
-                }, new Stat());
-            } catch (KeeperException.NoNodeException nee) {
-                byte[] timestamp = ByteBuffer.allocate(8).putLong(System.currentTimeMillis()).array();
-                createServerLoadPathIfNoExists(timestamp);
-                watching = false;
-                watch(callback);
-            }
-        } catch (ZooKeeperClient.ZooKeeperConnectionException | InterruptedException | KeeperException e) {
-            logger.error("Watch of Ownership failed", e);
-            watching = false;
-            watch(callback);
-        }
-    }
-
-    public String serverPath(String server) {
-        return String.format("%s/%s", serverLoadPath, server);
-    }
-
-    protected String serverToZkFormat(String server) {
-        return server.replaceAll("/", "--");
-    }
-
-    protected String zkFormatToServer(String zkFormattedServer) {
-        return zkFormattedServer.replaceAll("--", "/");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/package-info.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/package-info.java
deleted file mode 100644
index ea79251..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Placement Policy to place streams across proxy services.
- */
-package org.apache.distributedlog.service.placement;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java
deleted file mode 100644
index 83ac668..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream;
-
-import com.google.common.base.Stopwatch;
-import org.apache.distributedlog.AsyncLogWriter;
-import org.apache.distributedlog.exceptions.ChecksumFailedException;
-import org.apache.distributedlog.exceptions.DLException;
-import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
-import org.apache.distributedlog.service.ResponseUtils;
-import org.apache.distributedlog.thrift.service.ResponseHeader;
-import org.apache.distributedlog.util.Sequencer;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import com.twitter.util.Return;
-import com.twitter.util.Try;
-import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.feature.Feature;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Option;
-
-/**
- * Abstract Stream Operation.
- */
-public abstract class AbstractStreamOp<Response> implements StreamOp {
-
-    private static final Logger logger = LoggerFactory.getLogger(AbstractStreamOp.class);
-
-    protected final String stream;
-    protected final OpStatsLogger opStatsLogger;
-    private final Promise<Response> result = new Promise<Response>();
-    protected final Stopwatch stopwatch = Stopwatch.createUnstarted();
-    protected final Long checksum;
-    protected final Feature checksumDisabledFeature;
-
-    public AbstractStreamOp(String stream,
-                            OpStatsLogger statsLogger,
-                            Long checksum,
-                            Feature checksumDisabledFeature) {
-        this.stream = stream;
-        this.opStatsLogger = statsLogger;
-        // start here in case the operation is failed before executing.
-        stopwatch.reset().start();
-        this.checksum = checksum;
-        this.checksumDisabledFeature = checksumDisabledFeature;
-    }
-
-    @Override
-    public String streamName() {
-        return stream;
-    }
-
-    @Override
-    public Stopwatch stopwatch() {
-        return stopwatch;
-    }
-
-    @Override
-    public void preExecute() throws DLException {
-        if (!checksumDisabledFeature.isAvailable() && null != checksum) {
-            Long serverChecksum = computeChecksum();
-            if (null != serverChecksum && !checksum.equals(serverChecksum)) {
-                throw new ChecksumFailedException();
-            }
-        }
-    }
-
-    @Override
-    public Long computeChecksum() {
-        return null;
-    }
-
-    @Override
-    public Future<Void> execute(AsyncLogWriter writer, Sequencer sequencer, Object txnLock) {
-        stopwatch.reset().start();
-        return executeOp(writer, sequencer, txnLock)
-                .addEventListener(new FutureEventListener<Response>() {
-            @Override
-            public void onSuccess(Response response) {
-                opStatsLogger.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
-                setResponse(response);
-            }
-            @Override
-            public void onFailure(Throwable cause) {
-            }
-        }).voided();
-    }
-
-    /**
-     * Fail with current <i>owner</i> and its reason <i>t</i>.
-     *
-     * @param cause
-     *          failure reason
-     */
-    @Override
-    public void fail(Throwable cause) {
-        if (cause instanceof OwnershipAcquireFailedException) {
-            // Ownership exception is a control exception, not an error, so we don't stat
-            // it with the other errors.
-            OwnershipAcquireFailedException oafe = (OwnershipAcquireFailedException) cause;
-            fail(ResponseUtils.ownerToHeader(oafe.getCurrentOwner()));
-        } else {
-            opStatsLogger.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
-            fail(ResponseUtils.exceptionToHeader(cause));
-        }
-    }
-
-    protected void setResponse(Response response) {
-      Return<Response> responseTry = new Return(response);
-      boolean isEmpty = result.updateIfEmpty(responseTry);
-      if (!isEmpty) {
-        Option<Try<Response>> resultTry = result.poll();
-        logger.error("Result set multiple times. Value='{}', New='{}'", resultTry, responseTry);
-      }
-    }
-
-    /**
-     * Return the full response, header and body.
-     *
-     * @return A future containing the response or the exception
-     *      encountered by the op if it failed.
-     */
-    public Future<Response> result() {
-        return result;
-    }
-
-    /**
-     * Execute the operation and return its corresponding response.
-     *
-     * @param writer
-     *          writer to execute the operation.
-     * @param sequencer
-     *          sequencer used for generating transaction id for stream operations
-     * @param txnLock
-     *          transaction lock to guarantee ordering of transaction id
-     * @return future representing the operation.
-     */
-    protected abstract Future<Response> executeOp(AsyncLogWriter writer,
-                                                  Sequencer sequencer,
-                                                  Object txnLock);
-
-    // fail the result with the given response header
-    protected abstract void fail(ResponseHeader header);
-
-    public static OpStatsLogger requestStat(StatsLogger statsLogger, String opName) {
-        return requestLogger(statsLogger).getOpStatsLogger(opName);
-    }
-
-    public static StatsLogger requestLogger(StatsLogger statsLogger) {
-        return statsLogger.scope("request");
-    }
-
-    public static StatsLogger requestScope(StatsLogger statsLogger, String scope) {
-        return requestLogger(statsLogger).scope(scope);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/AbstractWriteOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/AbstractWriteOp.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/AbstractWriteOp.java
deleted file mode 100644
index 77c7d71..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/AbstractWriteOp.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream;
-
-import org.apache.distributedlog.service.ResponseUtils;
-import org.apache.distributedlog.thrift.service.ResponseHeader;
-import org.apache.distributedlog.thrift.service.WriteResponse;
-import org.apache.distributedlog.util.ProtocolUtils;
-import com.twitter.util.Future;
-import org.apache.bookkeeper.feature.Feature;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import scala.runtime.AbstractFunction1;
-
-/**
- * Abstract Write Operation.
- */
-public abstract class AbstractWriteOp extends AbstractStreamOp<WriteResponse> {
-
-    protected AbstractWriteOp(String stream,
-                              OpStatsLogger statsLogger,
-                              Long checksum,
-                              Feature checksumDisabledFeature) {
-        super(stream, statsLogger, checksum, checksumDisabledFeature);
-    }
-
-    @Override
-    protected void fail(ResponseHeader header) {
-        setResponse(ResponseUtils.write(header));
-    }
-
-    @Override
-    public Long computeChecksum() {
-        return ProtocolUtils.streamOpCRC32(stream);
-    }
-
-    @Override
-    public Future<ResponseHeader> responseHeader() {
-        return result().map(new AbstractFunction1<WriteResponse, ResponseHeader>() {
-            @Override
-            public ResponseHeader apply(WriteResponse response) {
-                return response.getHeader();
-            }
-        });
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java
deleted file mode 100644
index 6c98468..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream;
-
-import org.apache.distributedlog.AsyncLogWriter;
-import org.apache.distributedlog.DLSN;
-import org.apache.distributedlog.LogRecord;
-import org.apache.distributedlog.acl.AccessControlManager;
-import org.apache.distributedlog.exceptions.AlreadyClosedException;
-import org.apache.distributedlog.exceptions.DLException;
-import org.apache.distributedlog.exceptions.LockingException;
-import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
-import org.apache.distributedlog.exceptions.RequestDeniedException;
-import org.apache.distributedlog.service.ResponseUtils;
-import org.apache.distributedlog.service.streamset.Partition;
-import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
-import org.apache.distributedlog.thrift.service.BulkWriteResponse;
-import org.apache.distributedlog.thrift.service.ResponseHeader;
-import org.apache.distributedlog.thrift.service.StatusCode;
-import org.apache.distributedlog.thrift.service.WriteResponse;
-import org.apache.distributedlog.util.Sequencer;
-import com.twitter.util.ConstFuture;
-import com.twitter.util.Future;
-import com.twitter.util.Future$;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Try;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.feature.Feature;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import scala.runtime.AbstractFunction1;
-
-/**
- * Bulk Write Operation.
- */
-public class BulkWriteOp extends AbstractStreamOp<BulkWriteResponse> implements WriteOpWithPayload {
-    private final List<ByteBuffer> buffers;
-    private final long payloadSize;
-
-    // Stats
-    private final Counter deniedBulkWriteCounter;
-    private final Counter successRecordCounter;
-    private final Counter failureRecordCounter;
-    private final Counter redirectRecordCounter;
-    private final OpStatsLogger latencyStat;
-    private final Counter bytes;
-    private final Counter bulkWriteBytes;
-
-    private final AccessControlManager accessControlManager;
-
-    // We need to pass these through to preserve ownership change behavior in
-    // client/server. Only include failures which are guaranteed to have failed
-    // all subsequent writes.
-    private boolean isDefiniteFailure(Try<DLSN> result) {
-        boolean def = false;
-        try {
-            result.get();
-        } catch (Exception ex) {
-            if (ex instanceof OwnershipAcquireFailedException
-                || ex instanceof AlreadyClosedException
-                || ex instanceof LockingException) {
-                def = true;
-            }
-        }
-        return def;
-    }
-
-    public BulkWriteOp(String stream,
-                       List<ByteBuffer> buffers,
-                       StatsLogger statsLogger,
-                       StatsLogger perStreamStatsLogger,
-                       StreamPartitionConverter streamPartitionConverter,
-                       Long checksum,
-                       Feature checksumDisabledFeature,
-                       AccessControlManager accessControlManager) {
-        super(stream, requestStat(statsLogger, "bulkWrite"), checksum, checksumDisabledFeature);
-        this.buffers = buffers;
-        long total = 0;
-        // We do this here because the bytebuffers are mutable.
-        for (ByteBuffer bb : buffers) {
-          total += bb.remaining();
-        }
-        this.payloadSize = total;
-
-        final Partition partition = streamPartitionConverter.convert(stream);
-        // Write record stats
-        StreamOpStats streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger);
-        this.deniedBulkWriteCounter = streamOpStats.requestDeniedCounter("bulkWrite");
-        this.successRecordCounter = streamOpStats.recordsCounter("success");
-        this.failureRecordCounter = streamOpStats.recordsCounter("failure");
-        this.redirectRecordCounter = streamOpStats.recordsCounter("redirect");
-        this.bulkWriteBytes = streamOpStats.scopedRequestCounter("bulkWrite", "bytes");
-        this.latencyStat = streamOpStats.streamRequestLatencyStat(partition, "bulkWrite");
-        this.bytes = streamOpStats.streamRequestCounter(partition, "bulkWrite", "bytes");
-
-        this.accessControlManager = accessControlManager;
-
-        final long size = getPayloadSize();
-        result().addEventListener(new FutureEventListener<BulkWriteResponse>() {
-            @Override
-            public void onSuccess(BulkWriteResponse response) {
-                if (response.getHeader().getCode() == StatusCode.SUCCESS) {
-                    latencyStat.registerSuccessfulEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS));
-                    bytes.add(size);
-                    bulkWriteBytes.add(size);
-                } else {
-                    latencyStat.registerFailedEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS));
-                }
-            }
-            @Override
-            public void onFailure(Throwable cause) {
-                latencyStat.registerFailedEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS));
-            }
-        });
-    }
-
-    @Override
-    public void preExecute() throws DLException {
-        if (!accessControlManager.allowWrite(stream)) {
-            deniedBulkWriteCounter.inc();
-            throw new RequestDeniedException(stream, "bulkWrite");
-        }
-        super.preExecute();
-    }
-
-    @Override
-    public long getPayloadSize() {
-      return payloadSize;
-    }
-
-    @Override
-    protected Future<BulkWriteResponse> executeOp(AsyncLogWriter writer,
-                                                  Sequencer sequencer,
-                                                  Object txnLock) {
-        // Need to convert input buffers to LogRecords.
-        List<LogRecord> records;
-        Future<List<Future<DLSN>>> futureList;
-        synchronized (txnLock) {
-            records = asRecordList(buffers, sequencer);
-            futureList = writer.writeBulk(records);
-        }
-
-        // Collect into a list of tries to make it easier to extract exception or DLSN.
-        Future<List<Try<DLSN>>> writes = asTryList(futureList);
-
-        Future<BulkWriteResponse> response = writes.flatMap(
-            new AbstractFunction1<List<Try<DLSN>>, Future<BulkWriteResponse>>() {
-                @Override
-                public Future<BulkWriteResponse> apply(List<Try<DLSN>> results) {
-
-                    // Considered a success at batch level even if no individual writes succeeed.
-                    // The reason is that its impossible to make an appropriate decision re retries without
-                    // individual buffer failure reasons.
-                    List<WriteResponse> writeResponses = new ArrayList<WriteResponse>(results.size());
-                    BulkWriteResponse bulkWriteResponse =
-                        ResponseUtils.bulkWriteSuccess().setWriteResponses(writeResponses);
-
-                    // Promote the first result to an op-level failure if we're sure all other writes have
-                    // failed.
-                    if (results.size() > 0) {
-                        Try<DLSN> firstResult = results.get(0);
-                        if (isDefiniteFailure(firstResult)) {
-                            return new ConstFuture(firstResult);
-                        }
-                    }
-
-                    // Translate all futures to write responses.
-                    Iterator<Try<DLSN>> iterator = results.iterator();
-                    while (iterator.hasNext()) {
-                        Try<DLSN> completedFuture = iterator.next();
-                        try {
-                            DLSN dlsn = completedFuture.get();
-                            WriteResponse writeResponse = ResponseUtils.writeSuccess().setDlsn(dlsn.serialize());
-                            writeResponses.add(writeResponse);
-                            successRecordCounter.inc();
-                        } catch (Exception ioe) {
-                            WriteResponse writeResponse = ResponseUtils.write(ResponseUtils.exceptionToHeader(ioe));
-                            writeResponses.add(writeResponse);
-                            if (StatusCode.FOUND == writeResponse.getHeader().getCode()) {
-                                redirectRecordCounter.inc();
-                            } else {
-                                failureRecordCounter.inc();
-                            }
-                        }
-                    }
-
-                    return Future.value(bulkWriteResponse);
-                }
-            }
-        );
-
-        return response;
-    }
-
-    private List<LogRecord> asRecordList(List<ByteBuffer> buffers, Sequencer sequencer) {
-        List<LogRecord> records = new ArrayList<LogRecord>(buffers.size());
-        for (ByteBuffer buffer : buffers) {
-            byte[] payload = new byte[buffer.remaining()];
-            buffer.get(payload);
-            records.add(new LogRecord(sequencer.nextId(), payload));
-        }
-        return records;
-    }
-
-    private Future<List<Try<DLSN>>> asTryList(Future<List<Future<DLSN>>> futureList) {
-        return futureList.flatMap(new AbstractFunction1<List<Future<DLSN>>, Future<List<Try<DLSN>>>>() {
-            @Override
-            public Future<List<Try<DLSN>>> apply(List<Future<DLSN>> results) {
-                return Future$.MODULE$.collectToTry(results);
-            }
-        });
-    }
-
-    @Override
-    protected void fail(ResponseHeader header) {
-        if (StatusCode.FOUND == header.getCode()) {
-            redirectRecordCounter.add(buffers.size());
-        } else {
-            failureRecordCounter.add(buffers.size());
-        }
-        setResponse(ResponseUtils.bulkWrite(header));
-    }
-
-    @Override
-    public Future<ResponseHeader> responseHeader() {
-        return result().map(new AbstractFunction1<BulkWriteResponse, ResponseHeader>() {
-            @Override
-            public ResponseHeader apply(BulkWriteResponse response) {
-                return response.getHeader();
-            }
-        });
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/DeleteOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/DeleteOp.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/DeleteOp.java
deleted file mode 100644
index 3ecb46f..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/DeleteOp.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream;
-
-import org.apache.distributedlog.AsyncLogWriter;
-import org.apache.distributedlog.acl.AccessControlManager;
-import org.apache.distributedlog.exceptions.DLException;
-import org.apache.distributedlog.exceptions.RequestDeniedException;
-import org.apache.distributedlog.service.ResponseUtils;
-import org.apache.distributedlog.thrift.service.WriteResponse;
-import org.apache.distributedlog.util.Sequencer;
-import com.twitter.util.Future;
-import org.apache.bookkeeper.feature.Feature;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.StatsLogger;
-import scala.runtime.AbstractFunction1;
-
-/**
- * Operation to delete a log stream.
- */
-public class DeleteOp extends AbstractWriteOp {
-    private final StreamManager streamManager;
-    private final Counter deniedDeleteCounter;
-    private final AccessControlManager accessControlManager;
-
-    public DeleteOp(String stream,
-                    StatsLogger statsLogger,
-                    StatsLogger perStreamStatsLogger,
-                    StreamManager streamManager,
-                    Long checksum,
-                    Feature checksumEnabledFeature,
-                    AccessControlManager accessControlManager) {
-        super(stream, requestStat(statsLogger, "delete"), checksum, checksumEnabledFeature);
-        StreamOpStats streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger);
-        this.deniedDeleteCounter = streamOpStats.requestDeniedCounter("delete");
-        this.accessControlManager = accessControlManager;
-        this.streamManager = streamManager;
-    }
-
-    @Override
-    protected Future<WriteResponse> executeOp(AsyncLogWriter writer,
-                                              Sequencer sequencer,
-                                              Object txnLock) {
-        Future<Void> result = streamManager.deleteAndRemoveAsync(streamName());
-        return result.map(new AbstractFunction1<Void, WriteResponse>() {
-            @Override
-            public WriteResponse apply(Void value) {
-                return ResponseUtils.writeSuccess();
-            }
-        });
-    }
-
-    @Override
-    public void preExecute() throws DLException {
-        if (!accessControlManager.allowTruncate(stream)) {
-            deniedDeleteCounter.inc();
-            throw new RequestDeniedException(stream, "delete");
-        }
-        super.preExecute();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/HeartbeatOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/HeartbeatOp.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/HeartbeatOp.java
deleted file mode 100644
index 0ffa619..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/HeartbeatOp.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-import org.apache.distributedlog.AsyncLogWriter;
-import org.apache.distributedlog.DLSN;
-import org.apache.distributedlog.LogRecord;
-import org.apache.distributedlog.acl.AccessControlManager;
-import org.apache.distributedlog.exceptions.DLException;
-import org.apache.distributedlog.exceptions.RequestDeniedException;
-import org.apache.distributedlog.service.ResponseUtils;
-import org.apache.distributedlog.thrift.service.WriteResponse;
-import org.apache.distributedlog.util.Sequencer;
-import com.twitter.util.Future;
-import org.apache.bookkeeper.feature.Feature;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.StatsLogger;
-import scala.runtime.AbstractFunction1;
-
-/**
- * Heartbeat Operation.
- */
-public class HeartbeatOp extends AbstractWriteOp {
-
-    static final byte[] HEARTBEAT_DATA = "heartbeat".getBytes(UTF_8);
-
-    private final AccessControlManager accessControlManager;
-    private final Counter deniedHeartbeatCounter;
-    private final byte dlsnVersion;
-
-    private boolean writeControlRecord = false;
-
-    public HeartbeatOp(String stream,
-                       StatsLogger statsLogger,
-                       StatsLogger perStreamStatsLogger,
-                       byte dlsnVersion,
-                       Long checksum,
-                       Feature checksumDisabledFeature,
-                       AccessControlManager accessControlManager) {
-        super(stream, requestStat(statsLogger, "heartbeat"), checksum, checksumDisabledFeature);
-        StreamOpStats streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger);
-        this.deniedHeartbeatCounter = streamOpStats.requestDeniedCounter("heartbeat");
-        this.dlsnVersion = dlsnVersion;
-        this.accessControlManager = accessControlManager;
-    }
-
-    public HeartbeatOp setWriteControlRecord(boolean writeControlRecord) {
-        this.writeControlRecord = writeControlRecord;
-        return this;
-    }
-
-    @Override
-    protected Future<WriteResponse> executeOp(AsyncLogWriter writer,
-                                              Sequencer sequencer,
-                                              Object txnLock) {
-        // write a control record if heartbeat is the first request of the recovered log segment.
-        if (writeControlRecord) {
-            long txnId;
-            Future<DLSN> writeResult;
-            synchronized (txnLock) {
-                txnId = sequencer.nextId();
-                LogRecord hbRecord = new LogRecord(txnId, HEARTBEAT_DATA);
-                hbRecord.setControl();
-                writeResult = writer.write(hbRecord);
-            }
-            return writeResult.map(new AbstractFunction1<DLSN, WriteResponse>() {
-                @Override
-                public WriteResponse apply(DLSN value) {
-                    return ResponseUtils.writeSuccess().setDlsn(value.serialize(dlsnVersion));
-                }
-            });
-        } else {
-            return Future.value(ResponseUtils.writeSuccess());
-        }
-    }
-
-    @Override
-    public void preExecute() throws DLException {
-        if (!accessControlManager.allowAcquire(stream)) {
-            deniedHeartbeatCounter.inc();
-            throw new RequestDeniedException(stream, "heartbeat");
-        }
-        super.preExecute();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/ReleaseOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/ReleaseOp.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/ReleaseOp.java
deleted file mode 100644
index 6ec8642..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/ReleaseOp.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream;
-
-import org.apache.distributedlog.AsyncLogWriter;
-import org.apache.distributedlog.acl.AccessControlManager;
-import org.apache.distributedlog.exceptions.DLException;
-import org.apache.distributedlog.exceptions.RequestDeniedException;
-import org.apache.distributedlog.service.ResponseUtils;
-import org.apache.distributedlog.thrift.service.WriteResponse;
-import org.apache.distributedlog.util.Sequencer;
-import com.twitter.util.Future;
-import org.apache.bookkeeper.feature.Feature;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.StatsLogger;
-import scala.runtime.AbstractFunction1;
-
-/**
- * Operation to release ownership of a log stream.
- */
-public class ReleaseOp extends AbstractWriteOp {
-    private final StreamManager streamManager;
-    private final Counter deniedReleaseCounter;
-    private final AccessControlManager accessControlManager;
-
-    public ReleaseOp(String stream,
-                     StatsLogger statsLogger,
-                     StatsLogger perStreamStatsLogger,
-                     StreamManager streamManager,
-                     Long checksum,
-                     Feature checksumDisabledFeature,
-                     AccessControlManager accessControlManager) {
-        super(stream, requestStat(statsLogger, "release"), checksum, checksumDisabledFeature);
-        StreamOpStats streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger);
-        this.deniedReleaseCounter = streamOpStats.requestDeniedCounter("release");
-        this.accessControlManager = accessControlManager;
-        this.streamManager = streamManager;
-    }
-
-    @Override
-    protected Future<WriteResponse> executeOp(AsyncLogWriter writer,
-                                              Sequencer sequencer,
-                                              Object txnLock) {
-        Future<Void> result = streamManager.closeAndRemoveAsync(streamName());
-        return result.map(new AbstractFunction1<Void, WriteResponse>() {
-            @Override
-            public WriteResponse apply(Void value) {
-                return ResponseUtils.writeSuccess();
-            }
-        });
-    }
-
-    @Override
-    public void preExecute() throws DLException {
-        if (!accessControlManager.allowRelease(stream)) {
-            deniedReleaseCounter.inc();
-            throw new RequestDeniedException(stream, "release");
-        }
-        super.preExecute();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/Stream.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/Stream.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/Stream.java
deleted file mode 100644
index 3517a63..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/Stream.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream;
-
-import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
-import org.apache.distributedlog.service.streamset.Partition;
-import com.twitter.util.Future;
-import java.io.IOException;
-
-/**
- * Stream is the per stream request handler in the DL service layer.
- *
- * <p>The collection of Streams in the proxy are managed by StreamManager.
- */
-public interface Stream {
-
-    /**
-     * Get the stream configuration for this stream.
-     *
-     * @return stream configuration
-     */
-    DynamicDistributedLogConfiguration getStreamConfiguration();
-
-    /**
-     * Get the stream's last recorded current owner (may be out of date). Used
-     * as a hint for the client.
-     * @return last known owner for the stream
-     */
-    String getOwner();
-
-    /**
-     * Get the stream name.
-     * @return stream name
-     */
-    String getStreamName();
-
-    /**
-     * Get the represented partition name.
-     *
-     * @return represented partition name.
-     */
-    Partition getPartition();
-
-    /**
-     * Expensive initialization code run after stream has been allocated in
-     * StreamManager.
-     *
-     * @throws IOException when encountered exception on initialization
-     */
-    void initialize() throws IOException;
-
-    /**
-     * Another initialize method (actually Thread.start). Should probably be
-     * moved to initialize().
-     */
-    void start();
-
-    /**
-     * Asynchronous close method.
-     * @param reason for closing
-     * @return future satisfied once close complete
-     */
-    Future<Void> requestClose(String reason);
-
-    /**
-     * Delete the stream from DL backend.
-     *
-     * @throws IOException when encountered exception on deleting the stream.
-     */
-    void delete() throws IOException;
-
-    /**
-     * Execute the stream operation against this stream.
-     *
-     * @param op operation to execute
-     */
-    void submit(StreamOp op);
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamFactory.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamFactory.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamFactory.java
deleted file mode 100644
index 845ef21..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamFactory.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream;
-
-import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
-
-/**
- * Factory to create a stream with provided stream configuration {@code streamConf}.
- */
-public interface StreamFactory {
-
-    /**
-     * Create a stream object.
-     *
-     * @param name stream name
-     * @param streamConf stream configuration
-     * @param streamManager manager of streams
-     * @return stream object
-     */
-    Stream create(String name,
-                  DynamicDistributedLogConfiguration streamConf,
-                  StreamManager streamManager);
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamFactoryImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamFactoryImpl.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamFactoryImpl.java
deleted file mode 100644
index 2b90d55..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamFactoryImpl.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream;
-
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.service.FatalErrorHandler;
-import org.apache.distributedlog.service.config.ServerConfiguration;
-import org.apache.distributedlog.service.config.StreamConfigProvider;
-import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
-import org.apache.distributedlog.util.OrderedScheduler;
-import com.twitter.util.Timer;
-import org.apache.bookkeeper.feature.FeatureProvider;
-import org.jboss.netty.util.HashedWheelTimer;
-
-/**
- * The implementation of {@link StreamFactory}.
- */
-public class StreamFactoryImpl implements StreamFactory {
-    private final String clientId;
-    private final StreamOpStats streamOpStats;
-    private final ServerConfiguration serverConfig;
-    private final DistributedLogConfiguration dlConfig;
-    private final FeatureProvider featureProvider;
-    private final StreamConfigProvider streamConfigProvider;
-    private final StreamPartitionConverter streamPartitionConverter;
-    private final DistributedLogNamespace dlNamespace;
-    private final OrderedScheduler scheduler;
-    private final FatalErrorHandler fatalErrorHandler;
-    private final HashedWheelTimer requestTimer;
-    private final Timer futureTimer;
-
-    public StreamFactoryImpl(String clientId,
-        StreamOpStats streamOpStats,
-        ServerConfiguration serverConfig,
-        DistributedLogConfiguration dlConfig,
-        FeatureProvider featureProvider,
-        StreamConfigProvider streamConfigProvider,
-        StreamPartitionConverter streamPartitionConverter,
-        DistributedLogNamespace dlNamespace,
-        OrderedScheduler scheduler,
-        FatalErrorHandler fatalErrorHandler,
-        HashedWheelTimer requestTimer) {
-
-        this.clientId = clientId;
-        this.streamOpStats = streamOpStats;
-        this.serverConfig = serverConfig;
-        this.dlConfig = dlConfig;
-        this.featureProvider = featureProvider;
-        this.streamConfigProvider = streamConfigProvider;
-        this.streamPartitionConverter = streamPartitionConverter;
-        this.dlNamespace = dlNamespace;
-        this.scheduler = scheduler;
-        this.fatalErrorHandler = fatalErrorHandler;
-        this.requestTimer = requestTimer;
-        this.futureTimer = new com.twitter.finagle.util.HashedWheelTimer(requestTimer);
-    }
-
-    @Override
-    public Stream create(String name,
-                         DynamicDistributedLogConfiguration streamConf,
-                         StreamManager streamManager) {
-        return new StreamImpl(name,
-            streamPartitionConverter.convert(name),
-            clientId,
-            streamManager,
-            streamOpStats,
-            serverConfig,
-            dlConfig,
-            streamConf,
-            featureProvider,
-            streamConfigProvider,
-            dlNamespace,
-            scheduler,
-            fatalErrorHandler,
-            requestTimer,
-            futureTimer);
-    }
-}