You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:27 UTC

[17/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookkeeperBenchmark.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookkeeperBenchmark.java b/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookkeeperBenchmark.java
deleted file mode 100644
index 1b6a4a3..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookkeeperBenchmark.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.benchmark;
-
-import java.util.Random;
-import java.util.concurrent.Semaphore;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
-import org.apache.bookkeeper.client.BookKeeper.DigestType;
-import org.apache.bookkeeper.util.MathUtils;
-import static com.google.common.base.Charsets.UTF_8;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class BookkeeperBenchmark extends AbstractBenchmark {
-
-    static final Logger logger = LoggerFactory.getLogger(BookkeeperBenchmark.class);
-
-    BookKeeper bk;
-    LedgerHandle[] lh;
-
-    public BookkeeperBenchmark(String zkHostPort) throws Exception {
-        bk = new BookKeeper(zkHostPort);
-        int numLedgers = Integer.getInteger("nLedgers",5);
-        lh = new LedgerHandle[numLedgers];
-        int quorumSize = Integer.getInteger("quorum", 2);
-        int ensembleSize = Integer.getInteger("ensemble", 4);
-        DigestType digestType = DigestType.valueOf(System.getProperty("digestType", "CRC32"));
-        for (int i=0; i< numLedgers; i++) {
-            lh[i] = bk.createLedger(ensembleSize, quorumSize, digestType, "blah".getBytes(UTF_8));
-        }
-
-    }
-
-
-    @Override
-    void doOps(final int numOps) throws Exception {
-        int size = Integer.getInteger("size", 1024);
-        byte[] msg = new byte[size];
-
-        int numOutstanding = Integer.getInteger("nPars",1000);
-        final Semaphore outstanding = new Semaphore(numOutstanding);
-
-        AddCallback callback = new AddCallback() {
-            AbstractCallback handler = new AbstractCallback(outstanding, numOps);
-
-
-            @Override
-            public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
-                handler.handle(rc == BKException.Code.OK, ctx);
-            }
-
-        };
-
-
-
-        Random rand = new Random();
-
-        for (int i=0; i<numOps; i++) {
-            outstanding.acquire();
-            lh[rand.nextInt(lh.length)].asyncAddEntry(msg, callback, MathUtils.now());
-        }
-
-
-    }
-
-    @Override
-    public void tearDown() throws Exception {
-        bk.close();
-    }
-
-
-    public static void main(String[] args) throws Exception {
-        BookkeeperBenchmark benchmark = new BookkeeperBenchmark(args[0]);
-        benchmark.run();
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/FakeBookie.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/FakeBookie.java b/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/FakeBookie.java
deleted file mode 100644
index ba81834..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/FakeBookie.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.benchmark;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.Executors;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelHandler.Sharable;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.channel.socket.ServerSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
-import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
-import org.jboss.netty.logging.InternalLoggerFactory;
-import org.jboss.netty.logging.Log4JLoggerFactory;
-
-@Sharable
-public class FakeBookie extends SimpleChannelHandler implements
-    ChannelPipelineFactory {
-    private static final Logger logger = LoggerFactory.getLogger(FakeBookie.class);
-    ServerSocketChannelFactory serverChannelFactory = new NioServerSocketChannelFactory(
-        Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
-
-    public FakeBookie(int port) {
-        InternalLoggerFactory.setDefaultFactory(new Log4JLoggerFactory());
-        ServerBootstrap bootstrap = new ServerBootstrap(serverChannelFactory);
-
-        bootstrap.setPipelineFactory(this);
-        bootstrap.setOption("child.tcpNoDelay", true);
-        bootstrap.setOption("child.keepAlive", true);
-        bootstrap.setOption("reuseAddress", true);
-
-        logger.info("Going into receive loop");
-        // Bind and start to accept incoming connections.
-        bootstrap.bind(new InetSocketAddress(port));
-    }
-
-    @Override
-    public ChannelPipeline getPipeline() throws Exception {
-        ChannelPipeline pipeline = Channels.pipeline();
-        pipeline.addLast("lengthbaseddecoder",
-                         new LengthFieldBasedFrameDecoder(1024 * 1024, 0, 4, 0, 4));
-        pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
-        pipeline.addLast("main", this);
-        return pipeline;
-    }
-
-    @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-            throws Exception {
-        if (!(e.getMessage() instanceof ChannelBuffer)) {
-            ctx.sendUpstream(e);
-            return;
-        }
-
-        ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
-
-        int type = buffer.readInt();
-        buffer.readerIndex(24);
-        long ledgerId = buffer.readLong();
-        long entryId = buffer.readLong();
-
-        ChannelBuffer outBuf = ctx.getChannel().getConfig().getBufferFactory()
-                               .getBuffer(24);
-        outBuf.writeInt(type);
-        outBuf.writeInt(0); // rc
-        outBuf.writeLong(ledgerId);
-        outBuf.writeLong(entryId);
-        e.getChannel().write(outBuf);
-
-    }
-
-
-    public static void main(String args[]) {
-        new FakeBookie(Integer.parseInt(args[0]));
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/common/ByteStringInterner.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/common/ByteStringInterner.java b/hedwig-server/src/main/java/org/apache/hedwig/server/common/ByteStringInterner.java
deleted file mode 100644
index 2611bc0..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/common/ByteStringInterner.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.common;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import com.google.protobuf.ByteString;
-
-public class ByteStringInterner {
-    // TODO: how to release references when strings are no longer used. weak
-    // references?
-
-    private static final ConcurrentMap<ByteString, ByteString> map = new ConcurrentHashMap<ByteString, ByteString>();
-
-    public static ByteString intern(ByteString in) {
-        ByteString presentValueInMap = map.putIfAbsent(in, in);
-        if (presentValueInMap != null) {
-            return presentValueInMap;
-        }
-        return in;
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java b/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
deleted file mode 100644
index 237c7de..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
+++ /dev/null
@@ -1,666 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.common;
-
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.InputStream;
-import java.net.InetAddress;
-import java.net.URL;
-import java.net.UnknownHostException;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.bookkeeper.util.ReflectionUtils;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hedwig.conf.AbstractConfiguration;
-import org.apache.hedwig.server.meta.MetadataManagerFactory;
-import org.apache.hedwig.server.topics.HubLoad;
-import org.apache.hedwig.util.HedwigSocketAddress;
-
-import com.google.protobuf.ByteString;
-
-public class ServerConfiguration extends AbstractConfiguration {
-    public final static String REGION = "region";
-    protected final static String MAX_MESSAGE_SIZE = "max_message_size";
-    protected final static String READAHEAD_COUNT = "readahead_count";
-    protected final static String READAHEAD_SIZE = "readahead_size";
-    protected final static String CACHE_SIZE = "cache_size";
-    protected final static String CACHE_ENTRY_TTL = "cache_entry_ttl";
-    protected final static String SCAN_BACKOFF_MSEC = "scan_backoff_ms";
-    protected final static String SERVER_PORT = "server_port";
-    protected final static String SSL_SERVER_PORT = "ssl_server_port";
-    protected final static String ZK_PREFIX = "zk_prefix";
-    protected final static String ZK_HOST = "zk_host";
-    protected final static String ZK_TIMEOUT = "zk_timeout";
-    protected final static String READAHEAD_ENABLED = "readahead_enabled";
-    protected final static String STANDALONE = "standalone";
-    protected final static String REGIONS = "regions";
-    protected final static String CERT_NAME = "cert_name";
-    protected final static String CERT_PATH = "cert_path";
-    protected final static String PASSWORD = "password";
-    protected final static String SSL_ENABLED = "ssl_enabled";
-    protected final static String CONSUME_INTERVAL = "consume_interval";
-    protected final static String INIT_NUM_TOPICS = "init_num_topics";
-    protected final static String MAX_NUM_TOPICS = "max_num_topics";
-    protected final static String RETENTION_SECS = "retention_secs";
-    protected final static String RETENTION_SECS_AFTER_ACCESS = "retention_secs_after_access";
-    protected final static String INTER_REGION_SSL_ENABLED = "inter_region_ssl_enabled";
-    protected final static String MESSAGES_CONSUMED_THREAD_RUN_INTERVAL = "messages_consumed_thread_run_interval";
-    protected final static String BK_ENSEMBLE_SIZE = "bk_ensemble_size";
-    @Deprecated
-    protected final static String BK_QUORUM_SIZE = "bk_quorum_size";
-    protected final static String BK_WRITE_QUORUM_SIZE = "bk_write_quorum_size";
-    protected final static String BK_ACK_QUORUM_SIZE = "bk_ack_quorum_size";
-    protected final static String RETRY_REMOTE_SUBSCRIBE_THREAD_RUN_INTERVAL = "retry_remote_subscribe_thread_run_interval";
-    protected final static String DEFAULT_MESSAGE_WINDOW_SIZE =
-        "default_message_window_size";
-    protected final static String NUM_READAHEAD_CACHE_THREADS = "num_readahead_cache_threads";
-    protected final static String NUM_DELIVERY_THREADS = "num_delivery_threads";
-
-    protected final static String MAX_ENTRIES_PER_LEDGER = "max_entries_per_ledger";
-    protected final static String REBALANCE_TOLERANCE_PERCENTAGE = "rebalance_tolerance";
-    protected final static String REBALANCE_MAX_SHED = "rebalance_max_shed";
-    protected final static String REBALANCE_INTERVAL_SEC = "rebalance_interval_sec";
-
-    // manager related settings
-    protected final static String METADATA_MANAGER_BASED_TOPIC_MANAGER_ENABLED = "metadata_manager_based_topic_manager_enabled";
-    protected final static String METADATA_MANAGER_FACTORY_CLASS = "metadata_manager_factory_class";
-
-    // metastore settings, only being used when METADATA_MANAGER_FACTORY_CLASS is MsMetadataManagerFactory
-    protected final static String METASTORE_IMPL_CLASS = "metastore_impl_class";
-    protected final static String METASTORE_MAX_ENTRIES_PER_SCAN = "metastoreMaxEntriesPerScan";
-
-    private static ClassLoader defaultLoader;
-    static {
-        defaultLoader = Thread.currentThread().getContextClassLoader();
-        if (null == defaultLoader) {
-            defaultLoader = ServerConfiguration.class.getClassLoader();
-        }
-    }
-
-    // these are the derived attributes
-    protected ByteString myRegionByteString = null;
-    protected HedwigSocketAddress myServerAddress = null;
-    protected List<String> regionList = null;
-
-    // Although this method is not currently used, currently maintaining it like
-    // this so that we can support on-the-fly changes in configuration
-    protected void refreshDerivedAttributes() {
-        refreshMyRegionByteString();
-        refreshMyServerAddress();
-        refreshRegionList();
-    }
-
-    @Override
-    public void loadConf(URL confURL) throws ConfigurationException {
-        super.loadConf(confURL);
-        refreshDerivedAttributes();
-    }
-
-    public int getMaximumMessageSize() {
-        return conf.getInt(MAX_MESSAGE_SIZE, 1258291); /* 1.2M */
-    }
-
-    public String getMyRegion() {
-        return conf.getString(REGION, "standalone");
-    }
-
-    protected void refreshMyRegionByteString() {
-        myRegionByteString = ByteString.copyFromUtf8(getMyRegion());
-    }
-
-    protected void refreshMyServerAddress() {
-        try {
-            // Use the raw IP address as the hostname
-            myServerAddress = new HedwigSocketAddress(InetAddress.getLocalHost().getHostAddress(), getServerPort(),
-                    getSSLServerPort());
-        } catch (UnknownHostException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    // The expected format for the regions parameter is Hostname:Port:SSLPort
-    // with spaces in between each of the regions.
-    protected void refreshRegionList() {
-        String regions = conf.getString(REGIONS, "");
-        if (regions.isEmpty()) {
-            regionList = new LinkedList<String>();
-        } else {
-            regionList = Arrays.asList(regions.split(" "));
-        }
-    }
-
-    public ByteString getMyRegionByteString() {
-        if (myRegionByteString == null) {
-            refreshMyRegionByteString();
-        }
-        return myRegionByteString;
-    }
-
-    /**
-     * Maximum number of messages to read ahead. Default is 10.
-     *
-     * @return int
-     */
-    public int getReadAheadCount() {
-        return conf.getInt(READAHEAD_COUNT, 10);
-    }
-
-    /**
-     * Maximum number of bytes to read ahead. Default is 4MB.
-     *
-     * @return long
-     */
-    public long getReadAheadSizeBytes() {
-        return conf.getLong(READAHEAD_SIZE, 4 * 1024 * 1024); // 4M
-    }
-
-    /**
-     * Maximum cache size. By default is the smallest of 2G or
-     * half the heap size.
-     *
-     * @return long
-     */
-    public long getMaximumCacheSize() {
-        // 2G or half of the maximum amount of memory the JVM uses
-        return conf.getLong(CACHE_SIZE, Math.min(2 * 1024L * 1024L * 1024L, Runtime.getRuntime().maxMemory() / 2));
-    }
-
-    /**
-     * Cache Entry TTL. By default is 0, cache entry will not be evicted
-     * until the cache is fullfilled or the messages are already consumed.
-     * The TTL is only checked when trying adding a new entry into the cache.
-     *
-     * @return cache entry ttl.
-     */
-    public long getCacheEntryTTL() {
-        return conf.getLong(CACHE_ENTRY_TTL, 0L);
-    }
-
-    /**
-     * After a scan of a log fails, how long before we retry (in msec)
-     *
-     * @return long
-     */
-    public long getScanBackoffPeriodMs() {
-        return conf.getLong(SCAN_BACKOFF_MSEC, 1000);
-    }
-
-    /**
-     * Returns server port.
-     *
-     * @return int
-     */
-    public int getServerPort() {
-        return conf.getInt(SERVER_PORT, 4080);
-    }
-
-    /**
-     * Returns SSL server port.
-     *
-     * @return int
-     */
-    public int getSSLServerPort() {
-        return conf.getInt(SSL_SERVER_PORT, 9876);
-    }
-
-    /**
-     * Returns ZooKeeper path prefix.
-     *
-     * @return string
-     */
-    public String getZkPrefix() {
-        return conf.getString(ZK_PREFIX, "/hedwig");
-    }
-
-    public StringBuilder getZkRegionPrefix(StringBuilder sb) {
-        return sb.append(getZkPrefix()).append("/").append(getMyRegion());
-    }
-
-    /**
-     * Get znode path to store manager layouts.
-     *
-     * @param sb
-     *          StringBuilder to store znode path to store manager layouts.
-     * @return znode path to store manager layouts.
-     */
-    public StringBuilder getZkManagersPrefix(StringBuilder sb) {
-        return getZkRegionPrefix(sb).append("/managers");
-    }
-
-    public StringBuilder getZkTopicsPrefix(StringBuilder sb) {
-        return getZkRegionPrefix(sb).append("/topics");
-    }
-
-    public StringBuilder getZkTopicPath(StringBuilder sb, ByteString topic) {
-        return getZkTopicsPrefix(sb).append("/").append(topic.toStringUtf8());
-    }
-
-    public StringBuilder getZkHostsPrefix(StringBuilder sb) {
-        return getZkRegionPrefix(sb).append("/hosts");
-    }
-
-    public HedwigSocketAddress getServerAddr() {
-        if (myServerAddress == null) {
-            refreshMyServerAddress();
-        }
-        return myServerAddress;
-    }
-
-    /**
-     * Return ZooKeeper list of servers. Default is localhost.
-     *
-     * @return String
-     */
-    public String getZkHost() {
-        List servers = conf.getList(ZK_HOST, null);
-        if (null == servers || 0 == servers.size()) {
-            return "localhost";
-        }
-        return StringUtils.join(servers, ",");
-    }
-
-    /**
-     * Return ZooKeeper session timeout. Default is 2s.
-     *
-     * @return int
-     */
-    public int getZkTimeout() {
-        return conf.getInt(ZK_TIMEOUT, 2000);
-    }
-
-    /**
-     * Returns true if read-ahead enabled. Default is true.
-     *
-     * @return boolean
-     */
-    public boolean getReadAheadEnabled() {
-        return conf.getBoolean(READAHEAD_ENABLED, true)
-            || conf.getBoolean("readhead_enabled");
-        // the key was misspelt in a previous version, so compensate here
-    }
-
-    /**
-     * Returns true if standalone. Default is false.
-     *
-     * @return boolean
-     */
-    public boolean isStandalone() {
-        return conf.getBoolean(STANDALONE, false);
-    }
-
-    /**
-     * Returns list of regions.
-     *
-     * @return List<String>
-     */
-    public List<String> getRegions() {
-        if (regionList == null) {
-            refreshRegionList();
-        }
-        return regionList;
-    }
-
-    /**
-     *  Returns the name of the SSL certificate if available as a resource.
-     *
-     * @return String
-     */
-    public String getCertName() {
-        return conf.getString(CERT_NAME, "");
-    }
-
-    /**
-     * This is the path to the SSL certificate if it is available as a file.
-     *
-     * @return String
-     */
-    public String getCertPath() {
-        return conf.getString(CERT_PATH, "");
-    }
-
-    // This method return the SSL certificate as an InputStream based on if it
-    // is configured to be available as a resource or as a file. If nothing is
-    // configured correctly, then a ConfigurationException will be thrown as
-    // we do not know how to obtain the SSL certificate stream.
-    public InputStream getCertStream() throws FileNotFoundException, ConfigurationException {
-        String certName = getCertName();
-        String certPath = getCertPath();
-        if (certName != null && !certName.isEmpty()) {
-            return getClass().getResourceAsStream(certName);
-        } else if (certPath != null && !certPath.isEmpty()) {
-            return new FileInputStream(certPath);
-        } else
-            throw new ConfigurationException("SSL Certificate configuration does not have resource name or path set!");
-    }
-
-    /**
-     * Returns the password used for BookKeeper ledgers. Default
-     * is the empty string.
-     *
-     * @return
-     */
-    public String getPassword() {
-        return conf.getString(PASSWORD, "");
-    }
-
-    /**
-     * Returns true if SSL is enabled. Default is false.
-     *
-     * @return boolean
-     */
-    public boolean isSSLEnabled() {
-        return conf.getBoolean(SSL_ENABLED, false);
-    }
-
-    /**
-     * Gets the number of messages consumed before persisting
-     * information about consumed messages. A value greater than
-     * one avoids persisting information about consumed messages
-     * upon every consumed message. Default is 50.
-     *
-     * @return int
-     */
-    public int getConsumeInterval() {
-        return conf.getInt(CONSUME_INTERVAL, 50);
-    }
-
-    /**
-     * Returns the interval to release a topic. If this
-     * parameter is greater than zero, then schedule a
-     * task to release an owned topic. Default is 0 (never released).
-     *
-     * @return int
-     */
-    public int getRetentionSecs() {
-        return conf.getInt(RETENTION_SECS, 0);
-    }
-
-    /**
-     * Specifies that the topic should be automatically released
-     * once a fixed duration after the topic is owned, a message is
-     * published, or a message is delivered.
-     *
-     * @return the length of time after an entry is last accessed that
-     *         it should be automatically removed.
-     */
-    public int getRetentionSecsAfterAccess() {
-        return conf.getInt(RETENTION_SECS_AFTER_ACCESS, 0);
-    }
-
-    /**
-     * Max number of topics for a hub server to serve.
-     *
-     * @return max number of topics for a hub server to serve.
-     */
-    public int getMaxNumTopics() {
-        return conf.getInt(MAX_NUM_TOPICS, Integer.MAX_VALUE);
-    }
-
-    /**
-     * Minimum size of internal structure to store topics.
-     *
-     * @return init number of topics for a hub server.
-     */
-    public int getInitNumTopics() {
-        return conf.getInt(INIT_NUM_TOPICS, 128);
-    }
-
-    /**
-     * True if SSL is enabled across regions.
-     *
-     * @return boolean
-     */
-    public boolean isInterRegionSSLEnabled() {
-        return conf.getBoolean(INTER_REGION_SSL_ENABLED, false);
-    }
-
-    /**
-     * This parameter is used to determine how often we run the
-     * SubscriptionManager's Messages Consumed timer task thread
-     * (in milliseconds).
-     *
-     * @return int
-     */
-    public int getMessagesConsumedThreadRunInterval() {
-        return conf.getInt(MESSAGES_CONSUMED_THREAD_RUN_INTERVAL, 60000);
-    }
-
-    /**
-     * This parameter is used to determine how often we run a thread
-     * to retry those failed remote subscriptions in asynchronous mode
-     * (in milliseconds).
-     *
-     * @return int
-     */
-    public int getRetryRemoteSubscribeThreadRunInterval() {
-        return conf.getInt(RETRY_REMOTE_SUBSCRIBE_THREAD_RUN_INTERVAL, 120000);
-    }
-
-    /**
-     * This parameter is for setting the default maximum number of messages which
-     * can be delivered to a subscriber without being consumed.
-     * we pause messages delivery to a subscriber when reaching the window size
-     *
-     * @return int
-     */
-    public int getDefaultMessageWindowSize() {
-        return conf.getInt(DEFAULT_MESSAGE_WINDOW_SIZE, 0);
-    }
-
-    /**
-     * This parameter is used when Bookkeeper is the persistence
-     * store and indicates what the ensemble size is (i.e. how
-     * many bookie servers to stripe the ledger entries across).
-     *
-     * @return int
-     */
-    public int getBkEnsembleSize() {
-        return conf.getInt(BK_ENSEMBLE_SIZE, 3);
-    }
-
-
-    /**
-     * This parameter is used when Bookkeeper is the persistence store
-     * and indicates what the quorum size is (i.e. how many redundant
-     * copies of each ledger entry is written).
-     *
-     * @return int
-     * @deprecated please use #getBkWriteQuorumSize() and #getBkAckQuorumSize()
-     */
-    @Deprecated
-    protected int getBkQuorumSize() {
-        return conf.getInt(BK_QUORUM_SIZE, 2);
-    }
-
-    /**
-     * Get the write quorum size for BookKeeper client, which is used to
-     * indicate how many redundant copies of each ledger entry is written.
-     *
-     * @return write quorum size for BookKeeper client.
-     */
-    public int getBkWriteQuorumSize() {
-        if (conf.containsKey(BK_WRITE_QUORUM_SIZE)) {
-            return conf.getInt(BK_WRITE_QUORUM_SIZE, 2);
-        } else {
-            return getBkQuorumSize();
-        }
-    }
-
-    /**
-     * Get the ack quorum size for BookKeeper client.
-     *
-     * @return ack quorum size for BookKeeper client.
-     */
-    public int getBkAckQuorumSize() {
-        if (conf.containsKey(BK_ACK_QUORUM_SIZE)) {
-            return conf.getInt(BK_ACK_QUORUM_SIZE, 2);
-        } else {
-            return getBkQuorumSize();
-        }
-    }
-
-    /**
-     * This parameter is used when BookKeeper is the persistence storage,
-     * and indicates when the number of entries stored in a ledger reach
-     * the threshold, hub server will open a new ledger to write.
-     *
-     * @return max entries per ledger
-     */
-    public long getMaxEntriesPerLedger() {
-        return conf.getLong(MAX_ENTRIES_PER_LEDGER, 0L);
-    }
-
-    /**
-     * Get the tolerance percentage for the rebalancer. The rebalancer will not
-     * shed load if it's current load is less than average + average*tolerancePercentage/100.0
-     *
-     * @return the tolerance percentage for the rebalancer.
-     */
-    public double getRebalanceTolerance() {
-        return conf.getDouble(REBALANCE_TOLERANCE_PERCENTAGE, 10.0);
-    }
-
-    /**
-     * Get the maximum load the rebalancer can shed at once. Default is 50.
-     * @return
-     */
-    public HubLoad getRebalanceMaxShed() {
-        return new HubLoad(conf.getLong(REBALANCE_MAX_SHED, 50));
-    }
-
-    /**
-     * Get the interval(in seconds) between rebalancing attempts. The default is
-     * 5 minutes.
-     * @return
-     */
-    public long getRebalanceInterval() {
-        return conf.getLong(REBALANCE_INTERVAL_SEC, 300);
-    }
-
-    /*
-     * Is this a valid configuration that we can run with? This code might grow
-     * over time.
-     */
-    public void validate() throws ConfigurationException {
-        if (!getZkPrefix().startsWith("/")) {
-            throw new ConfigurationException(ZK_PREFIX + " must start with a /");
-        }
-        // Validate that if Regions exist and inter-region communication is SSL
-        // enabled, that the Regions correspond to valid HedwigSocketAddresses,
-        // namely that SSL ports are present.
-        if (isInterRegionSSLEnabled() && getRegions().size() > 0) {
-            for (String hubString : getRegions()) {
-                HedwigSocketAddress hub = new HedwigSocketAddress(hubString);
-                if (hub.getSSLSocketAddress() == null)
-                    throw new ConfigurationException("Region defined does not have required SSL port: " + hubString);
-            }
-        }
-        // Validate that the Bookkeeper ensemble size >= quorum size.
-        if (getBkEnsembleSize() < getBkWriteQuorumSize()) {
-            throw new ConfigurationException("BK ensemble size (" + getBkEnsembleSize()
-                                             + ") is less than the write quorum size (" + getBkWriteQuorumSize() + ")");
-        }
-
-        if (getBkWriteQuorumSize() < getBkAckQuorumSize()) {
-            throw new ConfigurationException("BK write quorum size (" + getBkWriteQuorumSize()
-                                             + ") is less than the ack quorum size (" + getBkAckQuorumSize() + ")");
-        }
-        // Validate that the rebalance tolerance percentage is not negative.
-        if (getRebalanceTolerance() < 0.0) {
-            throw new ConfigurationException("The rebalance tolerance percentage cannot be negative.");
-        }
-        // Validate that the maximum load to shed during a rebalance is not negative.
-        if (getRebalanceMaxShed().getNumTopics() < 0L) {
-            throw new ConfigurationException("The maximum load to shed during a rebalance cannot be negative.");
-        }
-        // add other checks here
-    }
-
-    /**
-     * Get number of read ahead cache threads.
-     *
-     * @return number of read ahead cache threads.
-     */
-    public int getNumReadAheadCacheThreads() {
-        return conf.getInt(NUM_READAHEAD_CACHE_THREADS, Runtime.getRuntime().availableProcessors());
-    }
-
-    /**
-     * Get number of delivery threads
-     *
-     * @return number of delivery threads.
-     */
-    public int getNumDeliveryThreads() {
-        return conf.getInt(NUM_DELIVERY_THREADS, Runtime.getRuntime().availableProcessors());
-    }
-
-    /**
-     * Whether enable metadata manager based topic manager.
-     *
-     * @return true if enabled metadata manager based topic manager.
-     */
-    public boolean isMetadataManagerBasedTopicManagerEnabled() {
-        return conf.getBoolean(METADATA_MANAGER_BASED_TOPIC_MANAGER_ENABLED, false);
-    }
-
-    /**
-     * Get metadata manager factory class.
-     *
-     * @return manager class
-     */
-    public Class<? extends MetadataManagerFactory> getMetadataManagerFactoryClass()
-    throws ConfigurationException {
-        return ReflectionUtils.getClass(conf, METADATA_MANAGER_FACTORY_CLASS,
-                                        null, MetadataManagerFactory.class,
-                                        defaultLoader);
-    }
-
-    /**
-     * Set metadata manager factory class name
-     *
-     * @param managerClsName
-     *          Manager Class Name
-     * @return server configuration
-     */
-    public ServerConfiguration setMetadataManagerFactoryName(String managerClsName) {
-        conf.setProperty(METADATA_MANAGER_FACTORY_CLASS, managerClsName);
-        return this;
-    }
-
-    /**
-     * Get metastore implementation class.
-     *
-     * @return metastore implementation class name.
-     */
-    public String getMetastoreImplClass() {
-        return conf.getString(METASTORE_IMPL_CLASS);
-    }
-
-    /**
-     * Get max entries per scan in metastore.
-     *
-     * @return max entries per scan in metastore.
-     */
-    public int getMetastoreMaxEntriesPerScan() {
-        return conf.getInt(METASTORE_MAX_ENTRIES_PER_SCAN, 50);
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/common/TerminateJVMExceptionHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/common/TerminateJVMExceptionHandler.java b/hedwig-server/src/main/java/org/apache/hedwig/server/common/TerminateJVMExceptionHandler.java
deleted file mode 100644
index fd8234f..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/common/TerminateJVMExceptionHandler.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.common;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TerminateJVMExceptionHandler implements Thread.UncaughtExceptionHandler {
-    private static Logger logger = LoggerFactory.getLogger(TerminateJVMExceptionHandler.class);
-
-    @Override
-    public void uncaughtException(Thread t, Throwable e) {
-        logger.error("Uncaught exception in thread " + t.getName(), e);
-        Runtime.getRuntime().exit(1);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/common/TopicOpQueuer.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/common/TopicOpQueuer.java b/hedwig-server/src/main/java/org/apache/hedwig/server/common/TopicOpQueuer.java
deleted file mode 100644
index 3c4a562..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/common/TopicOpQueuer.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.common;
-
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.concurrent.ScheduledExecutorService;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.util.Callback;
-
-public class TopicOpQueuer {
-    /**
-     * Map from topic to the queue of operations for that topic.
-     */
-    protected HashMap<ByteString, Queue<Runnable>> topic2ops = new HashMap<ByteString, Queue<Runnable>>();
-
-    protected final ScheduledExecutorService scheduler;
-
-    public TopicOpQueuer(ScheduledExecutorService scheduler) {
-        this.scheduler = scheduler;
-    }
-
-    public interface Op extends Runnable {
-    }
-
-    public abstract class AsynchronousOp<T> implements Op {
-        final public ByteString topic;
-        final public Callback<T> cb;
-        final public Object ctx;
-
-        public AsynchronousOp(final ByteString topic, final Callback<T> cb, Object ctx) {
-            this.topic = topic;
-            this.cb = new Callback<T>() {
-                @Override
-                public void operationFailed(Object ctx, PubSubException exception) {
-                    cb.operationFailed(ctx, exception);
-                    popAndRunNext(topic);
-                }
-
-                @Override
-                public void operationFinished(Object ctx, T resultOfOperation) {
-                    cb.operationFinished(ctx, resultOfOperation);
-                    popAndRunNext(topic);
-                }
-            };
-            this.ctx = ctx;
-        }
-    }
-
-    public abstract class SynchronousOp implements Op {
-        final public ByteString topic;
-
-        public SynchronousOp(ByteString topic) {
-            this.topic = topic;
-        }
-
-        @Override
-        public final void run() {
-            runInternal();
-            popAndRunNext(topic);
-        }
-
-        protected abstract void runInternal();
-
-    }
-
-    protected synchronized void popAndRunNext(ByteString topic) {
-        Queue<Runnable> ops = topic2ops.get(topic);
-        if (!ops.isEmpty())
-            ops.remove();
-        if (!ops.isEmpty())
-            scheduler.submit(ops.peek());
-    }
-
-    public void pushAndMaybeRun(ByteString topic, Op op) {
-        int size;
-        synchronized (this) {
-            Queue<Runnable> ops = topic2ops.get(topic);
-            if (ops == null) {
-                ops = new LinkedList<Runnable>();
-                topic2ops.put(topic, ops);
-            }
-            ops.add(op);
-            size = ops.size();
-        }
-        if (size == 1)
-            op.run();
-    }
-
-    public Runnable peek(ByteString topic) {
-        return topic2ops.get(topic).peek();
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/common/UnexpectedError.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/common/UnexpectedError.java b/hedwig-server/src/main/java/org/apache/hedwig/server/common/UnexpectedError.java
deleted file mode 100644
index 364ffdc..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/common/UnexpectedError.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.common;
-
-public class UnexpectedError extends Error {
-
-    /**
-     *
-     */
-    private static final long serialVersionUID = 1L;
-
-    public UnexpectedError(String msg) {
-        super(msg);
-    }
-
-    public UnexpectedError(Throwable cause) {
-        super(cause);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/ChannelEndPoint.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/ChannelEndPoint.java b/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/ChannelEndPoint.java
deleted file mode 100644
index e7dc1fa..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/ChannelEndPoint.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.delivery;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
-import org.apache.hedwig.server.common.UnexpectedError;
-
-public class ChannelEndPoint implements DeliveryEndPoint, ChannelFutureListener {
-
-    Channel channel;
-
-    public Channel getChannel() {
-        return channel;
-    }
-
-    Map<ChannelFuture, DeliveryCallback> callbacks = new HashMap<ChannelFuture, DeliveryCallback>();
-
-    public ChannelEndPoint(Channel channel) {
-        this.channel = channel;
-    }
-
-    public void close() {
-        channel.close();
-    }
-
-    public void send(PubSubResponse response, DeliveryCallback callback) {
-        ChannelFuture future = channel.write(response);
-        callbacks.put(future, callback);
-        future.addListener(this);
-    }
-
-    public void operationComplete(ChannelFuture future) throws Exception {
-        DeliveryCallback callback = callbacks.get(future);
-        callbacks.remove(future);
-
-        if (callback == null) {
-            throw new UnexpectedError("Could not locate callback for channel future");
-        }
-
-        if (future.isSuccess()) {
-            callback.sendingFinished();
-        } else {
-            // treat all channel errors as permanent
-            callback.permanentErrorOnSend();
-        }
-
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (obj instanceof ChannelEndPoint) {
-            ChannelEndPoint channelEndPoint = (ChannelEndPoint) obj;
-            return channel.equals(channelEndPoint.channel);
-        } else {
-            return false;
-        }
-    }
-
-    @Override
-    public int hashCode() {
-        return channel.hashCode();
-    }
-
-    @Override
-    public String toString() {
-        return channel.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryCallback.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryCallback.java b/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryCallback.java
deleted file mode 100644
index 9ee63f1..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryCallback.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.delivery;
-
-public interface DeliveryCallback {
-
-    public void sendingFinished();
-
-    public void transientErrorOnSend();
-
-    public void permanentErrorOnSend();
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryEndPoint.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryEndPoint.java b/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryEndPoint.java
deleted file mode 100644
index 0774801..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryEndPoint.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.delivery;
-
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
-
-public interface DeliveryEndPoint {
-
-    public void send(PubSubResponse response, DeliveryCallback callback);
-
-    public void close();
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java
deleted file mode 100644
index af3d150..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.delivery;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
-import org.apache.hedwig.filter.ServerMessageFilter;
-import org.apache.hedwig.util.Callback;
-
-public interface DeliveryManager {
-    public void start();
-
-    /**
-     * Start serving a given subscription.
-     *
-     * @param topic
-     *          Topic Name
-     * @param subscriberId
-     *          Subscriber Id
-     * @param preferences
-     *          Subscription Preferences
-     * @param seqIdToStartFrom
-     *          Message sequence id starting delivery from.
-     * @param endPoint
-     *          End point to deliver messages to.
-     * @param filter
-     *          Message filter used to filter messages before delivery.
-     * @param callback
-     *          Callback instance.
-     * @param ctx
-     *          Callback context.
-     */
-    public void startServingSubscription(ByteString topic, ByteString subscriberId,
-                                         SubscriptionPreferences preferences,
-                                         MessageSeqId seqIdToStartFrom,
-                                         DeliveryEndPoint endPoint,
-                                         ServerMessageFilter filter,
-                                         Callback<Void> callback, Object ctx);
-
-    /**
-     * Stop serving a given subscription.
-     *
-     * @param topic
-     *          Topic Name
-     * @param subscriberId
-     *          Subscriber Id
-     * @param event
-     *          Subscription event indicating the reason to stop the subscriber.
-     * @param callback
-     *          Callback instance.
-     * @param ctx
-     *          Callback context.
-     */
-    public void stopServingSubscriber(ByteString topic, ByteString subscriberId,
-                                      SubscriptionEvent event,
-                                      Callback<Void> callback, Object ctx);
-
-    /**
-     * Tell the delivery manager where that a subscriber has consumed
-     *
-     * @param topic
-     *          Topic Name
-     * @param subscriberId
-     *          Subscriber Id
-     * @param consumedSeqId
-     *          Max consumed seq id.
-     */
-    public void messageConsumed(ByteString topic, ByteString subscriberId,
-                                MessageSeqId consumedSeqId);
-
-    /**
-     * Stop delivery manager
-     */
-    public void stop();
-}