You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/01/05 00:51:20 UTC

[15/51] [partial] incubator-distributedlog git commit: DL-4: Repackage the source under apache namespace

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/Tool.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/Tool.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/Tool.java
deleted file mode 100644
index bb14066..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/Tool.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.tools;
-
-import org.apache.bookkeeper.util.ReflectionUtils;
-import org.apache.commons.cli.BasicParser;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * A Tool Framework
- */
-public abstract class Tool {
-
-    /**
-     * Interface of a command to run in a tool.
-     */
-    protected interface Command {
-        String getName();
-        String getDescription();
-        int runCmd(String[] args) throws Exception;
-        void printUsage();
-    }
-
-    /**
-     * {@link org.apache.commons.cli.Options} based command.
-     */
-    protected abstract static class OptsCommand implements Command {
-
-        /**
-         * @return options used by this command.
-         */
-        protected abstract Options getOptions();
-
-        /**
-         * @return usage of this command.
-         */
-        protected String getUsage() {
-            return cmdName + " [options]";
-        }
-
-        /**
-         * Run given command line <i>commandLine</i>.
-         *
-         * @param commandLine
-         *          command line to run.
-         * @return return code of this command.
-         * @throws Exception
-         */
-        protected abstract int runCmd(CommandLine commandLine) throws Exception;
-
-        protected String cmdName;
-        protected String description;
-
-        protected OptsCommand(String name, String description) {
-            this.cmdName = name;
-            this.description = description;
-        }
-
-        @Override
-        public String getName() {
-            return cmdName;
-        }
-
-        @Override
-        public String getDescription() {
-            return description;
-        }
-
-        @Override
-        public int runCmd(String[] args) throws Exception {
-            try {
-                BasicParser parser = new BasicParser();
-                CommandLine cmdline = parser.parse(getOptions(), args);
-                return runCmd(cmdline);
-            } catch (ParseException e) {
-                printUsage();
-                return -1;
-            }
-        }
-
-        @Override
-        public void printUsage() {
-            HelpFormatter helpFormatter = new HelpFormatter();
-            println(cmdName + ": " + getDescription());
-            helpFormatter.printHelp(getUsage(), getOptions());
-        }
-    }
-
-    public class HelpCommand implements Command {
-
-        @Override
-        public String getName() {
-            return "help";
-        }
-
-        @Override
-        public String getDescription() {
-            return "describe the usage of this tool or its sub-commands.";
-        }
-
-        @Override
-        public int runCmd(String[] args) throws Exception {
-            if (args.length == 0) {
-                printToolUsage();
-                return -1;
-            }
-            String cmdName = args[0];
-            Command command = commands.get(cmdName);
-            if (null == command) {
-                System.err.println("Unknown command " + cmdName);
-                printToolUsage();
-                return -1;
-            }
-            command.printUsage();
-            println("");
-            return 0;
-        }
-
-        @Override
-        public void printUsage() {
-            println(getName() + ": " + getDescription());
-            println("");
-            println("usage: " + getName() + " <command>");
-        }
-    }
-
-    // Commands managed by a tool
-    protected final Map<String, Command> commands =
-            new TreeMap<String, Command>();
-
-    protected Tool() {
-        addCommand(new HelpCommand());
-    }
-
-    /**
-     * @return tool name.
-     */
-    protected abstract String getName();
-
-    /**
-     * Add a command in this tool.
-     *
-     * @param command
-     *          command to run in this tool.
-     */
-    protected void addCommand(Command command) {
-        commands.put(command.getName(), command);
-    }
-
-    /**
-     * Print a message in this tool.
-     *
-     * @param msg
-     *          message to print
-     */
-    protected static void println(String msg) {
-        System.out.println(msg);
-    }
-
-    /**
-     * print tool usage.
-     */
-    protected void printToolUsage() {
-        println("Usage: " + getName() + " <command>");
-        println("");
-        int maxKeyLength = 0;
-        for (String key : commands.keySet()) {
-            if (key.length() > maxKeyLength) {
-                maxKeyLength = key.length();
-            }
-        }
-        maxKeyLength += 2;
-        for (Map.Entry<String, Command> entry : commands.entrySet()) {
-            StringBuilder spacesBuilder = new StringBuilder();
-            int numSpaces = maxKeyLength - entry.getKey().length();
-            for (int i = 0; i < numSpaces; i++) {
-                spacesBuilder.append(" ");
-            }
-            println("\t"  + entry.getKey() + spacesBuilder.toString() + ": " + entry.getValue().getDescription());
-        }
-        println("");
-    }
-
-    public int run(String[] args) throws Exception {
-        if (args.length <= 0) {
-            printToolUsage();
-            return -1;
-        }
-        String cmdName = args[0];
-        Command cmd = commands.get(cmdName);
-        if (null == cmd) {
-            System.err.println("ERROR: Unknown command " + cmdName);
-            printToolUsage();
-            return -1;
-        }
-        // prepare new args
-        String[] newArgs = new String[args.length - 1];
-        System.arraycopy(args, 1, newArgs, 0, newArgs.length);
-        return cmd.runCmd(newArgs);
-    }
-
-    public static void main(String args[]) {
-        int rc = -1;
-        if (args.length <= 0) {
-            System.err.println("No tool to run.");
-            System.err.println("");
-            System.err.println("Usage : Tool <tool_class_name> <options>");
-            System.exit(-1);
-        }
-        String toolClass = args[0];
-        try {
-            Tool tool = ReflectionUtils.newInstance(toolClass, Tool.class);
-            String[] newArgs = new String[args.length - 1];
-            System.arraycopy(args, 1, newArgs, 0, newArgs.length);
-            rc = tool.run(newArgs);
-        } catch (Throwable t) {
-            System.err.println("Fail to run tool " + toolClass + " : ");
-            t.printStackTrace();
-        }
-        System.exit(rc);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/package-info.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/package-info.java
deleted file mode 100644
index e2125bc..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Tools for distributedlog
- */
-package com.twitter.distributedlog.tools;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Allocator.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Allocator.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Allocator.java
deleted file mode 100644
index dcc3f58..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Allocator.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.util;
-
-import com.twitter.distributedlog.io.AsyncCloseable;
-import com.twitter.distributedlog.io.AsyncDeleteable;
-import com.twitter.distributedlog.util.Transaction.OpListener;
-import com.twitter.util.Future;
-
-import java.io.IOException;
-
-/**
- * A common interface to allocate <i>I</i> under transaction <i>T</i>.
- *
- * <h3>Usage Example</h3>
- *
- * Here is an example demonstrating how `Allocator` works.
- *
- * <pre> {@code
- * Allocator<I, T> allocator = ...;
- *
- * // issue an allocate request
- * try {
- *   allocator.allocate();
- * } catch (IOException ioe) {
- *   // handle the exception
- *   ...
- *   return;
- * }
- *
- * // Start a transaction
- * final Transaction<T> txn = ...;
- *
- * // Try obtain object I
- * Future<I> tryObtainFuture = allocator.tryObtain(txn, new OpListener<I>() {
- *     public void onCommit(I resource) {
- *         // the obtain succeeded; proceed with the resource
- *     }
- *     public void onAbort() {
- *         // the obtain failed.
- *     }
- * }).addFutureEventListener(new FutureEventListener() {
- *     public void onSuccess(I resource) {
- *         // the try-obtain succeeded, but the obtain has not yet been confirmed or aborted.
- *         // execute the transaction to confirm if it could complete obtain
- *         txn.execute();
- *     }
- *     public void onFailure(Throwable t) {
- *         // handle the failure of try obtain
- *     }
- * });
- *
- * }</pre>
- */
-public interface Allocator<I, T> extends AsyncCloseable, AsyncDeleteable {
-
-    /**
-     * Issue allocation request to allocate <i>I</i>.
-     * The implementation should be non-blocking call.
-     *
-     * @throws IOException
-     *          if fail to request allocating a <i>I</i>.
-     */
-    void allocate() throws IOException;
-
-    /**
-     * Try obtaining an <i>I</i> in a given transaction <i>T</i>. The object obtained is tentative.
-     * Whether the object is obtained or aborted is determined by the result of the execution. You could
-     * register a listener under this `tryObtain` operation to know whether the object is obtained or
-     * aborted.
-     *
-     * <p>
-     * It is a typical two-phase operation for obtaining a resource from the allocator.
-     * The future returned by this method acts as a `prepare` operation: the resource is tentatively obtained
-     * from the allocator. The execution of the txn acts as a `commit` operation: the resource is confirmed
-     * to be obtained by this transaction. <code>listener</code> is for the whole completion of the obtain.
-     * <p>
-     * <code>listener</code> is only triggered after `prepare` succeeds. If `prepare` fails, no action
-     * is taken on the listener.
-     *
-     * @param txn
-     *          transaction.
-     * @return future result returning <i>I</i> that would be obtained under transaction <code>txn</code>.
-     */
-    Future<I> tryObtain(Transaction<T> txn, OpListener<I> listener);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/CommandLineUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/CommandLineUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/CommandLineUtils.java
deleted file mode 100644
index 95ef3e2..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/CommandLineUtils.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.util;
-
-import com.google.common.base.Optional;
-import org.apache.commons.cli.CommandLine;
-
-/**
- * Utils to commandline
- */
-public class CommandLineUtils {
-
-    public static Optional<String> getOptionalStringArg(CommandLine cmdline, String arg) {
-        if (cmdline.hasOption(arg)) {
-            return Optional.of(cmdline.getOptionValue(arg));
-        } else {
-            return Optional.absent();
-        }
-    }
-
-    public static Optional<Boolean> getOptionalBooleanArg(CommandLine cmdline, String arg) {
-        if (cmdline.hasOption(arg)) {
-            return Optional.of(true);
-        } else {
-            return Optional.absent();
-        }
-    }
-
-    public static Optional<Integer> getOptionalIntegerArg(CommandLine cmdline, String arg) throws IllegalArgumentException {
-        try {
-            if (cmdline.hasOption(arg)) {
-                return Optional.of(Integer.parseInt(cmdline.getOptionValue(arg)));
-            } else {
-                return Optional.absent();
-            }
-        } catch (NumberFormatException ex) {
-            throw new IllegalArgumentException(arg + " is not a number");
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/ConfUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/ConfUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/ConfUtils.java
deleted file mode 100644
index 46dd3b6..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/ConfUtils.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.util;
-
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.config.ConcurrentConstConfiguration;
-import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
-import org.apache.commons.configuration.Configuration;
-
-import java.util.Iterator;
-
-public class ConfUtils {
-
-    /**
-     * Load configurations with prefixed <i>section</i> from source configuration <i>srcConf</i> into
-     * target configuration <i>targetConf</i>.
-     *
-     * @param targetConf
-     *          Target Configuration
-     * @param srcConf
-     *          Source Configuration
-     * @param section
-     *          Section Key
-     */
-    public static void loadConfiguration(Configuration targetConf, Configuration srcConf, String section) {
-        Iterator confKeys = srcConf.getKeys();
-        while (confKeys.hasNext()) {
-            Object keyObject = confKeys.next();
-            if (!(keyObject instanceof String)) {
-                continue;
-            }
-            String key = (String) keyObject;
-            if (key.startsWith(section)) {
-                targetConf.setProperty(key.substring(section.length()), srcConf.getProperty(key));
-            }
-        }
-    }
-
-    /**
-     * Create const dynamic configuration based on distributedlog configuration.
-     *
-     * @param conf
-     *          static distributedlog configuration.
-     * @return dynamic configuration
-     */
-    public static DynamicDistributedLogConfiguration getConstDynConf(DistributedLogConfiguration conf) {
-        ConcurrentConstConfiguration constConf = new ConcurrentConstConfiguration(conf);
-        return new DynamicDistributedLogConfiguration(constConf);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/DLUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/DLUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/DLUtils.java
deleted file mode 100644
index 2f9e091..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/DLUtils.java
+++ /dev/null
@@ -1,321 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.util;
-
-import com.google.common.base.Objects;
-import com.twitter.distributedlog.DistributedLogConstants;
-import com.twitter.distributedlog.LogSegmentMetadata;
-import com.twitter.distributedlog.exceptions.InvalidStreamNameException;
-import com.twitter.distributedlog.exceptions.UnexpectedException;
-import org.apache.commons.lang.StringUtils;
-
-import java.net.InetAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.List;
-
-import static com.google.common.base.Charsets.UTF_8;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Utilities about DL implementations like uri, log segments, metadata serialization and deserialization.
- */
-public class DLUtils {
-
-    /**
-     * Find the log segment whose transaction ids are not less than provided <code>transactionId</code>.
-     *
-     * @param segments
-     *          segments to search
-     * @param transactionId
-     *          transaction id to find
-     * @return index of the first log segment whose transaction ids are not less than <code>transactionId</code>, or -1 if no segment starts at or before it.
-     */
-    public static int findLogSegmentNotLessThanTxnId(List<LogSegmentMetadata> segments,
-                                                     long transactionId) {
-        int found = -1;
-        for (int i = segments.size() - 1; i >= 0; i--) {
-            LogSegmentMetadata segment = segments.get(i);
-            if (segment.getFirstTxId() <= transactionId) {
-                found = i;
-                break;
-            }
-        }
-        if (found <= -1) {
-            return -1;
-        }
-        if (found == 0 && segments.get(0).getFirstTxId() == transactionId) {
-            return 0;
-        }
-        LogSegmentMetadata foundSegment = segments.get(found);
-        if (foundSegment.getFirstTxId() == transactionId) {
-            for (int i = found - 1; i >= 0; i--) {
-                LogSegmentMetadata segment = segments.get(i);
-                if (segment.isInProgress()) {
-                    break;
-                }
-                if (segment.getLastTxId() < transactionId) {
-                    break;
-                }
-                found = i;
-            }
-            return found;
-        } else {
-            if (foundSegment.isInProgress()
-                    || found == segments.size() - 1) {
-                return found;
-            }
-            if (foundSegment.getLastTxId() >= transactionId) {
-                return found;
-            }
-            return found + 1;
-        }
-    }
-
-    /**
-     * Assign next log segment sequence number based on a decreasing list of log segments.
-     *
-     * @param segmentListDesc
-     *          a decreasing list of log segments
-     * @return null if no log segment was assigned a sequence number in <code>segmentListDesc</code>.
-     *         otherwise, return next log segment sequence number
-     */
-    public static Long nextLogSegmentSequenceNumber(List<LogSegmentMetadata> segmentListDesc) {
-        int lastAssignedLogSegmentIdx = -1;
-        Long lastAssignedLogSegmentSeqNo = null;
-        Long nextLogSegmentSeqNo = null;
-
-        for (int i = 0; i < segmentListDesc.size(); i++) {
-            LogSegmentMetadata metadata = segmentListDesc.get(i);
-            if (LogSegmentMetadata.supportsLogSegmentSequenceNo(metadata.getVersion())) {
-                lastAssignedLogSegmentSeqNo = metadata.getLogSegmentSequenceNumber();
-                lastAssignedLogSegmentIdx = i;
-                break;
-            }
-        }
-
-        if (null != lastAssignedLogSegmentSeqNo) {
-            // latest log segment is assigned with a sequence number, start with next sequence number
-            nextLogSegmentSeqNo = lastAssignedLogSegmentSeqNo + lastAssignedLogSegmentIdx + 1;
-        }
-        return nextLogSegmentSeqNo;
-    }
-
-    /**
-     * Compute the start sequence id for <code>segment</code>, based on the previous segment list
-     * <code>logSegmentDescList</code>.
-     *
-     * @param logSegmentDescList
-     *          list of segments in descending order
-     * @param segment
-     *          segment to compute start sequence id for
-     * @return start sequence id
-     */
-    public static long computeStartSequenceId(List<LogSegmentMetadata> logSegmentDescList,
-                                              LogSegmentMetadata segment)
-            throws UnexpectedException {
-        long startSequenceId = 0L;
-        for (LogSegmentMetadata metadata : logSegmentDescList) {
-            if (metadata.getLogSegmentSequenceNumber() >= segment.getLogSegmentSequenceNumber()) {
-                continue;
-            } else if (metadata.getLogSegmentSequenceNumber() < (segment.getLogSegmentSequenceNumber() - 1)) {
-                break;
-            }
-            if (metadata.isInProgress()) {
-                throw new UnexpectedException("Should not complete log segment " + segment.getLogSegmentSequenceNumber()
-                        + " since it's previous log segment is still inprogress : " + logSegmentDescList);
-            }
-            if (metadata.supportsSequenceId()) {
-                startSequenceId = metadata.getStartSequenceId() + metadata.getRecordCount();
-            }
-        }
-        return startSequenceId;
-    }
-
-    /**
-     * Deserialize log segment sequence number for bytes <code>data</code>.
-     *
-     * @param data
-     *          byte representation of log segment sequence number
-     * @return log segment sequence number
-     * @throws NumberFormatException if the bytes aren't valid
-     */
-    public static long deserializeLogSegmentSequenceNumber(byte[] data) {
-        String seqNoStr = new String(data, UTF_8);
-        return Long.parseLong(seqNoStr);
-    }
-
-    /**
-     * Serialize log segment sequence number <code>logSegmentSeqNo</code> into bytes.
-     *
-     * @param logSegmentSeqNo
-     *          log segment sequence number
-     * @return byte representation of log segment sequence number
-     */
-    public static byte[] serializeLogSegmentSequenceNumber(long logSegmentSeqNo) {
-        return Long.toString(logSegmentSeqNo).getBytes(UTF_8);
-    }
-
-    /**
-     * Deserialize log record transaction id for bytes <code>data</code>.
-     *
-     * @param data
-     *          byte representation of log record transaction id
-     * @return log record transaction id
-     * @throws NumberFormatException if the bytes aren't valid
-     */
-    public static long deserializeTransactionId(byte[] data) {
-        String seqNoStr = new String(data, UTF_8);
-        return Long.parseLong(seqNoStr);
-    }
-
-    /**
-     * Serialize log record transaction id <code>transactionId</code> into bytes.
-     *
-     * @param transactionId
-     *          log record transaction id
-     * @return byte representation of log record transaction id.
-     */
-    public static byte[] serializeTransactionId(long transactionId) {
-        return Long.toString(transactionId).getBytes(UTF_8);
-    }
-
-    /**
-     * Serialize log segment id into bytes.
-     *
-     * @param logSegmentId
-     *          log segment id
-     * @return bytes representation of log segment id
-     */
-    public static byte[] logSegmentId2Bytes(long logSegmentId) {
-        return Long.toString(logSegmentId).getBytes(UTF_8);
-    }
-
-    /**
-     * Deserialize bytes into log segment id.
-     *
-     * @param data
-     *          bytes representation of log segment id
-     * @return log segment id
-     */
-    public static long bytes2LogSegmentId(byte[] data) {
-        return Long.parseLong(new String(data, UTF_8));
-    }
-
-    /**
-     * Normalize the uri.
-     *
-     * @param uri the distributedlog uri.
-     * @return the normalized uri
-     */
-    public static URI normalizeURI(URI uri) {
-        checkNotNull(uri, "DistributedLog uri is null");
-        String scheme = uri.getScheme();
-        checkNotNull(scheme, "Invalid distributedlog uri : " + uri);
-        scheme = scheme.toLowerCase();
-        String[] schemeParts = StringUtils.split(scheme, '-');
-        checkArgument(Objects.equal(DistributedLogConstants.SCHEME_PREFIX, schemeParts[0].toLowerCase()),
-                "Unknown distributedlog scheme found : " + uri);
-        URI normalizedUri;
-        try {
-            normalizedUri = new URI(
-                    schemeParts[0],     // remove backend info
-                    uri.getAuthority(),
-                    uri.getPath(),
-                    uri.getQuery(),
-                    uri.getFragment());
-        } catch (URISyntaxException e) {
-            throw new IllegalArgumentException("Invalid distributedlog uri found : " + uri, e);
-        }
-        return normalizedUri;
-    }
-
-    private static String getHostIpLockClientId() {
-        try {
-            return InetAddress.getLocalHost().toString();
-        } catch(Exception ex) {
-            return DistributedLogConstants.UNKNOWN_CLIENT_ID;
-        }
-    }
-
-    /**
-     * Normalize the client id.
-     *
-     * @return the normalized client id.
-     */
-    public static String normalizeClientId(String clientId) {
-        String normalizedClientId;
-        if (clientId.equals(DistributedLogConstants.UNKNOWN_CLIENT_ID)) {
-            normalizedClientId = getHostIpLockClientId();
-        } else {
-            normalizedClientId = clientId;
-        }
-        return normalizedClientId;
-    }
-
-    /**
-     * Is it a reserved stream name in bkdl namespace?
-     *
-     * @param name
-     *          stream name
-     * @return true if it is reserved name, otherwise false.
-     */
-    public static boolean isReservedStreamName(String name) {
-        return name.startsWith(".");
-    }
-
-    /**
-     * Validate the stream name.
-     *
-     * @param nameOfStream
-     *          name of stream
-     * @throws InvalidStreamNameException
-     */
-    public static void validateName(String nameOfStream)
-            throws InvalidStreamNameException {
-        String reason = null;
-        char chars[] = nameOfStream.toCharArray();
-        char c;
-        // validate the stream name to see if it meets zookeeper path requirements
-        for (int i = 0; i < chars.length; i++) {
-            c = chars[i];
-
-            if (c == 0) {
-                reason = "null character not allowed @" + i;
-                break;
-            } else if (c == '/') {
-                reason = "'/' not allowed @" + i;
-                break;
-            } else if (c > '\u0000' && c < '\u001f'
-                    || c > '\u007f' && c < '\u009F'
-                    || c > '\ud800' && c < '\uf8ff'
-                    || c > '\ufff0' && c < '\uffff') {
-                reason = "invalid charater @" + i;
-                break;
-            }
-        }
-        if (null != reason) {
-            throw new InvalidStreamNameException(nameOfStream, reason);
-        }
-        if (isReservedStreamName(nameOfStream)) {
-            throw new InvalidStreamNameException(nameOfStream,
-                    "Stream Name is reserved");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FailpointUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FailpointUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FailpointUtils.java
deleted file mode 100644
index 64101b3..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FailpointUtils.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.util;
-
-import java.io.IOException;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Registry of named fail points and the actions attached to them.
- *
- * <p>An action is attached to a {@link FailPointName} with one of the {@code setFailpoint}
- * methods and detached with {@link #removeFailpoint(FailPointName)}. Code that reaches a fail
- * point calls {@link #checkFailPoint(FailPointName)} or
- * {@link #checkFailPointNoThrow(FailPointName)}, which run the attached action if there is one.
- */
-public class FailpointUtils {
-    static final Logger logger = LoggerFactory.getLogger(FailpointUtils.class);
-
-    /** Names of the fail points that an action can be attached to. */
-    public enum FailPointName {
-        FP_StartLogSegmentBeforeLedgerCreate,
-        FP_StartLogSegmentAfterLedgerCreate,
-        FP_StartLogSegmentAfterInProgressCreate,
-        FP_StartLogSegmentOnAssignLogSegmentSequenceNumber,
-        FP_FinalizeLedgerBeforeDelete,
-        FP_TransmitBeforeAddEntry,
-        FP_TransmitComplete,
-        FP_WriteInternalLostLock,
-        FP_TransmitFailGetBuffer,
-        FP_LockUnlockCleanup,
-        FP_LockTryCloseRaceCondition,
-        FP_LockTryAcquire,
-        FP_ZooKeeperConnectionLoss,
-        FP_RecoverIncompleteLogSegments,
-        FP_LogWriterIssuePending,
-    }
-
-    /** Action run when a fail point that has been set is checked. */
-    public static interface FailPointAction {
-        /**
-         * Run the action.
-         *
-         * @return the outcome reported to the code checking the fail point
-         * @throws IOException when the action fails the caller with an exception
-         */
-        boolean checkFailPoint() throws IOException;
-
-        /**
-         * Run the action without letting an {@link IOException} escape.
-         *
-         * @return the outcome reported to the code checking the fail point
-         */
-        boolean checkFailPointNoThrow();
-    }
-
-    /**
-     * Base action that implements {@link #checkFailPointNoThrow()} on top of
-     * {@link #checkFailPoint()}.
-     */
-    public static abstract class AbstractFailPointAction implements FailPointAction {
-        @Override
-        public boolean checkFailPointNoThrow() {
-            try {
-                return checkFailPoint();
-            } catch (IOException ex) {
-                // keep the exception in the log so the failing action can be diagnosed
-                logger.error("failpoint action raised unexpected exception", ex);
-                return true;
-            }
-        }
-    }
-
-    /** Action that reports {@code true} and never throws. */
-    public static final FailPointAction DEFAULT_ACTION = new AbstractFailPointAction() {
-        @Override
-        public boolean checkFailPoint() throws IOException {
-            return true;
-        }
-    };
-
-    /** Action that always throws an {@link IOException}. */
-    public static final FailPointAction THROW_ACTION = new AbstractFailPointAction() {
-        @Override
-        public boolean checkFailPoint() throws IOException {
-            throw new IOException("Throw ioexception for failure point");
-        }
-    };
-
-    /** Selectors for the predefined actions {@link #DEFAULT_ACTION} and {@link #THROW_ACTION}. */
-    public enum FailPointActions {
-        FailPointAction_Default,
-        FailPointAction_Throw
-    }
-
-    // fail points that currently have an action attached
-    static ConcurrentHashMap<FailPointName, FailPointAction> failPointState =
-            new ConcurrentHashMap<FailPointName, FailPointAction>();
-
-    /**
-     * Attach one of the predefined actions to <i>failpoint</i>.
-     *
-     * @param failpoint fail point to set
-     * @param action selector of the predefined action
-     */
-    public static void setFailpoint(FailPointName failpoint, FailPointActions action) {
-        FailPointAction fpAction = null;
-        switch (action) {
-        case FailPointAction_Default:
-            fpAction = DEFAULT_ACTION;
-            break;
-        case FailPointAction_Throw:
-            fpAction = THROW_ACTION;
-            break;
-        default:
-            // no predefined action: fpAction stays null and nothing is registered below
-            break;
-        }
-        setFailpoint(failpoint, fpAction);
-    }
-
-    /**
-     * Attach <i>action</i> to <i>failpoint</i>, replacing any action already attached.
-     * A null <i>action</i> is ignored.
-     *
-     * @param failpoint fail point to set
-     * @param action action to run when the fail point is checked
-     */
-    public static void setFailpoint(FailPointName failpoint, FailPointAction action) {
-        if (null != action) {
-            failPointState.put(failpoint, action);
-        }
-    }
-
-    /**
-     * Detach whatever action is attached to <i>failpoint</i>.
-     *
-     * @param failpoint fail point to clear
-     */
-    public static void removeFailpoint(FailPointName failpoint) {
-        failPointState.remove(failpoint);
-    }
-
-    /**
-     * Run the action attached to <i>failPoint</i>.
-     *
-     * @param failPoint fail point to check
-     * @return false if no action is attached, otherwise the value returned by the action
-     * @throws IOException wrapping the {@link IOException} thrown by the action, with the
-     *         fail point name in the message
-     */
-    public static boolean checkFailPoint(FailPointName failPoint) throws IOException {
-        FailPointAction action = failPointState.get(failPoint);
-
-        if (action == null) {
-            return false;
-        }
-
-        try {
-            return action.checkFailPoint();
-        } catch (IOException ioe) {
-            throw new IOException("Induced Exception at:" + failPoint, ioe);
-        }
-    }
-
-    /**
-     * Run the non-throwing variant of the action attached to <i>failPoint</i>.
-     *
-     * @param failPoint fail point to check
-     * @return false if no action is attached, otherwise the value returned by the action
-     */
-    public static boolean checkFailPointNoThrow(FailPointName failPoint) {
-        FailPointAction action = failPointState.get(failPoint);
-
-        if (action == null) {
-            return false;
-        }
-
-        return action.checkFailPointNoThrow();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java
deleted file mode 100644
index f206a25..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java
+++ /dev/null
@@ -1,534 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.util;
-
-import com.google.common.base.Stopwatch;
-import com.twitter.distributedlog.DistributedLogConstants;
-import com.twitter.distributedlog.exceptions.BKTransmitException;
-import com.twitter.distributedlog.exceptions.LockingException;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.distributedlog.exceptions.UnexpectedException;
-import com.twitter.distributedlog.exceptions.ZKException;
-import com.twitter.distributedlog.stats.OpStatsListener;
-import com.twitter.util.Await;
-import com.twitter.util.Duration;
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-import com.twitter.util.FutureCancelledException;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import com.twitter.util.Return;
-import com.twitter.util.Throw;
-import com.twitter.util.Try;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Utilities to process future
- */
-public class FutureUtils {
-
-    private static final Logger logger = LoggerFactory.getLogger(FutureUtils.class);
-
-    /**
-     * A {@link FutureEventListener} that forwards each callback to a wrapped listener
-     * by submitting it to an {@link OrderedScheduler} under a fixed key.
-     *
-     * @param <R> type of the value delivered to the listener
-     */
-    public static class OrderedFutureEventListener<R>
-            implements FutureEventListener<R> {
-
-        private final OrderedScheduler scheduler;
-        private final Object key;
-        private final FutureEventListener<R> listener;
-
-        private OrderedFutureEventListener(OrderedScheduler scheduler,
-                                           Object key,
-                                           FutureEventListener<R> listener) {
-            this.scheduler = scheduler;
-            this.key = key;
-            this.listener = listener;
-        }
-
-        /**
-         * Wrap <i>listener</i> so that its callbacks are submitted to <i>scheduler</i>
-         * with the submit key <i>key</i>.
-         *
-         * @param listener listener to forward the callbacks to
-         * @param scheduler scheduler the callbacks are submitted to
-         * @param key submit key used for every callback
-         * @return the wrapping listener
-         */
-        public static <R> OrderedFutureEventListener<R> of(
-                FutureEventListener<R> listener,
-                OrderedScheduler scheduler,
-                Object key) {
-            return new OrderedFutureEventListener<R>(scheduler, key, listener);
-        }
-
-        @Override
-        public void onSuccess(final R value) {
-            final Runnable deliverValue = new Runnable() {
-                @Override
-                public void run() {
-                    listener.onSuccess(value);
-                }
-            };
-            scheduler.submit(key, deliverValue);
-        }
-
-        @Override
-        public void onFailure(final Throwable cause) {
-            final Runnable deliverFailure = new Runnable() {
-                @Override
-                public void run() {
-                    listener.onFailure(cause);
-                }
-            };
-            scheduler.submit(key, deliverFailure);
-        }
-    }
-
-    /**
-     * A {@link FutureEventListener} that forwards each callback to a wrapped listener
-     * by submitting it to an {@link ExecutorService}.
-     *
-     * @param <R> type of the value delivered to the listener
-     */
-    public static class FutureEventListenerRunnable<R>
-            implements FutureEventListener<R> {
-
-        private final ExecutorService executorService;
-        private final FutureEventListener<R> listener;
-
-        private FutureEventListenerRunnable(ExecutorService executorService,
-                                            FutureEventListener<R> listener) {
-            this.executorService = executorService;
-            this.listener = listener;
-        }
-
-        /**
-         * Wrap <i>listener</i> so that its callbacks are submitted to <i>executorService</i>.
-         *
-         * @param listener listener to forward the callbacks to
-         * @param executorService executor the callbacks are submitted to
-         * @return the wrapping listener
-         */
-        public static <R> FutureEventListenerRunnable<R> of(
-                FutureEventListener<R> listener,
-                ExecutorService executorService) {
-            return new FutureEventListenerRunnable<R>(executorService, listener);
-        }
-
-        @Override
-        public void onSuccess(final R value) {
-            final Runnable deliverValue = new Runnable() {
-                @Override
-                public void run() {
-                    listener.onSuccess(value);
-                }
-            };
-            executorService.submit(deliverValue);
-        }
-
-        @Override
-        public void onFailure(final Throwable cause) {
-            final Runnable deliverFailure = new Runnable() {
-                @Override
-                public void run() {
-                    listener.onFailure(cause);
-                }
-            };
-            executorService.submit(deliverFailure);
-        }
-    }
-
-    /**
-     * Processes the items of a list one at a time with <i>processFunc</i> and collects
-     * the results into {@link #promise}.
-     *
-     * <p>The instance plays three roles: it is the interrupt handler of the promise
-     * ({@link #apply(Throwable)}), the listener of the future returned for each item
-     * ({@link #onSuccess(Object)} / {@link #onFailure(Throwable)}), and the runnable that
-     * starts processing of the next item ({@link #run()}).
-     */
-    private static class ListFutureProcessor<T, R>
-            extends Function<Throwable, BoxedUnit>
-            implements FutureEventListener<R>, Runnable {
-
-        // set by the interrupt handler; checked before the next item is started
-        private volatile boolean interrupted = false;
-        private final Iterator<T> itemsIter;
-        private final Function<T, Future<R>> processFunc;
-        private final Promise<List<R>> promise;
-        private final List<R> results;
-        // may be null, in which case callbacks are handled on the calling thread
-        private final ExecutorService callbackExecutor;
-
-        ListFutureProcessor(List<T> items,
-                            Function<T, Future<R>> processFunc,
-                            ExecutorService callbackExecutor) {
-            this.itemsIter = items.iterator();
-            this.processFunc = processFunc;
-            this.promise = new Promise<List<R>>();
-            // this object is registered as the interrupt handler of the promise
-            this.promise.setInterruptHandler(this);
-            this.results = new ArrayList<R>();
-            this.callbackExecutor = callbackExecutor;
-        }
-
-        /**
-         * Interrupt handler of the promise: only records that an interrupt was raised.
-         */
-        @Override
-        public BoxedUnit apply(Throwable cause) {
-            interrupted = true;
-            return BoxedUnit.UNIT;
-        }
-
-        /**
-         * Record the result of the current item and move on to the next one, either
-         * directly or through the callback executor when one was given.
-         */
-        @Override
-        public void onSuccess(R value) {
-            results.add(value);
-            if (null == callbackExecutor) {
-                run();
-            } else {
-                callbackExecutor.submit(this);
-            }
-        }
-
-        /**
-         * Fail the promise with <i>cause</i>, either directly or through the callback
-         * executor when one was given. No further item is started.
-         */
-        @Override
-        public void onFailure(final Throwable cause) {
-            if (null == callbackExecutor) {
-                promise.setException(cause);
-            } else {
-                callbackExecutor.submit(new Runnable() {
-                    @Override
-                    public void run() {
-                        promise.setException(cause);
-                    }
-                });
-            }
-        }
-
-        /**
-         * Start processing the next item, or complete the promise with the collected
-         * results when there is none left.
-         */
-        @Override
-        public void run() {
-            if (interrupted) {
-                // stop here; the promise is not updated on this path
-                logger.debug("ListFutureProcessor is interrupted.");
-                return;
-            }
-            if (!itemsIter.hasNext()) {
-                promise.setValue(results);
-                return;
-            }
-            // this object listens on the item's future, so onSuccess/onFailure drive the next step
-            processFunc.apply(itemsIter.next()).addEventListener(this);
-        }
-    }
-
-    /**
-     * Run <i>processFunc</i> over the items of <i>collection</i> one after another.
-     * Processing stops at the first item that fails.
-     *
-     * @param collection list of items
-     * @param processFunc process function
-     * @param callbackExecutor executor used to start the processing and to handle the
-     *          callbacks; when null everything runs on the calling thread
-     * @return future presenting the list of processed results
-     */
-    public static <T, R> Future<List<R>> processList(List<T> collection,
-                                                     Function<T, Future<R>> processFunc,
-                                                     @Nullable ExecutorService callbackExecutor) {
-        final ListFutureProcessor<T, R> listProcessor =
-                new ListFutureProcessor<T, R>(collection, processFunc, callbackExecutor);
-        if (null == callbackExecutor) {
-            listProcessor.run();
-        } else {
-            callbackExecutor.submit(listProcessor);
-        }
-        return listProcessor.promise;
-    }
-
-    /**
-     * Register an {@link OpStatsListener} on <i>result</i> to collect the operation stats.
-     *
-     * @param result result to listen on
-     * @param opStatsLogger stats logger handed to the listener
-     * @param stopwatch stop watch handed to the listener
-     * @param <T> type of the result
-     * @return what {@code addEventListener} returns for <i>result</i>
-     */
-    public static <T> Future<T> stats(Future<T> result,
-                                      OpStatsLogger opStatsLogger,
-                                      Stopwatch stopwatch) {
-        final OpStatsListener<T> statsListener =
-                new OpStatsListener<T>(opStatsLogger, stopwatch);
-        return result.addEventListener(statsListener);
-    }
-
-    /**
-     * Await the result of the future, reporting failures as bookkeeper exceptions.
-     *
-     * @param result future to wait for
-     * @return the result of future
-     * @throws BKException when the future fails. A {@link BKException} is rethrown as is, an
-     *         interruption is reported with {@link BKException.Code#InterruptedException}, and
-     *         any other unknown exception is logged and reported as
-     *         {@link org.apache.bookkeeper.client.BKException.BKUnexpectedConditionException}.
-     */
-    public static <T> T bkResult(Future<T> result) throws BKException {
-        try {
-            return Await.result(result);
-        } catch (Exception failure) {
-            if (failure instanceof BKException) {
-                throw (BKException) failure;
-            }
-            if (failure instanceof InterruptedException) {
-                throw BKException.create(BKException.Code.InterruptedException);
-            }
-            logger.warn("Encountered unexpected exception on waiting bookkeeper results : ", failure);
-            throw BKException.create(BKException.Code.UnexpectedConditionException);
-        }
-    }
-
-    /**
-     * Map a <i>throwable</i> to a bk exception return code.
-     *
-     * @param throwable the cause of the exception
-     * @return the code carried by the throwable when it is a {@link BKException}, otherwise
-     *         {@link BKException.Code#UnexpectedConditionException}.
-     */
-    public static int bkResultCode(Throwable throwable) {
-        return (throwable instanceof BKException)
-                ? ((BKException) throwable).getCode()
-                : BKException.Code.UnexpectedConditionException;
-    }
-
-    /**
-     * Wait for the result, using {@code Duration.Top()} as the time to wait.
-     *
-     * @param result result to wait
-     * @return the result
-     * @throws IOException when encountered exceptions on the result
-     */
-    public static <T> T result(Future<T> result) throws IOException {
-        final Duration waitTime = Duration.Top();
-        return result(result, waitTime);
-    }
-
-    /**
-     * Wait for the result for a given <i>duration</i>.
-     * <p>Every failure is reported as an {@link IOException}: zookeeper exceptions as
-     * {@link ZKException}, bookkeeper exceptions as {@link BKTransmitException} carrying the
-     * bookkeeper code, interruptions as {@link DLInterruptedException}; an
-     * {@link IOException} is rethrown as is and anything else (for example the
-     * {@link com.twitter.util.TimeoutException} raised when the result is not ready within
-     * `duration`) is wrapped into a plain {@link IOException}.
-     *
-     * @param result result to wait
-     * @param duration duration to wait
-     * @return the result
-     * @throws IOException when encountered exceptions on the result or waiting for the result.
-     */
-    public static <T> T result(Future<T> result, Duration duration)
-            throws IOException {
-        try {
-            return Await.result(result, duration);
-        } catch (Exception failure) {
-            if (failure instanceof KeeperException) {
-                throw new ZKException("Encountered zookeeper exception on waiting result",
-                        (KeeperException) failure);
-            }
-            if (failure instanceof BKException) {
-                throw new BKTransmitException("Encountered bookkeeper exception on waiting result",
-                        ((BKException) failure).getCode());
-            }
-            if (failure instanceof IOException) {
-                throw (IOException) failure;
-            }
-            if (failure instanceof InterruptedException) {
-                throw new DLInterruptedException("Interrupted on waiting result",
-                        (InterruptedException) failure);
-            }
-            throw new IOException("Encountered exception on waiting result", failure);
-        }
-    }
-
-    /**
-     * Wait for the result of a lock operation.
-     *
-     * @param result result to wait
-     * @param lockPath path of the lock
-     * @return the result
-     * @throws LockingException when the lock operation fails; a {@link LockingException} is
-     *         rethrown as is, any other exception is wrapped together with <i>lockPath</i>
-     */
-    public static <T> T lockResult(Future<T> result, String lockPath) throws LockingException {
-        try {
-            return Await.result(result);
-        } catch (Exception failure) {
-            if (failure instanceof LockingException) {
-                throw (LockingException) failure;
-            }
-            throw new LockingException(lockPath, "Encountered exception on locking ", failure);
-        }
-    }
-
-    /**
-     * Convert the <i>throwable</i> to the exception used to report zookeeper failures.
-     *
-     * @param throwable cause
-     * @param path zookeeper path
-     * @return a {@link ZKException} for zookeeper exceptions and zookeeper connection
-     *         exceptions, a {@link DLInterruptedException} for interruptions, and an
-     *         {@link UnexpectedException} for everything else
-     */
-    public static Throwable zkException(Throwable throwable, String path) {
-        if (throwable instanceof KeeperException) {
-            final KeeperException keeperException = (KeeperException) throwable;
-            return new ZKException("Encountered zookeeper exception on " + path, keeperException);
-        }
-        if (throwable instanceof ZooKeeperClient.ZooKeeperConnectionException) {
-            return new ZKException("Encountered zookeeper connection loss on " + path,
-                    KeeperException.Code.CONNECTIONLOSS);
-        }
-        if (throwable instanceof InterruptedException) {
-            return new DLInterruptedException("Interrupted on operating " + path, throwable);
-        }
-        return new UnexpectedException("Encountered unexpected exception on operatiing " + path, throwable);
-    }
-
-    /**
-     * Cancel the future by raising a {@link FutureCancelledException} on it.
-     *
-     * @param future future to cancel
-     */
-    public static <T> void cancel(Future<T> future) {
-        final FutureCancelledException cancellation = new FutureCancelledException();
-        future.raise(cancellation);
-    }
-
-    /**
-     * Fail the <i>promise</i> with <i>cause</i> if it is not satisfied within the given
-     * <i>timeout</i> period. A promise that is satisfied before the timeout fires is left
-     * untouched, and the timeout task is cancelled once the promise is satisfied.
-     *
-     * <p>Nothing is scheduled when <i>timeout</i> is below
-     * {@link DistributedLogConstants#FUTURE_TIMEOUT_IMMEDIATE} or the promise is already defined.
-     *
-     * @param promise promise to raise exception
-     * @param timeout timeout period
-     * @param unit timeout period unit
-     * @param cause cause to raise
-     * @param scheduler scheduler to execute raising exception
-     * @param key the submit key used by the scheduler
-     * @return the same promise that was passed in
-     */
-    public static <T> Promise<T> within(final Promise<T> promise,
-                                        final long timeout,
-                                        final TimeUnit unit,
-                                        final Throwable cause,
-                                        final OrderedScheduler scheduler,
-                                        final Object key) {
-        if (timeout < DistributedLogConstants.FUTURE_TIMEOUT_IMMEDIATE) {
-            return promise;
-        }
-        if (promise.isDefined()) {
-            return promise;
-        }
-        // the task that fails the promise once the timeout period has elapsed
-        final Runnable raiseOnTimeout = new Runnable() {
-            @Override
-            public void run() {
-                if (promise.isDefined()) {
-                    return;
-                }
-                if (FutureUtils.setException(promise, cause)) {
-                    logger.info("Raise exception", cause);
-                }
-            }
-        };
-        final java.util.concurrent.ScheduledFuture<?> task =
-                scheduler.schedule(key, raiseOnTimeout, timeout, unit);
-        // once the promise is satisfied the timeout task is no longer needed
-        final AbstractFunction1<Try<T>, BoxedUnit> cancelTimeout =
-                new AbstractFunction1<Try<T>, BoxedUnit>() {
-            @Override
-            public BoxedUnit apply(Try<T> value) {
-                final boolean cancelled = task.cancel(true);
-                if (!cancelled) {
-                    logger.debug("Failed to cancel the timeout task");
-                }
-                return BoxedUnit.UNIT;
-            }
-        };
-        promise.respond(cancelTimeout);
-        return promise;
-    }
-
-    /**
-     * Satisfy the <i>promise</i> with the provided value from a task submitted to an
-     * ordered scheduler.
-     * <p>If the promise was already satisfied, nothing will be changed.
-     *
-     * @param promise promise to satisfy
-     * @param value value to satisfy
-     * @param scheduler scheduler to satisfy the promise with provided value
-     * @param key the submit key of the ordered scheduler
-     */
-    public static <T> void setValue(final Promise<T> promise,
-                                    final T value,
-                                    OrderedScheduler scheduler,
-                                    Object key) {
-        final Runnable satisfyTask = new Runnable() {
-            @Override
-            public void run() {
-                setValue(promise, value);
-            }
-        };
-        scheduler.submit(key, satisfyTask);
-    }
-
-    /**
-     * Satisfy the <i>promise</i> with the provided value.
-     * <p>If the promise was already satisfied, nothing will be changed and the attempt
-     * is logged.
-     *
-     * @param promise promise to satisfy
-     * @param value value to satisfy
-     * @return true if successfully satisfy the future. false if the promise has been satisfied.
-     */
-    public static <T> boolean setValue(Promise<T> promise, T value) {
-        if (promise.updateIfEmpty(new Return<T>(value))) {
-            return true;
-        }
-        logger.info("Result set multiple times. Value = '{}', New = 'Return({})'",
-                promise.poll(), value);
-        return false;
-    }
-
-    /**
-     * Satisfy the <i>promise</i> with provided <i>cause</i> from a task submitted to an
-     * ordered scheduler.
-     *
-     * @param promise promise to satisfy
-     * @param cause cause to satisfy
-     * @param scheduler the scheduler to satisfy the promise
-     * @param key submit key of the ordered scheduler
-     */
-    public static <T> void setException(final Promise<T> promise,
-                                        final Throwable cause,
-                                        OrderedScheduler scheduler,
-                                        Object key) {
-        final Runnable failTask = new Runnable() {
-            @Override
-            public void run() {
-                setException(promise, cause);
-            }
-        };
-        scheduler.submit(key, failTask);
-    }
-
-    /**
-     * Satisfy the <i>promise</i> with provided <i>cause</i>.
-     * <p>If the promise was already satisfied, nothing will be changed and the attempt
-     * is logged.
-     *
-     * @param promise promise to satisfy
-     * @param cause cause to satisfy
-     * @return true if successfully satisfy the future. false if the promise has been satisfied.
-     */
-    public static <T> boolean setException(Promise<T> promise, Throwable cause) {
-        if (promise.updateIfEmpty(new Throw<T>(cause))) {
-            return true;
-        }
-        logger.info("Result set multiple times. Value = '{}', New = 'Throw({})'",
-                promise.poll(), cause);
-        return false;
-    }
-
-    /**
-     * Ignore exception from the <i>future</i>, without logging anything.
-     *
-     * @param future the original future
-     * @return a promise satisfied with null once <i>future</i> completes, whether it
-     *         succeeded or failed
-     */
-    public static <T> Promise<Void> ignore(Future<T> future) {
-        final String noErrorMsg = null;
-        return ignore(future, noErrorMsg);
-    }
-
-    /**
-     * Ignore exception from the <i>future</i> and log <i>errorMsg</i> on exceptions.
-     *
-     * @param future the original future
-     * @param errorMsg the error message to log on exceptions; nothing is logged when null
-     * @return a promise satisfied with null once <i>future</i> completes, whether it
-     *         succeeded or failed
-     */
-    public static <T> Promise<Void> ignore(Future<T> future, final String errorMsg) {
-        final Promise<Void> ignored = new Promise<Void>();
-        final FutureEventListener<T> completionListener = new FutureEventListener<T>() {
-            @Override
-            public void onSuccess(T value) {
-                setValue(ignored, null);
-            }
-
-            @Override
-            public void onFailure(Throwable cause) {
-                if (errorMsg != null) {
-                    logger.error(errorMsg, cause);
-                }
-                // a failure completes the returned promise exactly like a success does
-                setValue(ignored, null);
-            }
-        };
-        future.addEventListener(completionListener);
-        return ignored;
-    }
-
-    /**
-     * Create transmit exception from transmit result.
-     *
-     * @param transmitResult
-     *          transmit result (basically bk exception code)
-     * @return transmit exception
-     */
-    public static BKTransmitException transmitException(int transmitResult) {
-        return new BKTransmitException("Failed to write to bookkeeper; Error is ("
-            + transmitResult + ") "
-            + BKException.getMessage(transmitResult), transmitResult);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredFuturePool.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredFuturePool.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredFuturePool.java
deleted file mode 100644
index e06023e..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredFuturePool.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.util;
-
-import com.google.common.base.Stopwatch;
-
-import com.twitter.util.FuturePool;
-import com.twitter.util.FuturePool$;
-import com.twitter.util.Future;
-
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import scala.runtime.BoxedUnit;
-import scala.Function0;
-
-/**
- * {@link FuturePool} with exposed stats. This class is exposing the following stats to help
- * understand the health of the underlying future pool.
- * <h3>Metrics</h3>
- * Stats are only exposed when <code>traceTaskExecution</code> is true.
- * <ul>
- * <li>task_pending_time: opstats. measuring the characteristics about the time that tasks spent on waiting
- * being executed.
- * <li>task_execution_time: opstats. measuring the characteristics about the time that tasks spent on executing.
- * <li>task_enqueue_time: opstats. measuring the characteristics about the time that tasks spent on submitting.
- * <li>tasks_pending: counter. how many tasks are pending in this future pool.
- * </ul>
- */
-public class MonitoredFuturePool implements FuturePool {
-    static final Logger LOG = LoggerFactory.getLogger(MonitoredFuturePool.class);
-
-    private final FuturePool futurePool;
-
-    private final StatsLogger statsLogger;
-    private final OpStatsLogger taskPendingTime;
-    private final OpStatsLogger taskExecutionTime;
-    private final OpStatsLogger taskEnqueueTime;
-    private final Counter taskPendingCounter;
-
-    private final boolean traceTaskExecution;
-    private final long traceTaskExecutionWarnTimeUs;
-
-    /**
-     * Wrapper around a submitted function that records how long it waited before being
-     * executed and how long its execution took.
-     */
-    class TimedFunction0<T> extends com.twitter.util.Function0<T> {
-        private final Function0<T> function0;
-        // started once, when the wrapper is created for a submitted task
-        private final Stopwatch pendingStopwatch = Stopwatch.createStarted();
-
-        TimedFunction0(Function0<T> function0) {
-            this.function0 = function0;
-        }
-
-        @Override
-        public T apply() {
-            taskPendingTime.registerSuccessfulEvent(pendingStopwatch.elapsed(TimeUnit.MICROSECONDS));
-            Stopwatch executionStopwatch = Stopwatch.createStarted();
-            T result = function0.apply();
-            // read the stopwatch once so the recorded and the logged value are the same
-            long elapsed = executionStopwatch.elapsed(TimeUnit.MICROSECONDS);
-            taskExecutionTime.registerSuccessfulEvent(elapsed);
-            if (elapsed > traceTaskExecutionWarnTimeUs) {
-                LOG.info("{} took too long {} microseconds", function0.toString(), elapsed);
-            }
-            return result;
-        }
-    }
-
-    /**
-     * Create a future pool with stats exposed.
-     *
-     * @param futurePool underlying future pool to execute futures
-     * @param statsLogger stats logger to receive exposed stats
-     * @param traceTaskExecution flag to enable/disable exposing stats about task execution
-     * @param traceTaskExecutionWarnTimeUs threshold in microseconds; a traced task whose
-     *                                     execution time is above this value is logged
-     */
-    public MonitoredFuturePool(FuturePool futurePool,
-                               StatsLogger statsLogger,
-                               boolean traceTaskExecution,
-                               long traceTaskExecutionWarnTimeUs) {
-        this.futurePool = futurePool;
-        this.traceTaskExecution = traceTaskExecution;
-        this.traceTaskExecutionWarnTimeUs = traceTaskExecutionWarnTimeUs;
-        this.statsLogger = statsLogger;
-        this.taskPendingTime = statsLogger.getOpStatsLogger("task_pending_time");
-        this.taskExecutionTime = statsLogger.getOpStatsLogger("task_execution_time");
-        this.taskEnqueueTime = statsLogger.getOpStatsLogger("task_enqueue_time");
-        this.taskPendingCounter = statsLogger.getCounter("tasks_pending");
-    }
-
-    /**
-     * Submit <i>function0</i> to the underlying future pool. When task tracing is enabled
-     * the function is wrapped to record its pending and execution time, the time spent on
-     * submitting is recorded, and the pending counter is incremented until the returned
-     * future completes.
-     *
-     * @param function0 function to execute
-     * @return the future returned by the underlying future pool
-     */
-    @Override
-    public <T> Future<T> apply(Function0<T> function0) {
-        if (traceTaskExecution) {
-            taskPendingCounter.inc();
-            Stopwatch taskEnqueueStopwatch = Stopwatch.createStarted();
-            Future<T> futureResult = futurePool.apply(new TimedFunction0<T>(function0));
-            taskEnqueueTime.registerSuccessfulEvent(taskEnqueueStopwatch.elapsed(TimeUnit.MICROSECONDS));
-            futureResult.ensure(new com.twitter.util.Function0<BoxedUnit>() {
-                @Override
-                public BoxedUnit apply() {
-                    taskPendingCounter.dec();
-                    return BoxedUnit.UNIT;
-                }
-            });
-            return futureResult;
-        } else {
-            return futurePool.apply(function0);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java
deleted file mode 100644
index 75223f2..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.util;
-
-import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.util.MathUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-/**
- * {@link ScheduledThreadPoolExecutor} with exposed stats. This class exposes the following stats to
- * help understand the health of this thread pool executor.
- * <h3>Metrics</h3>
- * <ul>
- * <li>pending_tasks: gauge. how many tasks are pending in this executor (size of the work queue).
- * <li>completed_tasks: gauge. how many tasks are completed in this executor.
- * <li>total_tasks: gauge. how many tasks are submitted to this executor.
- * <li>task_pending_time: opstats. measuring the characteristics about the time (in microseconds)
- * that tasks spent on waiting being executed.
- * <li>task_execution_time: opstats. measuring the characteristics about the time (in microseconds)
- * that tasks spent on executing.
- * </ul>
- * The two opstats are only recorded when <code>traceTaskExecution</code> is enabled, and only for
- * tasks handed in through one of the <code>submit</code> overloads overridden by this class.
- */
-public class MonitoredScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
-    static final Logger LOG = LoggerFactory.getLogger(MonitoredScheduledThreadPoolExecutor.class);
-
-    /**
-     * Runnable decorator that records how long the task waited before running and how long it ran.
-     * The enqueue timestamp is taken when the wrapper is constructed.
-     */
-    private class TimedRunnable implements Runnable {
-
-        final Runnable runnable;
-        final long enqueueNanos;
-
-        TimedRunnable(Runnable runnable) {
-            this.runnable = runnable;
-            this.enqueueNanos = MathUtils.nowInNano();
-        }
-
-        @Override
-        public void run() {
-            long startNanos = MathUtils.nowInNano();
-            long pendingMicros = TimeUnit.NANOSECONDS.toMicros(startNanos - enqueueNanos);
-            taskPendingStats.registerSuccessfulEvent(pendingMicros);
-            try {
-                runnable.run();
-            } finally {
-                // Recorded in a finally block, so the execution time is reported even if the task throws.
-                long executionMicros = TimeUnit.NANOSECONDS.toMicros(MathUtils.nowInNano() - startNanos);
-                taskExecutionStats.registerSuccessfulEvent(executionMicros);
-            }
-        }
-
-        @Override
-        public String toString() {
-            return runnable.toString();
-        }
-
-        @Override
-        public int hashCode() {
-            return runnable.hashCode();
-        }
-    }
-
-    /**
-     * Callable decorator that records how long the task waited before running and how long it ran.
-     * The enqueue timestamp is taken when the wrapper is constructed.
-     */
-    private class TimedCallable<T> implements Callable<T> {
-
-        final Callable<T> task;
-        final long enqueueNanos;
-
-        TimedCallable(Callable<T> task) {
-            this.task = task;
-            this.enqueueNanos = MathUtils.nowInNano();
-        }
-
-        @Override
-        public T call() throws Exception {
-            long startNanos = MathUtils.nowInNano();
-            long pendingMicros = TimeUnit.NANOSECONDS.toMicros(startNanos - enqueueNanos);
-            taskPendingStats.registerSuccessfulEvent(pendingMicros);
-            try {
-                return task.call();
-            } finally {
-                // Recorded in a finally block, so the execution time is reported even if the task throws.
-                long executionMicros = TimeUnit.NANOSECONDS.toMicros(MathUtils.nowInNano() - startNanos);
-                taskExecutionStats.registerSuccessfulEvent(executionMicros);
-            }
-        }
-    }
-
-    protected final boolean traceTaskExecution;
-    protected final OpStatsLogger taskExecutionStats;
-    protected final OpStatsLogger taskPendingStats;
-    protected final StatsLogger statsLogger;
-    // Gauges and their labels
-    private static final String pendingTasksGaugeLabel = "pending_tasks";
-    private final Gauge<Number> pendingTasksGauge;
-    private static final String completedTasksGaugeLabel = "completed_tasks";
-    protected final Gauge<Number> completedTasksGauge;
-    private static final String totalTasksGaugeLabel = "total_tasks";
-    protected final Gauge<Number> totalTasksGauge;
-
-    /**
-     * Creates the executor and registers its gauges with <code>statsLogger</code>.
-     *
-     * @param corePoolSize number of threads, passed to {@link ScheduledThreadPoolExecutor}
-     * @param threadFactory factory used to create worker threads
-     * @param statsLogger stats logger that the gauges and opstats are registered with
-     * @param traceTaskExecution whether submitted tasks are wrapped to record pending/execution time
-     */
-    public MonitoredScheduledThreadPoolExecutor(int corePoolSize,
-                                                ThreadFactory threadFactory,
-                                                StatsLogger statsLogger,
-                                                boolean traceTaskExecution) {
-        super(corePoolSize, threadFactory);
-        this.traceTaskExecution = traceTaskExecution;
-        this.statsLogger = statsLogger;
-        this.taskPendingStats = this.statsLogger.getOpStatsLogger("task_pending_time");
-        this.taskExecutionStats = this.statsLogger.getOpStatsLogger("task_execution_time");
-        this.pendingTasksGauge = new Gauge<Number>() {
-            @Override
-            public Number getDefaultValue() {
-                return 0;
-            }
-
-            @Override
-            public Number getSample() {
-                return getQueue().size();
-            }
-        };
-        this.completedTasksGauge = new Gauge<Number>() {
-            @Override
-            public Number getDefaultValue() {
-                return 0;
-            }
-
-            @Override
-            public Number getSample() {
-                return getCompletedTaskCount();
-            }
-        };
-        this.totalTasksGauge = new Gauge<Number>() {
-            @Override
-            public Number getDefaultValue() {
-                return 0;
-            }
-
-            @Override
-            public Number getSample() {
-                return getTaskCount();
-            }
-        };
-
-        // outstanding tasks
-        this.statsLogger.registerGauge(pendingTasksGaugeLabel, pendingTasksGauge);
-        // completed tasks
-        this.statsLogger.registerGauge(completedTasksGaugeLabel, completedTasksGauge);
-        // total tasks: must be the total-tasks gauge, which is also the instance
-        // that unregisterGauges() removes under this label
-        this.statsLogger.registerGauge(totalTasksGaugeLabel, totalTasksGauge);
-    }
-
-    /** Wraps <code>r</code> in a {@link TimedRunnable} when tracing is enabled; otherwise returns it as is. */
-    private Runnable timedRunnable(Runnable r) {
-        return traceTaskExecution ? new TimedRunnable(r) : r;
-    }
-
-    /** Wraps <code>task</code> in a {@link TimedCallable} when tracing is enabled; otherwise returns it as is. */
-    private <T> Callable<T> timedCallable(Callable<T> task) {
-        return traceTaskExecution ? new TimedCallable<T>(task) : task;
-    }
-
-    @Override
-    public Future<?> submit(Runnable task) {
-        return super.submit(timedRunnable(task));
-    }
-
-    @Override
-    public <T> Future<T> submit(Runnable task, T result) {
-        return super.submit(timedRunnable(task), result);
-    }
-
-    @Override
-    public <T> Future<T> submit(Callable<T> task) {
-        return super.submit(timedCallable(task));
-    }
-
-    /**
-     * Logs failures of executed tasks. An exception captured inside a completed {@link Future}
-     * is logged and forwarded to the default uncaught exception handler (when one is set);
-     * an exception passed in as <code>t</code> is only logged.
-     */
-    @Override
-    protected void afterExecute(Runnable r, Throwable t) {
-        super.afterExecute(r, t);
-        Throwable hiddenThrowable = extractThrowable(r);
-        if (hiddenThrowable != null) {
-            logAndHandle(hiddenThrowable, true);
-        }
-
-        // The executor re-throws exceptions thrown by the task to the uncaught exception handler
-        // so we don't need to pass the exception to the handler explicitly
-        if (null != t) {
-            logAndHandle(t, false);
-        }
-    }
-
-    /**
-     * The executor re-throws exceptions thrown by the task to the uncaught exception handler
-     * so we only need to do anything if uncaught exception handler has not been set.
-     *
-     * @param t the exception to report
-     * @param passToHandler whether to also invoke the default uncaught exception handler, if any
-     */
-    private void logAndHandle(Throwable t, boolean passToHandler) {
-        // Read the handler once so the null check and the invocation use the same instance.
-        Thread.UncaughtExceptionHandler handler = Thread.getDefaultUncaughtExceptionHandler();
-        if (handler == null) {
-            LOG.error("Unhandled exception on thread {}", Thread.currentThread().getName(), t);
-        } else {
-            LOG.info("Unhandled exception on thread {}", Thread.currentThread().getName(), t);
-            if (passToHandler) {
-                handler.uncaughtException(Thread.currentThread(), t);
-            }
-        }
-    }
-
-
-    /**
-     * Extract the exception (throwable) inside the ScheduledFutureTask
-     * @param runnable - The runnable that was executed
-     * @return exception enclosed in the Runnable if any; null otherwise
-     */
-    private Throwable extractThrowable(Runnable runnable) {
-        // Check for exceptions wrapped by FutureTask.
-        // We do this by calling get(), which will cause it to throw any saved exception.
-        // Check for isDone to prevent blocking
-        if ((runnable instanceof Future<?>) && ((Future<?>) runnable).isDone()) {
-            try {
-                ((Future<?>) runnable).get();
-            } catch (CancellationException e) {
-                LOG.debug("Task {} cancelled", runnable, e.getCause());
-            } catch (InterruptedException e) {
-                LOG.debug("Task {} was interrupted", runnable, e);
-            } catch (ExecutionException e) {
-                return e.getCause();
-            }
-        }
-
-        return null;
-    }
-
-    /** Unregisters the three gauges that the constructor registered with the stats logger. */
-    void unregisterGauges() {
-        this.statsLogger.unregisterGauge(pendingTasksGaugeLabel, pendingTasksGauge);
-        this.statsLogger.unregisterGauge(completedTasksGaugeLabel, completedTasksGauge);
-        this.statsLogger.unregisterGauge(totalTasksGaugeLabel, totalTasksGauge);
-    }
-
-}