You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:25 UTC

[15/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java b/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java
deleted file mode 100644
index 3838e96..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.handlers;
-
-import org.jboss.netty.channel.Channel;
-import com.google.protobuf.ByteString;
-
-import org.apache.bookkeeper.util.MathUtils;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.UnsubscribeRequest;
-import org.apache.hedwig.protoextensions.PubSubResponseUtils;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.delivery.DeliveryManager;
-import org.apache.hedwig.server.netty.ServerStats;
-import org.apache.hedwig.server.netty.ServerStats.OpStats;
-import org.apache.hedwig.server.netty.UmbrellaHandler;
-import org.apache.hedwig.server.subscriptions.SubscriptionManager;
-import org.apache.hedwig.server.topics.TopicManager;
-import org.apache.hedwig.util.Callback;
-
-/**
- * Handler for unsubscribe requests.
- *
- * <p>{@link #handleRequestAtOwner} asks the subscription manager to
- * unsubscribe, then asks the delivery manager to stop serving the subscriber,
- * and finally removes the topic subscription from the subscription channel
- * manager before writing the response to the requesting channel. Outcomes are
- * recorded in the {@code UNSUBSCRIBE} op stats.
- */
-public class UnsubscribeHandler extends BaseHandler {
-    SubscriptionManager subMgr;
-    DeliveryManager deliveryMgr;
-    SubscriptionChannelManager subChannelMgr;
-    // op stats
-    final OpStats unsubStats;
-
-    /**
-     * Construct the handler.
-     *
-     * @param cfg
-     *          Server configuration, passed on to the base handler
-     * @param tm
-     *          Topic manager, passed on to the base handler
-     * @param subMgr
-     *          Subscription manager used to unsubscribe
-     * @param deliveryMgr
-     *          Delivery manager asked to stop serving the subscriber
-     * @param subChannelMgr
-     *          Subscription channel manager the topic subscription is removed from
-     */
-    public UnsubscribeHandler(ServerConfiguration cfg,
-                              TopicManager tm,
-                              SubscriptionManager subMgr,
-                              DeliveryManager deliveryMgr,
-                              SubscriptionChannelManager subChannelMgr) {
-        super(tm, cfg);
-        this.subMgr = subMgr;
-        this.deliveryMgr = deliveryMgr;
-        this.subChannelMgr = subChannelMgr;
-        unsubStats = ServerStats.getInstance().getOpStats(OperationType.UNSUBSCRIBE);
-    }
-
-    /**
-     * Process an unsubscribe request.
-     *
-     * <p>A request without unsubscribe data is answered with a malformed-request
-     * error. Otherwise a failure of either asynchronous step is reported to the
-     * channel as an exception response and counted as a failed op; when both
-     * steps finish, a success response is written and the latency is recorded.
-     *
-     * @param request
-     *          The request to handle
-     * @param channel
-     *          Channel the response is written to
-     */
-    @Override
-    public void handleRequestAtOwner(final PubSubRequest request, final Channel channel) {
-        if (!request.hasUnsubscribeRequest()) {
-            UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
-                    "Missing unsubscribe request data");
-            unsubStats.incrementFailedOps();
-            return;
-        }
-
-        final UnsubscribeRequest unsubRequest = request.getUnsubscribeRequest();
-        final ByteString topic = request.getTopic();
-        final ByteString subscriberId = unsubRequest.getSubscriberId();
-
-        // start of the latency measurement; must be compared against MathUtils.now() only
-        final long requestTime = MathUtils.now();
-        subMgr.unsubscribe(topic, subscriberId, new Callback<Void>() {
-            @Override
-            public void operationFailed(Object ctx, PubSubException exception) {
-                channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
-                unsubStats.incrementFailedOps();
-            }
-
-            @Override
-            public void operationFinished(Object ctx, Void resultOfOperation) {
-                // we should not close the channel in delivery manager
-                // since client waits the response for closeSubscription request
-                // client side would close the channel
-                deliveryMgr.stopServingSubscriber(topic, subscriberId, null,
-                new Callback<Void>() {
-                    @Override
-                    public void operationFailed(Object ctx, PubSubException exception) {
-                        channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
-                        unsubStats.incrementFailedOps();
-                    }
-                    @Override
-                    public void operationFinished(Object ctx, Void resultOfOperation) {
-                        // remove the topic subscription from subscription channels
-                        subChannelMgr.remove(new TopicSubscriber(topic, subscriberId),
-                                             channel);
-                        channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
-                        // use the same clock that produced requestTime; subtracting it from
-                        // System.currentTimeMillis() would mix two different time bases
-                        unsubStats.updateLatency(MathUtils.now() - requestTime);
-                    }
-                }, ctx);
-            }
-        }, null);
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/jmx/HedwigJMXService.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/jmx/HedwigJMXService.java b/hedwig-server/src/main/java/org/apache/hedwig/server/jmx/HedwigJMXService.java
deleted file mode 100644
index f0081d9..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/jmx/HedwigJMXService.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hedwig.server.jmx;
-
-/**
- * Implemented by components that are responsible for registering and
- * unregistering their own JMX beans.
- */
-public interface HedwigJMXService {
-
-    /**
-     * Register JMX.
-     *
-     * @param parent
-     *          Parent JMX bean
-     */
-    void registerJMX(HedwigMBeanInfo parent);
-
-    /**
-     * Unregister JMX.
-     */
-    void unregisterJMX();
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/jmx/HedwigMBeanInfo.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/jmx/HedwigMBeanInfo.java b/hedwig-server/src/main/java/org/apache/hedwig/server/jmx/HedwigMBeanInfo.java
deleted file mode 100644
index 866a217..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/jmx/HedwigMBeanInfo.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hedwig.server.jmx;
-
-import org.apache.zookeeper.jmx.ZKMBeanInfo;
-
-/**
- * Hedwig MBean info interface.
- *
- * <p>Declares no members of its own; it only extends {@link ZKMBeanInfo}.
- */
-public interface HedwigMBeanInfo extends ZKMBeanInfo {
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/jmx/HedwigMBeanRegistry.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/jmx/HedwigMBeanRegistry.java b/hedwig-server/src/main/java/org/apache/hedwig/server/jmx/HedwigMBeanRegistry.java
deleted file mode 100644
index 563cae8..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/jmx/HedwigMBeanRegistry.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hedwig.server.jmx;
-
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
-import org.apache.bookkeeper.jmx.BKMBeanRegistry;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class provides a unified interface for registering/unregistering of
- * Hedwig MBeans with the platform MBean server.
- *
- * <p>A single shared instance is created when the class is initialized and is
- * handed out by {@link #getInstance()}.
- */
-public class HedwigMBeanRegistry extends BKMBeanRegistry {
-
-    // domain name returned by getDomainName()
-    static final String SERVICE = "org.apache.HedwigServer";
-
-    // shared instance returned by getInstance()
-    static HedwigMBeanRegistry instance = new HedwigMBeanRegistry();
-
-    /**
-     * @return the shared registry instance.
-     */
-    public static HedwigMBeanRegistry getInstance(){
-        return instance;
-    }
-
-    /**
-     * @return the Hedwig domain name, {@link #SERVICE}.
-     */
-    @Override
-    protected String getDomainName() {
-        return SERVICE;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/meta/FactoryLayout.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/FactoryLayout.java b/hedwig-server/src/main/java/org/apache/hedwig/server/meta/FactoryLayout.java
deleted file mode 100644
index a4253f8..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/FactoryLayout.java
+++ /dev/null
@@ -1,167 +0,0 @@
-package org.apache.hedwig.server.meta;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.StringReader;
-
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.TextFormat;
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.zookeeper.ZkUtils;
-import static com.google.common.base.Charsets.UTF_8;
-
-/**
- * This class encapsulates metadata manager layout information
- * that is persistently stored in zookeeper.
- * It provides parsing and serialization methods of such information.
- *
- * <p>The layout is stored as the protobuf text format of a
- * {@link ManagerMeta}, encoded as UTF-8.
- */
-public class FactoryLayout {
-    private static final Logger logger = LoggerFactory.getLogger(FactoryLayout.class);
-
-    // metadata manager name
-    public static final String NAME = "METADATA";
-    // Znode name to store layout information
-    public static final String LAYOUT_ZNODE = "LAYOUT";
-    public static final String LSEP = "\n";
-
-    // assigned once in the constructor and never changed afterwards
-    private final ManagerMeta managerMeta;
-
-    /**
-     * Construct metadata manager factory layout.
-     *
-     * @param meta
-     *          Meta describes what kind of factory used.
-     */
-    public FactoryLayout(ManagerMeta meta) {
-        this.managerMeta = meta;
-    }
-
-    /**
-     * Build the znode path of the factory layout:
-     * the managers prefix followed by {@code /METADATA/LAYOUT}.
-     *
-     * @param sb
-     *          String builder handed to the configuration to build the prefix
-     * @param cfg
-     *          Server Configuration Object
-     * @return path of the layout znode
-     */
-    public static String getFactoryLayoutPath(StringBuilder sb, ServerConfiguration cfg) {
-        return cfg.getZkManagersPrefix(sb).append("/").append(NAME)
-               .append("/").append(LAYOUT_ZNODE).toString();
-    }
-
-    /**
-     * @return the manager meta wrapped by this layout.
-     */
-    public ManagerMeta getManagerMeta() {
-        return managerMeta;
-    }
-
-    /**
-     * Store the factory layout into zookeeper
-     *
-     * @param zk
-     *          ZooKeeper Handle
-     * @param cfg
-     *          Server Configuration Object
-     * @throws KeeperException
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    public void store(ZooKeeper zk, ServerConfiguration cfg)
-    throws KeeperException, IOException, InterruptedException {
-        String factoryLayoutPath = getFactoryLayoutPath(new StringBuilder(), cfg);
-
-        byte[] layoutData = TextFormat.printToString(managerMeta).getBytes(UTF_8);
-        ZkUtils.createFullPathOptimistic(zk, factoryLayoutPath, layoutData,
-                                         Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-    }
-
-    @Override
-    public int hashCode() {
-        return managerMeta.hashCode();
-    }
-
-    /**
-     * Two layouts are equal when their manager metas are equal.
-     */
-    @Override
-    public boolean equals(Object o) {
-        // instanceof is false for null, so no separate null check is needed
-        if (!(o instanceof FactoryLayout)) {
-            return false;
-        }
-        FactoryLayout other = (FactoryLayout)o;
-        return managerMeta.equals(other.managerMeta);
-    }
-
-    @Override
-    public String toString() {
-        return TextFormat.printToString(managerMeta);
-    }
-
-    /**
-     * Read factory layout from zookeeper
-     *
-     * @param zk
-     *          ZooKeeper Client
-     * @param cfg
-     *          Server configuration object
-     * @return Factory layout, or null if none set in zookeeper
-     * @throws IOException
-     *          if the read is interrupted (the thread's interrupt status is
-     *          restored before throwing) or the stored layout cannot be parsed
-     * @throws KeeperException
-     *          on ZooKeeper errors other than a missing layout znode
-     */
-    public static FactoryLayout readLayout(final ZooKeeper zk,
-                                           final ServerConfiguration cfg)
-    throws IOException, KeeperException {
-        String factoryLayoutPath = getFactoryLayoutPath(new StringBuilder(), cfg);
-        byte[] layoutData;
-        try {
-            layoutData = zk.getData(factoryLayoutPath, false, null);
-        } catch (KeeperException.NoNodeException nne) {
-            return null;
-        } catch (InterruptedException ie) {
-            // the signature cannot propagate InterruptedException, so restore the
-            // interrupt status for callers before converting it to an IOException
-            Thread.currentThread().interrupt();
-            throw new IOException(ie);
-        }
-        ManagerMeta meta;
-        try {
-            BufferedReader reader = new BufferedReader(
-                    new StringReader(new String(layoutData, UTF_8)));
-            ManagerMeta.Builder metaBuilder = ManagerMeta.newBuilder();
-            TextFormat.merge(reader, metaBuilder);
-            meta = metaBuilder.build();
-        } catch (InvalidProtocolBufferException ipbe) {
-            throw new IOException("Corrupted factory layout : ", ipbe);
-        }
-
-        return new FactoryLayout(meta);
-    }
-
-    /**
-     * Remove the factory layout from ZooKeeper.
-     *
-     * @param zk
-     *          ZooKeeper instance
-     * @param cfg
-     *          Server configuration object
-     * @throws KeeperException
-     * @throws InterruptedException
-     */
-    public static void deleteLayout(ZooKeeper zk, ServerConfiguration cfg)
-            throws KeeperException, InterruptedException {
-        String factoryLayoutPath = getFactoryLayoutPath(new StringBuilder(), cfg);
-        // version -1: delete regardless of the znode's current version
-        zk.delete(factoryLayoutPath, -1);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MetadataManagerFactory.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MetadataManagerFactory.java b/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MetadataManagerFactory.java
deleted file mode 100644
index 129d03d..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MetadataManagerFactory.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.meta;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.bookkeeper.util.ReflectionUtils;
-import org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-
-import com.google.protobuf.ByteString;
-
-/**
- * Metadata Manager used to manage metadata used by hedwig.
- *
- * <p>Concrete factories are created through
- * {@link #newMetadataManagerFactory(ServerConfiguration, ZooKeeper)}, which
- * checks the configured factory class against the {@link FactoryLayout}
- * stored in ZooKeeper.
- */
-public abstract class MetadataManagerFactory {
-
-    private static final Logger LOG = LoggerFactory.getLogger(MetadataManagerFactory.class);
-
-    /**
-     * Return current factory version.
-     *
-     * @return current version used by factory.
-     */
-    public abstract int getCurrentVersion();
-
-    /**
-     * Initialize the metadata manager factory with given
-     * configuration and version.
-     *
-     * @param cfg
-     *          Server configuration object
-     * @param zk
-     *          ZooKeeper handler
-     * @param version
-     *          Manager version
-     * @return metadata manager factory
-     * @throws IOException when fail to initialize the manager.
-     */
-    protected abstract MetadataManagerFactory initialize(
-        ServerConfiguration cfg, ZooKeeper zk, int version)
-    throws IOException;
-
-    /**
-     * Uninitialize the factory.
-     *
-     * @throws IOException when fail to shutdown the factory.
-     */
-    public abstract void shutdown() throws IOException;
-
-    /**
-     * Iterate over the topics list.
-     * Used by HedwigConsole to list available topics.
-     *
-     * @return iterator of the topics list.
-     * @throws IOException
-     */
-    public abstract Iterator<ByteString> getTopics() throws IOException;
-
-    /**
-     * Create topic persistence manager.
-     *
-     * @return topic persistence manager
-     */
-    public abstract TopicPersistenceManager newTopicPersistenceManager();
-
-    /**
-     * Create subscription data manager.
-     *
-     * @return subscription data manager.
-     */
-    public abstract SubscriptionDataManager newSubscriptionDataManager();
-
-    /**
-     * Create topic ownership manager.
-     *
-     * @return topic ownership manager.
-     */
-    public abstract TopicOwnershipManager newTopicOwnershipManager();
-
-    /**
-     * Format the metadata for Hedwig.
-     *
-     * @param cfg
-     *          Configuration instance
-     * @param zk
-     *          ZooKeeper instance
-     */
-    public abstract void format(ServerConfiguration cfg, ZooKeeper zk) throws IOException;
-
-    /**
-     * Create new Metadata Manager Factory.
-     *
-     * <p>When no layout exists in ZooKeeper, the factory is created and its
-     * layout written via {@link #createMetadataManagerFactory}. When a layout
-     * exists, a configured factory class must match it; if none is configured,
-     * the class named in the layout is loaded. The factory is then initialized
-     * with the version recorded in the layout.
-     *
-     * @param conf
-     *          Configuration Object.
-     * @param zk
-     *          ZooKeeper Client Handle, talk to zk to know which manager factory is used.
-     * @return new manager factory.
-     * @throws IOException
-     *          if the factory class cannot be obtained, loaded or instantiated,
-     *          or does not match the existing layout
-     * @throws KeeperException
-     * @throws InterruptedException
-     */
-    public static MetadataManagerFactory newMetadataManagerFactory(
-        final ServerConfiguration conf, final ZooKeeper zk)
-    throws IOException, KeeperException, InterruptedException {
-        Class<? extends MetadataManagerFactory> factoryClass;
-        try {
-            factoryClass = conf.getMetadataManagerFactoryClass();
-        } catch (Exception e) {
-            throw new IOException("Failed to get metadata manager factory class from configuration : ", e);
-        }
-        // check that the configured manager is
-        // compatible with the existing layout
-        FactoryLayout layout = FactoryLayout.readLayout(zk, conf);
-        if (layout == null) { // no existing layout
-            return createMetadataManagerFactory(conf, zk, factoryClass);
-        }
-        LOG.debug("read meta layout {}", layout);
-
-        if (factoryClass != null &&
-            !layout.getManagerMeta().getManagerImpl().equals(factoryClass.getName())) {
-            throw new IOException("Configured metadata manager factory " + factoryClass.getName()
-                                + " does not match existing factory "  + layout.getManagerMeta().getManagerImpl());
-        }
-        if (factoryClass == null) {
-            // no factory specified in configuration
-            String factoryClsName = layout.getManagerMeta().getManagerImpl();
-            try {
-                Class<?> theCls = Class.forName(factoryClsName);
-                if (!MetadataManagerFactory.class.isAssignableFrom(theCls)) {
-                    throw new IOException("Wrong metadata manager factory " + factoryClsName);
-                }
-                factoryClass = theCls.asSubclass(MetadataManagerFactory.class);
-            } catch (ClassNotFoundException cnfe) {
-                // keep the original exception as the cause so the failure can be diagnosed
-                throw new IOException("No class found to instantiate metadata manager factory " + factoryClsName,
-                                      cnfe);
-            }
-        }
-        // instantiate the metadata manager factory
-        MetadataManagerFactory managerFactory;
-        try {
-            managerFactory = ReflectionUtils.newInstance(factoryClass);
-        } catch (Throwable t) {
-            throw new IOException("Failed to instantiate metadata manager factory : " + factoryClass, t);
-        }
-        return managerFactory.initialize(conf, zk, layout.getManagerMeta().getManagerVersion());
-    }
-
-    /**
-     * Create metadata manager factory and write factory layout to ZooKeeper.
-     *
-     * <p>If the layout znode already exists when storing, the existing layout
-     * is read back and must equal the one being written.
-     *
-     * @param cfg
-     *          Server Configuration object.
-     * @param zk
-     *          ZooKeeper instance.
-     * @param factoryClass
-     *          Metadata Manager Factory Class; {@link ZkMetadataManagerFactory}
-     *          is used when null.
-     * @return metadata manager factory instance.
-     * @throws IOException
-     *          if the factory cannot be instantiated, or a concurrently written
-     *          layout is incompatible or can no longer be read
-     * @throws KeeperException
-     * @throws InterruptedException
-     */
-    public static MetadataManagerFactory createMetadataManagerFactory(
-            ServerConfiguration cfg, ZooKeeper zk,
-            Class<? extends MetadataManagerFactory> factoryClass)
-            throws IOException, KeeperException, InterruptedException {
-        // use default manager if no one provided
-        if (factoryClass == null) {
-            factoryClass = ZkMetadataManagerFactory.class;
-        }
-
-        MetadataManagerFactory managerFactory;
-        try {
-            managerFactory = ReflectionUtils.newInstance(factoryClass);
-        } catch (Throwable t) {
-            throw new IOException("Fail to instantiate metadata manager factory : " + factoryClass, t);
-        }
-        ManagerMeta managerMeta = ManagerMeta.newBuilder()
-                                  .setManagerImpl(factoryClass.getName())
-                                  .setManagerVersion(managerFactory.getCurrentVersion())
-                                  .build();
-        FactoryLayout layout = new FactoryLayout(managerMeta);
-        try {
-            layout.store(zk, cfg);
-        } catch (KeeperException.NodeExistsException nee) {
-            FactoryLayout layout2 = FactoryLayout.readLayout(zk, cfg);
-            // readLayout returns null when the znode is missing, which can happen
-            // if it was deleted between the failed store and this read
-            if (layout2 == null) {
-                throw new IOException("Contention writing to layout to zookeeper, "
-                        + "existing layout could not be read back to compare with our "
-                        + "layout " + layout, nee);
-            }
-            if (!layout2.equals(layout)) {
-                throw new IOException("Contention writing to layout to zookeeper, "
-                        + " other layout " + layout2 + " is incompatible with our "
-                        + "layout " + layout);
-            }
-        }
-        return managerFactory.initialize(cfg, zk, layout.getManagerMeta().getManagerVersion());
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MsMetadataManagerFactory.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MsMetadataManagerFactory.java b/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MsMetadataManagerFactory.java
deleted file mode 100644
index b44ca91..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MsMetadataManagerFactory.java
+++ /dev/null
@@ -1,867 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.meta;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import static com.google.common.base.Charsets.UTF_8;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.TextFormat;
-import com.google.protobuf.TextFormat.ParseException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.bookkeeper.metastore.MetaStore;
-import org.apache.bookkeeper.metastore.MetastoreCallback;
-import org.apache.bookkeeper.metastore.MetastoreCursor;
-import org.apache.bookkeeper.metastore.MetastoreCursor.ReadEntriesCallback;
-import org.apache.bookkeeper.metastore.MetastoreException;
-import org.apache.bookkeeper.metastore.MetastoreFactory;
-import org.apache.bookkeeper.metastore.MetastoreScannableTable;
-import org.apache.bookkeeper.metastore.MetastoreScannableTable.Order;
-import org.apache.bookkeeper.metastore.MetastoreTable;
-import org.apache.bookkeeper.metastore.MetastoreUtils;
-
-import static org.apache.bookkeeper.metastore.MetastoreTable.*;
-import org.apache.bookkeeper.metastore.MetastoreTableItem;
-import org.apache.bookkeeper.metastore.MSException;
-import org.apache.bookkeeper.metastore.Value;
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.bookkeeper.versioning.Versioned;
-
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
-import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState;
-import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.topics.HubInfo;
-import org.apache.hedwig.util.Callback;
-
-import org.apache.zookeeper.ZooKeeper;
-
-/**
- * MetadataManagerFactory for plug-in metadata storage.
- */
-public class MsMetadataManagerFactory extends MetadataManagerFactory {
-    protected final static Logger logger = LoggerFactory.getLogger(MsMetadataManagerFactory.class);
-
-    static final String UTF8 = "UTF-8";
-
-    static final int CUR_VERSION = 1;
-
-    static final String OWNER_TABLE_NAME = "owner";
-    static final String PERSIST_TABLE_NAME = "persist";
-    static final String SUB_TABLE_NAME = "sub";
-
-    static class SyncResult<T> {
-        T value;
-        int rc;
-        boolean finished = false;
-
-        public synchronized void complete(int rc, T value) {
-            this.rc = rc;
-            this.value = value;
-            finished = true;
-
-            notify();
-        }
-
-        public synchronized void block() throws InterruptedException {
-            while (!finished) {
-                wait();
-            }
-        }
-
-        public int getReturnCode() {
-            return rc;
-        }
-
-        public T getValue() {
-            return value;
-        }
-    }
-
-    MetaStore metastore;
-    MetastoreTable ownerTable;
-    MetastoreTable persistTable;
-    MetastoreScannableTable subTable;
-    ServerConfiguration cfg;
-
-    @Override
-    public MetadataManagerFactory initialize(ServerConfiguration cfg, ZooKeeper zk, int version) throws IOException {
-        if (CUR_VERSION != version) {
-            throw new IOException("Incompatible MsMetadataManagerFactory version " + version
-                    + " found, expected version " + CUR_VERSION);
-        }
-        this.cfg = cfg;
-        try {
-            metastore = MetastoreFactory.createMetaStore(cfg.getMetastoreImplClass());
-            // TODO: need to store metastore class and version in some place.
-            metastore.init(cfg.getConf(), metastore.getVersion());
-        } catch (Exception e) {
-            throw new IOException("Load metastore failed : ", e);
-        }
-
-        try {
-            ownerTable = metastore.createTable(OWNER_TABLE_NAME);
-            if (ownerTable == null) {
-                throw new IOException("create owner table failed");
-            }
-
-            persistTable = metastore.createTable(PERSIST_TABLE_NAME);
-            if (persistTable == null) {
-                throw new IOException("create persistence table failed");
-            }
-
-            subTable = metastore.createScannableTable(SUB_TABLE_NAME);
-            if (subTable == null) {
-                throw new IOException("create subscription table failed");
-            }
-        } catch (MetastoreException me) {
-            throw new IOException("Failed to create tables : ", me);
-        }
-
-        return this;
-    }
-
-    @Override
-    public int getCurrentVersion() {
-        return CUR_VERSION;
-    }
-
-    @Override
-    public void shutdown() {
-        if (metastore == null) {
-            return;
-        }
-
-        if (ownerTable != null) {
-            ownerTable.close();
-            ownerTable = null;
-        }
-
-        if (persistTable != null) {
-            persistTable.close();
-            persistTable = null;
-        }
-
-        if (subTable != null) {
-            subTable.close();
-            subTable = null;
-        }
-
-        metastore.close();
-        metastore = null;
-    }
-
-    @Override
-    public Iterator<ByteString> getTopics() throws IOException {
-        SyncResult<MetastoreCursor> syn = new SyncResult<MetastoreCursor>();
-        persistTable.openCursor(NON_FIELDS, new MetastoreCallback<MetastoreCursor>() {
-            public void complete(int rc, MetastoreCursor cursor, Object ctx) {
-                @SuppressWarnings("unchecked")
-                SyncResult<MetastoreCursor> syn = (SyncResult<MetastoreCursor>) ctx;
-                syn.complete(rc, cursor);
-            }
-        }, syn);
-        try {
-            syn.block();
-        } catch (Exception e) {
-            throw new IOException("Interrupted on getting topics list : ", e);
-        }
-
-        if (syn.getReturnCode() != MSException.Code.OK.getCode()) {
-            throw new IOException("Failed to get topics : ", MSException.create(
-                    MSException.Code.get(syn.getReturnCode()), ""));
-        }
-
-        final MetastoreCursor cursor = syn.getValue();
-        return new Iterator<ByteString>() {
-            Iterator<MetastoreTableItem> itemIter = null;
-
-            @Override
-            public boolean hasNext() {
-                while (null == itemIter || !itemIter.hasNext()) {
-                    if (!cursor.hasMoreEntries()) {
-                        return false;
-                    }
-
-                    try {
-                        itemIter = cursor.readEntries(cfg.getMetastoreMaxEntriesPerScan());
-                    } catch (MSException mse) {
-                        logger.warn("Interrupted when iterating the topics list : ", mse);
-                        return false;
-                    }
-                }
-                return true;
-            }
-
-            @Override
-            public ByteString next() {
-                MetastoreTableItem t = itemIter.next();
-                return ByteString.copyFromUtf8(t.getKey());
-            }
-
-            @Override
-            public void remove() {
-                throw new UnsupportedOperationException("Doesn't support remove topic from topic iterator.");
-            }
-        };
-    }
-
-    @Override
-    public TopicOwnershipManager newTopicOwnershipManager() {
-        return new MsTopicOwnershipManagerImpl(ownerTable);
-    }
-
-    static class MsTopicOwnershipManagerImpl implements TopicOwnershipManager {
-
-        static final String OWNER_FIELD = "owner";
-
-        final MetastoreTable ownerTable;
-
-        MsTopicOwnershipManagerImpl(MetastoreTable ownerTable) {
-            this.ownerTable = ownerTable;
-        }
-
-        @Override
-        public void close() throws IOException {
-            // do nothing
-        }
-
-        @Override
-        public void readOwnerInfo(final ByteString topic, final Callback<Versioned<HubInfo>> callback, Object ctx) {
-            ownerTable.get(topic.toStringUtf8(), new MetastoreCallback<Versioned<Value>>() {
-                @Override
-                public void complete(int rc, Versioned<Value> value, Object ctx) {
-                    if (MSException.Code.NoKey.getCode() == rc) {
-                        callback.operationFinished(ctx, null);
-                        return;
-                    }
-
-                    if (MSException.Code.OK.getCode() != rc) {
-                        logErrorAndFinishOperation("Could not read ownership for topic " + topic.toStringUtf8(),
-                                callback, ctx, rc);
-                        return;
-                    }
-
-                    HubInfo owner = null;
-                    try {
-                        byte[] data = value.getValue().getField(OWNER_FIELD);
-                        if (data != null) {
-                            owner = HubInfo.parse(new String(data, UTF_8));
-                        }
-                    } catch (HubInfo.InvalidHubInfoException ihie) {
-                        logger.warn("Failed to parse hub info for topic " + topic.toStringUtf8(), ihie);
-                    }
-                    Version version = value.getVersion();
-                    callback.operationFinished(ctx, new Versioned<HubInfo>(owner, version));
-                }
-            }, ctx);
-        }
-
-        @Override
-        public void writeOwnerInfo(final ByteString topic, final HubInfo owner, final Version version,
-                final Callback<Version> callback, Object ctx) {
-            Value value = new Value();
-            value.setField(OWNER_FIELD, owner.toString().getBytes(UTF_8));
-
-            ownerTable.put(topic.toStringUtf8(), value, version, new MetastoreCallback<Version>() {
-                @Override
-                public void complete(int rc, Version ver, Object ctx) {
-                    if (MSException.Code.OK.getCode() == rc) {
-                        callback.operationFinished(ctx, ver);
-                        return;
-                    } else if (MSException.Code.NoKey.getCode() == rc) {
-                        // no node
-                        callback.operationFailed(
-                                ctx,
-                                PubSubException.create(StatusCode.NO_TOPIC_OWNER_INFO, "No owner info found for topic "
-                                        + topic.toStringUtf8()));
-                        return;
-                    } else if (MSException.Code.KeyExists.getCode() == rc) {
-                        // key exists
-                        callback.operationFailed(
-                                ctx,
-                                PubSubException.create(StatusCode.TOPIC_OWNER_INFO_EXISTS, "Owner info of topic "
-                                        + topic.toStringUtf8() + " existed."));
-                        return;
-                    } else if (MSException.Code.BadVersion.getCode() == rc) {
-                        // bad version
-                        callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION,
-                                "Bad version provided to update owner info of topic " + topic.toStringUtf8()));
-                        return;
-                    } else {
-                        logErrorAndFinishOperation("Failed to update ownership of topic " + topic.toStringUtf8()
-                                + " to " + owner, callback, ctx, rc);
-                        return;
-                    }
-                }
-            }, ctx);
-        }
-
-        @Override
-        public void deleteOwnerInfo(final ByteString topic, Version version, final Callback<Void> callback,
-                Object ctx) {
-            ownerTable.remove(topic.toStringUtf8(), version, new MetastoreCallback<Void>() {
-                @Override
-                public void complete(int rc, Void value, Object ctx) {
-                    if (MSException.Code.OK.getCode() == rc) {
-                        logger.debug("Successfully deleted owner info for topic {}", topic.toStringUtf8());
-                        callback.operationFinished(ctx, null);
-                        return;
-                    } else if (MSException.Code.NoKey.getCode() == rc) {
-                        // no node
-                        callback.operationFailed(
-                                ctx,
-                                PubSubException.create(StatusCode.NO_TOPIC_OWNER_INFO, "No owner info found for topic "
-                                        + topic.toStringUtf8()));
-                        return;
-                    } else if (MSException.Code.BadVersion.getCode() == rc) {
-                        // bad version
-                        callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION,
-                                "Bad version provided to delete owner info of topic " + topic.toStringUtf8()));
-                        return;
-                    } else {
-                        logErrorAndFinishOperation("Failed to delete owner info for topic " + topic.toStringUtf8(),
-                                callback, ctx, rc);
-                        return;
-                    }
-                }
-            }, ctx);
-        }
-    }
-
-    @Override
-    public TopicPersistenceManager newTopicPersistenceManager() {
-        return new MsTopicPersistenceManagerImpl(persistTable);
-    }
-
-    static class MsTopicPersistenceManagerImpl implements TopicPersistenceManager {
-
-        static final String PERSIST_FIELD = "prst";
-
-        final MetastoreTable persistTable;
-
-        MsTopicPersistenceManagerImpl(MetastoreTable persistTable) {
-            this.persistTable = persistTable;
-        }
-
-        @Override
-        public void close() throws IOException {
-            // do nothing
-        }
-
-        @Override
-        public void readTopicPersistenceInfo(final ByteString topic, final Callback<Versioned<LedgerRanges>> callback,
-                Object ctx) {
-            persistTable.get(topic.toStringUtf8(), new MetastoreCallback<Versioned<Value>>() {
-                @Override
-                public void complete(int rc, Versioned<Value> value, Object ctx) {
-                    if (MSException.Code.OK.getCode() == rc) {
-                        byte[] data = value.getValue().getField(PERSIST_FIELD);
-                        if (data != null) {
-                            parseAndReturnTopicLedgerRanges(topic, data, value.getVersion(), callback, ctx);
-                        } else { // null data is same as NoKey
-                            callback.operationFinished(ctx, null);
-                        }
-                    } else if (MSException.Code.NoKey.getCode() == rc) {
-                        callback.operationFinished(ctx, null);
-                    } else {
-                        logErrorAndFinishOperation("Could not read ledgers node for topic " + topic.toStringUtf8(),
-                                callback, ctx, rc);
-                    }
-                }
-            }, ctx);
-        }
-
-        /**
-         * Parse ledger ranges data and return it thru callback.
-         *
-         * @param topic
-         *            Topic name
-         * @param data
-         *            Topic Ledger Ranges data
-         * @param version
-         *            Version of the topic ledger ranges data
-         * @param callback
-         *            Callback to return ledger ranges
-         * @param ctx
-         *            Context of the callback
-         */
-        private void parseAndReturnTopicLedgerRanges(ByteString topic, byte[] data, Version version,
-                Callback<Versioned<LedgerRanges>> callback, Object ctx) {
-            try {
-                LedgerRanges.Builder rangesBuilder = LedgerRanges.newBuilder();
-                TextFormat.merge(new String(data, UTF8), rangesBuilder);
-                LedgerRanges lr = rangesBuilder.build();
-                Versioned<LedgerRanges> ranges = new Versioned<LedgerRanges>(lr, version);
-                callback.operationFinished(ctx, ranges);
-            } catch (ParseException e) {
-                StringBuilder sb = new StringBuilder();
-                sb.append("Ledger ranges for topic ").append(topic.toStringUtf8())
-                        .append(" could not be deserialized.");
-                String msg = sb.toString();
-                logger.error(msg, e);
-                callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
-            } catch (UnsupportedEncodingException uee) {
-                StringBuilder sb = new StringBuilder();
-                sb.append("Ledger ranges for topic ").append(topic.toStringUtf8()).append(" is not UTF-8 encoded.");
-                String msg = sb.toString();
-                logger.error(msg, uee);
-                callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
-            }
-        }
-
-        @Override
-        public void writeTopicPersistenceInfo(final ByteString topic, LedgerRanges ranges, final Version version,
-                final Callback<Version> callback, Object ctx) {
-            Value value = new Value();
-            value.setField(PERSIST_FIELD, TextFormat.printToString(ranges).getBytes(UTF_8));
-
-            persistTable.put(topic.toStringUtf8(), value, version, new MetastoreCallback<Version>() {
-                @Override
-                public void complete(int rc, Version ver, Object ctx) {
-                    if (MSException.Code.OK.getCode() == rc) {
-                        callback.operationFinished(ctx, ver);
-                        return;
-                    } else if (MSException.Code.NoKey.getCode() == rc) {
-                        // no node
-                        callback.operationFailed(ctx, PubSubException.create(StatusCode.NO_TOPIC_PERSISTENCE_INFO,
-                                "No persistence info found for topic " + topic.toStringUtf8()));
-                        return;
-                    } else if (MSException.Code.KeyExists.getCode() == rc) {
-                        // key exists
-                        callback.operationFailed(ctx, PubSubException.create(StatusCode.TOPIC_PERSISTENCE_INFO_EXISTS,
-                                "Persistence info of topic " + topic.toStringUtf8() + " existed."));
-                        return;
-                    } else if (MSException.Code.BadVersion.getCode() == rc) {
-                        // bad version
-                        callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION,
-                                "Bad version provided to update persistence info of topic " + topic.toStringUtf8()));
-                        return;
-                    } else {
-                        logErrorAndFinishOperation("Could not write ledgers node for topic " + topic.toStringUtf8(),
-                                callback, ctx, rc);
-                    }
-                }
-            }, ctx);
-        }
-
-        @Override
-        public void deleteTopicPersistenceInfo(final ByteString topic, final Version version,
-                final Callback<Void> callback, Object ctx) {
-            persistTable.remove(topic.toStringUtf8(), version, new MetastoreCallback<Void>() {
-                @Override
-                public void complete(int rc, Void value, Object ctx) {
-                    if (MSException.Code.OK.getCode() == rc) {
-                        logger.debug("Successfully deleted persistence info for topic {}.", topic.toStringUtf8());
-                        callback.operationFinished(ctx, null);
-                        return;
-                    } else if (MSException.Code.NoKey.getCode() == rc) {
-                        // no node
-                        callback.operationFailed(ctx, PubSubException.create(StatusCode.NO_TOPIC_PERSISTENCE_INFO,
-                                "No persistence info found for topic " + topic.toStringUtf8()));
-                        return;
-                    } else if (MSException.Code.BadVersion.getCode() == rc) {
-                        // bad version
-                        callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION,
-                                "Bad version provided to delete persistence info of topic " + topic.toStringUtf8()));
-                        return;
-                    } else {
-                        logErrorAndFinishOperation("Failed to delete persistence info topic: " + topic.toStringUtf8()
-                                + ", version: " + version, callback, ctx, rc, StatusCode.SERVICE_DOWN);
-                        return;
-                    }
-                }
-            }, ctx);
-        }
-    }
-
-    @Override
-    public SubscriptionDataManager newSubscriptionDataManager() {
-        return new MsSubscriptionDataManagerImpl(cfg, subTable);
-    }
-
-    static class MsSubscriptionDataManagerImpl implements SubscriptionDataManager {
-
-        static final String SUB_STATE_FIELD = "sub_state";
-        static final String SUB_PREFS_FIELD = "sub_preferences";
-
-        static final char TOPIC_SUB_FIRST_SEPARATOR = '\001';
-        static final char TOPIC_SUB_LAST_SEPARATOR = '\002';
-
-        final ServerConfiguration cfg;
-        final MetastoreScannableTable subTable;
-
-        MsSubscriptionDataManagerImpl(ServerConfiguration cfg, MetastoreScannableTable subTable) {
-            this.cfg = cfg;
-            this.subTable = subTable;
-        }
-
-        @Override
-        public void close() throws IOException {
-            // do nothing
-        }
-
-        private String getSubscriptionKey(ByteString topic, ByteString subscriberId) {
-            return new StringBuilder(topic.toStringUtf8()).append(TOPIC_SUB_FIRST_SEPARATOR)
-                    .append(subscriberId.toStringUtf8()).toString();
-        }
-
-        private Value subscriptionData2Value(SubscriptionData subData) {
-            Value value = new Value();
-            if (subData.hasState()) {
-                value.setField(SUB_STATE_FIELD, TextFormat.printToString(subData.getState()).getBytes(UTF_8));
-            }
-            if (subData.hasPreferences()) {
-                value.setField(SUB_PREFS_FIELD, TextFormat.printToString(subData.getPreferences()).getBytes(UTF_8));
-            }
-            return value;
-        }
-
-        @Override
-        public void createSubscriptionData(final ByteString topic, final ByteString subscriberId,
-                final SubscriptionData subData, final Callback<Version> callback, Object ctx) {
-            String key = getSubscriptionKey(topic, subscriberId);
-            Value value = subscriptionData2Value(subData);
-
-            subTable.put(key, value, Version.NEW, new MetastoreCallback<Version>() {
-                @Override
-                public void complete(int rc, Version ver, Object ctx) {
-                    if (rc == MSException.Code.OK.getCode()) {
-                        if (logger.isDebugEnabled()) {
-                            logger.debug("Successfully create subscription for topic: " + topic.toStringUtf8()
-                                    + ", subscriberId: " + subscriberId.toStringUtf8() + ", data: "
-                                    + SubscriptionStateUtils.toString(subData));
-                        }
-                        callback.operationFinished(ctx, ver);
-                    } else if (rc == MSException.Code.KeyExists.getCode()) {
-                        callback.operationFailed(ctx, PubSubException.create(
-                                StatusCode.SUBSCRIPTION_STATE_EXISTS,
-                                "Subscription data for (topic:" + topic.toStringUtf8() + ", subscriber:"
-                                        + subscriberId.toStringUtf8() + ") existed."));
-                        return;
-                    } else {
-                        logErrorAndFinishOperation("Failed to create topic: " + topic.toStringUtf8()
-                                + ", subscriberId: " + subscriberId.toStringUtf8() + ", data: "
-                                + SubscriptionStateUtils.toString(subData), callback, ctx, rc);
-                    }
-                }
-            }, ctx);
-        }
-
-        @Override
-        public boolean isPartialUpdateSupported() {
-            // TODO: Here we assume Metastore support partial update, but this
-            // maybe incorrect.
-            return true;
-        }
-
-        @Override
-        public void replaceSubscriptionData(final ByteString topic, final ByteString subscriberId,
-                final SubscriptionData subData, final Version version, final Callback<Version> callback,
-                final Object ctx) {
-            updateSubscriptionData(topic, subscriberId, subData, version, callback, ctx);
-        }
-
-        @Override
-        public void updateSubscriptionData(final ByteString topic, final ByteString subscriberId,
-                final SubscriptionData subData, final Version version, final Callback<Version> callback,
-                final Object ctx) {
-            String key = getSubscriptionKey(topic, subscriberId);
-            Value value = subscriptionData2Value(subData);
-
-            subTable.put(key, value, version, new MetastoreCallback<Version>() {
-                @Override
-                public void complete(int rc, Version version, Object ctx) {
-                    if (rc == MSException.Code.OK.getCode()) {
-                        if (logger.isDebugEnabled()) {
-                            logger.debug("Successfully updated subscription data for topic: " + topic.toStringUtf8()
-                                    + ", subscriberId: " + subscriberId.toStringUtf8() + ", data: "
-                                    + SubscriptionStateUtils.toString(subData) + ", version: " + version);
-                        }
-                        callback.operationFinished(ctx, version);
-                    } else if (rc == MSException.Code.NoKey.getCode()) {
-                        // no node
-                        callback.operationFailed(ctx, PubSubException.create(StatusCode.NO_SUBSCRIPTION_STATE,
-                                "No subscription data found for (topic:" + topic.toStringUtf8() + ", subscriber:"
-                                        + subscriberId.toStringUtf8() + ")."));
-                        return;
-                    } else if (rc == MSException.Code.BadVersion.getCode()) {
-                        // bad version
-                        callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION,
-                                "Bad version provided to update subscription data of topic " + topic.toStringUtf8()
-                                        + " subscriberId " + subscriberId));
-                        return;
-                    } else {
-                        logErrorAndFinishOperation(
-                                "Failed to update subscription data for topic: " + topic.toStringUtf8()
-                                        + ", subscriberId: " + subscriberId.toStringUtf8() + ", data: "
-                                        + SubscriptionStateUtils.toString(subData) + ", version: " + version, callback,
-                                ctx, rc);
-                    }
-                }
-            }, ctx);
-        }
-
-        @Override
-        public void deleteSubscriptionData(final ByteString topic, final ByteString subscriberId, Version version,
-                final Callback<Void> callback, Object ctx) {
-            String key = getSubscriptionKey(topic, subscriberId);
-            subTable.remove(key, version, new MetastoreCallback<Void>() {
-                @Override
-                public void complete(int rc, Void value, Object ctx) {
-                    if (rc == MSException.Code.OK.getCode()) {
-                        logger.debug("Successfully delete subscription for topic: {}, subscriberId: {}.",
-                                topic.toStringUtf8(), subscriberId.toStringUtf8());
-                        callback.operationFinished(ctx, null);
-                        return;
-                    } else if (rc == MSException.Code.BadVersion.getCode()) {
-                        // bad version
-                        callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION,
-                                "Bad version provided to delete subscriptoin data of topic " + topic.toStringUtf8()
-                                        + " subscriberId " + subscriberId));
-                        return;
-                    } else if (rc == MSException.Code.NoKey.getCode()) {
-                        // no node
-                        callback.operationFailed(ctx, PubSubException.create(StatusCode.NO_SUBSCRIPTION_STATE,
-                                "No subscription data found for (topic:" + topic.toStringUtf8() + ", subscriber:"
-                                        + subscriberId.toStringUtf8() + ")."));
-                        return;
-                    } else {
-                        logErrorAndFinishOperation("Failed to delete subscription topic: " + topic.toStringUtf8()
-                                + ", subscriberId: " + subscriberId.toStringUtf8(), callback, ctx, rc,
-                                StatusCode.SERVICE_DOWN);
-                    }
-                }
-            }, ctx);
-        }
-
-        private SubscriptionData value2SubscriptionData(Value value) throws ParseException,
-                UnsupportedEncodingException {
-            SubscriptionData.Builder builder = SubscriptionData.newBuilder();
-
-            byte[] stateData = value.getField(SUB_STATE_FIELD);
-            if (null != stateData) {
-                SubscriptionState.Builder stateBuilder = SubscriptionState.newBuilder();
-                TextFormat.merge(new String(stateData, UTF8), stateBuilder);
-                SubscriptionState state = stateBuilder.build();
-                builder.setState(state);
-            }
-
-            byte[] prefsData = value.getField(SUB_PREFS_FIELD);
-            if (null != prefsData) {
-                SubscriptionPreferences.Builder preferencesBuilder = SubscriptionPreferences.newBuilder();
-                TextFormat.merge(new String(prefsData, UTF8), preferencesBuilder);
-                SubscriptionPreferences preferences = preferencesBuilder.build();
-                builder.setPreferences(preferences);
-            }
-
-            return builder.build();
-        }
-
-        /** Reads the stored data of one (topic, subscriber) pair; the callback finishes with
-         *  null when no entry exists and fails when the read or the decoding goes wrong. */
-        @Override
-        public void readSubscriptionData(final ByteString topic, final ByteString subscriberId,
-                final Callback<Versioned<SubscriptionData>> callback, Object ctx) {
-            String key = getSubscriptionKey(topic, subscriberId);
-            subTable.get(key, new MetastoreCallback<Versioned<Value>>() {
-                @Override
-                public void complete(int rc, Versioned<Value> value, Object ctx) {
-                    // A missing key is reported as success with a null result, not as a failure.
-                    if (rc == MSException.Code.NoKey.getCode()) {
-                        callback.operationFinished(ctx, null);
-                        return;
-                    }
-                    // Every other non-OK code fails the callback with a status derived from rc.
-                    if (rc != MSException.Code.OK.getCode()) {
-                        logErrorAndFinishOperation(
-                                "Could not read subscription data for topic: " + topic.toStringUtf8()
-                                        + ", subscriberId: " + subscriberId.toStringUtf8(), callback, ctx, rc);
-                        return;
-                    }
-                    // Decode the stored fields; decoding problems fail the callback instead of throwing.
-                    try {
-                        Versioned<SubscriptionData> subData = new Versioned<SubscriptionData>(
-                                value2SubscriptionData(value.getValue()), value.getVersion());
-                        if (logger.isDebugEnabled()) {
-                            logger.debug("Found subscription while acquiring topic: " + topic.toStringUtf8()
-                                    + ", subscriberId: " + subscriberId.toStringUtf8() + ", data: "
-                                    + SubscriptionStateUtils.toString(subData.getValue()) + ", version: "
-                                    + subData.getVersion());
-                        }
-                        callback.operationFinished(ctx, subData);
-                    } catch (ParseException e) {
-                        // The stored text could not be parsed by value2SubscriptionData.
-                        String msg = "Failed to deserialize subscription data for topic:" + topic.toStringUtf8()
-                                + ", subscriberId: " + subscriberId.toStringUtf8();
-                        logger.error(msg, e);
-                        callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
-                    } catch (UnsupportedEncodingException uee) {
-                        // Raised while turning the stored bytes into a String with the UTF8 charset name.
-                        String msg = "Subscription data for topic: " + topic.toStringUtf8() + ", subscriberId: "
-                                + subscriberId.toStringUtf8() + " is not UTF-8 encoded";
-                        logger.error(msg, uee);
-                        callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
-                    }
-                }
-            }, ctx);
-        }
-
-        private String getSubscriptionPrefix(ByteString topic, char sep) {
-            return topic.toStringUtf8() + sep; // topic name followed by the separator character
-        }
-
-        private void readSubscriptions(final ByteString topic, final int keyLength, final MetastoreCursor cursor,
-                final Map<ByteString, Versioned<SubscriptionData>> topicSubs, // filled across successive calls
-                final Callback<Map<ByteString, Versioned<SubscriptionData>>> callback, Object ctx) { // one batch per call
-            if (!cursor.hasMoreEntries()) { // no more entries: finish with what was collected
-                callback.operationFinished(ctx, topicSubs);
-                return;
-            }
-            ReadEntriesCallback readCb = new ReadEntriesCallback() {
-                @Override
-                public void complete(int rc, Iterator<MetastoreTableItem> items, Object ctx) {
-                    if (rc != MSException.Code.OK.getCode()) { // a failed batch read fails the whole operation
-                        logErrorAndFinishOperation("Could not read subscribers for cursor " + cursor,
-                                callback, ctx, rc);
-                        return;
-                    }
-                    while (items.hasNext()) {
-                        MetastoreTableItem item = items.next(); // subscriber id = key minus its first keyLength chars
-                        final ByteString subscriberId = ByteString.copyFromUtf8(item.getKey().substring(keyLength));
-                        try {
-                            Versioned<Value> vv = item.getValue();
-                            Versioned<SubscriptionData> subData = new Versioned<SubscriptionData>(
-                                    value2SubscriptionData(vv.getValue()), vv.getVersion());
-                            topicSubs.put(subscriberId, subData);
-                        } catch (ParseException e) {
-                            StringBuilder sb = new StringBuilder();
-                            sb.append("Failed to deserialize subscription data for topic: ")
-                                    .append(topic.toStringUtf8()).append(", subscriberId: ")
-                                    .append(subscriberId.toStringUtf8());
-                            String msg = sb.toString();
-                            logger.error(msg, e);
-                            callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
-                            return; // stop at the first bad entry; the callback has already been failed
-                        } catch (UnsupportedEncodingException e) {
-                            StringBuilder sb = new StringBuilder();
-                            sb.append("Subscription data for topic: ").append(topic.toStringUtf8())
-                                    .append(", subscriberId: ").append(subscriberId.toStringUtf8())
-                                    .append(" is not UTF-8 encoded.");
-                            String msg = sb.toString();
-                            logger.error(msg, e);
-                            callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
-                            return; // stop at the first bad entry; the callback has already been failed
-                        }
-                    }
-                    readSubscriptions(topic, keyLength, cursor, topicSubs, callback, ctx); // batch stored: go on
-                }
-            };
-            cursor.asyncReadEntries(cfg.getMetastoreMaxEntriesPerScan(), readCb, ctx); // batch size taken from cfg
-        }
-
-        @Override
-        public void readSubscriptions(final ByteString topic,
-                final Callback<Map<ByteString, Versioned<SubscriptionData>>> callback, Object ctx) {
-            // Open a cursor over the keys from "topic + first separator" to "topic + last separator";
-            // the batch reader then fills a fresh map and completes the callback.
-            final String rangeStart = getSubscriptionPrefix(topic, TOPIC_SUB_FIRST_SEPARATOR);
-            final String rangeEnd = getSubscriptionPrefix(topic, TOPIC_SUB_LAST_SEPARATOR);
-            final int prefixLength = rangeStart.length();
-            MetastoreCallback<MetastoreCursor> onCursorOpened = new MetastoreCallback<MetastoreCursor>() {
-                @Override
-                public void complete(int rc, MetastoreCursor cursor, Object ctx) {
-                    if (rc == MSException.Code.OK.getCode()) {
-                        readSubscriptions(topic, prefixLength, cursor,
-                                new ConcurrentHashMap<ByteString, Versioned<SubscriptionData>>(), callback, ctx);
-                        return;
-                    }
-                    logErrorAndFinishOperation(
-                            "Could not read subscribers for topic " + topic.toStringUtf8(), callback, ctx, rc);
-                }
-            };
-            subTable.openCursor(rangeStart, true, rangeEnd, true, Order.ASC, ALL_FIELDS, onCursorOpened, ctx);
-        }
-    }
-
-    /**
-     * callback finish operation with exception specify by code, regardless of
-     * the value of return code rc.
-     */
-    private static <T> void logErrorAndFinishOperation(String msg, Callback<T> callback, Object ctx, int rc,
-            StatusCode code) {
-        logger.error(msg, MSException.create(MSException.Code.get(rc), ""));
-        callback.operationFailed(ctx, PubSubException.create(code, msg));
-    }
-
-    /**
-     * callback finish operation with corresponding PubSubException converted
-     * from return code rc.
-     */
-    private static <T> void logErrorAndFinishOperation(String msg, Callback<T> callback, Object ctx, int rc) {
-        StatusCode code;
-
-        if (rc == MSException.Code.NoKey.getCode()) {
-            code = StatusCode.NO_SUCH_TOPIC;
-        } else if (rc == MSException.Code.ServiceDown.getCode()) {
-            code = StatusCode.SERVICE_DOWN;
-        } else {
-            code = StatusCode.UNEXPECTED_CONDITION;
-        }
-
-        logErrorAndFinishOperation(msg, callback, ctx, rc, code);
-    }
-
-    /**
-     * Empties the ownership, subscription and persistence tables through MetastoreUtils.cleanTable.
-     * An MSException or InterruptedException raised while cleaning is rethrown wrapped in IOException.
-     */
-    @Override
-    public void format(ServerConfiguration cfg, ZooKeeper zk) throws IOException {
-        try {
-            int maxEntriesPerScan = cfg.getMetastoreMaxEntriesPerScan();
-            logger.info("Cleaning topic ownership table ...");
-            MetastoreUtils.cleanTable(ownerTable, maxEntriesPerScan);
-            logger.info("Cleaned topic ownership table successfully.");
-            logger.info("Cleaning topic subscription table ...");
-            MetastoreUtils.cleanTable(subTable, maxEntriesPerScan);
-            logger.info("Cleaned topic subscription table successfully.");
-            logger.info("Cleaning topic persistence info table ...");
-            MetastoreUtils.cleanTable(persistTable, maxEntriesPerScan);
-            logger.info("Cleaned topic persistence info table successfully.");
-        } catch (MSException mse) {
-            throw new IOException("Exception when formatting hedwig metastore : ", mse);
-        } catch (InterruptedException ie) {
-            // Restore the interrupt flag so callers can still observe the interruption.
-            Thread.currentThread().interrupt();
-            throw new IOException("Interrupted when formatting hedwig metastore : ", ie);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/meta/SubscriptionDataManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/SubscriptionDataManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/meta/SubscriptionDataManager.java
deleted file mode 100644
index 0bebd45..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/SubscriptionDataManager.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.meta;
-
-import java.io.Closeable;
-import java.util.Map;
-
-import com.google.protobuf.ByteString;
-
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
-import org.apache.hedwig.util.Callback;
-
-/**
- * Manage subscription data.
- */
-public interface SubscriptionDataManager extends Closeable {
-
-    /**
-     * Create subscription data.
-     *
-     * @param topic
-     *          Topic name
-     * @param subscriberId
-     *          Subscriber id
-     * @param data
-     *          Subscription data
-     * @param callback
-     *          Callback when subscription state created. New version would be returned.
-     *          {@link PubSubException.SubscriptionStateExistsException} is returned when subscription state
-     *          existed before.
-     * @param ctx
-     *          Context of the callback
-     */
-    public void createSubscriptionData(ByteString topic, ByteString subscriberId, SubscriptionData data,
-                                       Callback<Version> callback, Object ctx);
-
-    /**
-     * Whether the metadata manager supports partial update.
-     *
-     * @return true if the metadata manager supports partial update.
-     *         otherwise, return false.
-     */
-    public boolean isPartialUpdateSupported();
-
-    /**
-     * Update subscription data.
-     *
-     * @param topic
-     *          Topic name
-     * @param subscriberId
-     *          Subscriber id
-     * @param dataToUpdate
-     *          Subscription data to update. This is partial data that contains only
-     *          the parts to update. The implementation should not replace
-     *          existing subscription data with <i>dataToUpdate</i> directly.
-     *          E.g. if there is only state in it, you should update state only.
-     * @param version
-     *          Current version of subscription data.
-     * @param callback
-     *          Callback when subscription state updated. New version would be returned.
-     *          {@link PubSubException.BadVersionException} is returned when version doesn't match,
-     *          {@link PubSubException.NoSubscriptionStateException} is returned when no subscription state
-     *          is found.
-     * @param ctx
-     *          Context of the callback
-     */
-    public void updateSubscriptionData(ByteString topic, ByteString subscriberId, SubscriptionData dataToUpdate, 
-                                       Version version, Callback<Version> callback, Object ctx);
-
-    /**
-     * Replace subscription data.
-     *
-     * @param topic
-     *          Topic name
-     * @param subscriberId
-     *          Subscriber id
-     * @param dataToReplace
-     *          Subscription data to replace.
-     * @param version
-     *          Current version of subscription data.
-     * @param callback
-     *          Callback when subscription state updated. New version would be returned.
-     *          {@link PubSubException.BadVersionException} is returned when version doesn't match,
-     *          {@link PubSubException.NoSubscriptionStateException} is returned when no subscription state
-     *          is found.
-     * @param ctx
-     *          Context of the callback
-     */
-    public void replaceSubscriptionData(ByteString topic, ByteString subscriberId, SubscriptionData dataToReplace,
-                                        Version version, Callback<Version> callback, Object ctx);
-
-    /**
-     * Remove subscription data.
-     *
-     * @param topic
-     *          Topic name
-     * @param subscriberId
-     *          Subscriber id
-     * @param version
-     *          Current version of subscription data.
-     * @param callback
-     *          Callback when subscription state deleted.
-     *          {@link PubSubException.BadVersionException} is returned when version doesn't match,
-     *          {@link PubSubException.NoSubscriptionStateException} is returned when no subscription state
-     *          is found.
-     * @param ctx
-     *          Context of the callback
-     */
-    public void deleteSubscriptionData(ByteString topic, ByteString subscriberId, Version version,
-                                       Callback<Void> callback, Object ctx);
-
-    /**
-     * Read subscription data with version.
-     *
-     * @param topic
-     *          Topic Name
-     * @param subscriberId
-     *          Subscriber id
-     * @param callback
-     *          Callback when subscription data read.
-     *          Null is returned when no subscription data is found.
-     * @param ctx
-     *          Context of the callback
-     */
-    public void readSubscriptionData(ByteString topic, ByteString subscriberId,
-                                     Callback<Versioned<SubscriptionData>> callback, Object ctx);
-
-    /**
-     * Read all subscriptions of a topic.
-     *
-     * @param topic
-     *          Topic name
-     * @param cb
-     *          Callback to return subscriptions with version information
-     * @param ctx
-     *          Context of the callback
-     */
-    public void readSubscriptions(ByteString topic, Callback<Map<ByteString, Versioned<SubscriptionData>>> cb,
-                                  Object ctx);
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/meta/TopicOwnershipManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/TopicOwnershipManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/meta/TopicOwnershipManager.java
deleted file mode 100644
index f17011c..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/TopicOwnershipManager.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.meta;
-
-import java.io.Closeable;
-
-import com.google.protobuf.ByteString;
-
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.hedwig.server.topics.HubInfo;
-import org.apache.hedwig.util.Callback;
-
-/**
- * Manage topic ownership.
- */
-public interface TopicOwnershipManager extends Closeable {
-
-    /**
-     * Read owner information of a topic.
-     *
-     * @param topic
-     *          Topic Name
-     * @param callback
-     *          Callback to return hub info. If there is no owner info, return null;
-     *          If there is data but not valid owner info, return a Versioned object with null hub info;
-     *          If there is valid owner info, return versioned hub info.
-     * @param ctx
-     *          Context of the callback
-     */
-    public void readOwnerInfo(ByteString topic, Callback<Versioned<HubInfo>> callback, Object ctx);
-
-    /**
-     * Write owner info for a specified topic.
-     * New owner info is created if none existed before.
-     *
-     * @param topic
-     *          Topic Name
-     * @param owner
-     *          Owner hub info
-     * @param version
-     *          Current version of owner info
-     *          If <code>version</code> is {@link Version#NEW}, create owner info.
-     *          {@link PubSubException.TopicOwnerInfoExistsException} is returned when
-     *          owner info existed before.
-     *          Otherwise, the owner info is updated only when
-     *          provided version equals to its current version.
-     *          {@link PubSubException.BadVersionException} is returned when version doesn't match,
-     *          {@link PubSubException.NoTopicOwnerInfoException} is returned when no owner info
-     *          found to update.
-     * @param callback
-     *          Callback when owner info updated. New version would be returned if succeed to write.
-     * @param ctx
-     *          Context of the callback
-     */
-    public void writeOwnerInfo(ByteString topic, HubInfo owner, Version version,
-                               Callback<Version> callback, Object ctx);
-
-    /**
-     * Delete owner info for a specified topic.
-     *
-     * @param topic
-     *          Topic Name
-     * @param version
-     *          Current version of owner info
-     *          If <code>version</code> is {@link Version#ANY}, delete owner info no matter its current version.
-     *          Otherwise, the owner info is deleted only when
-     *          provided version equals to its current version.
-     * @param callback
-     *          Callback when owner info deleted.
-     *          {@link PubSubException.NoTopicOwnerInfoException} is returned when no owner info.
-     *          {@link PubSubException.BadVersionException} is returned when version doesn't match.
-     * @param ctx
-     *          Context of the callback.
-     */
-    public void deleteOwnerInfo(ByteString topic, Version version,
-                                Callback<Void> callback, Object ctx);
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/meta/TopicPersistenceManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/TopicPersistenceManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/meta/TopicPersistenceManager.java
deleted file mode 100644
index 69ee709..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/TopicPersistenceManager.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.meta;
-
-import java.io.Closeable;
-
-import com.google.protobuf.ByteString;
-
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
-import org.apache.hedwig.util.Callback;
-
-/**
- * Manage topic persistence metadata.
- */
-public interface TopicPersistenceManager extends Closeable {
-
-    /**
-     * Read persistence info of a specified topic.
-     *
-     * @param topic
-     *          Topic Name
-     * @param callback
-     *          Callback when read persistence info.
-     *          If no persistence info found, return null.
-     * @param ctx
-     *          Context of the callback
-     */
-    public void readTopicPersistenceInfo(ByteString topic,
-                                         Callback<Versioned<LedgerRanges>> callback, Object ctx);
-
-    /**
-     * Update persistence info of a specified topic.
-     *
-     * @param topic
-     *          Topic name
-     * @param ranges
-     *          Persistence info
-     * @param version
-     *          Current version of persistence info.
-     *          If <code>version</code> is {@link Version#NEW}, create persistence info;
-     *          {@link PubSubException.TopicPersistenceInfoExistsException} is returned when
-     *          persistence info existed before.
-     *          Otherwise, the persistence info is updated only when
-     *          provided version equals to its current version.
-     *          {@link PubSubException.BadVersionException} is returned when version doesn't match,
-     *          {@link PubSubException.NoTopicPersistenceInfoException} is returned when no
-     *          persistence info found to update.
-     * @param callback
-     *          Callback when persistence info updated. New version would be returned.
-     * @param ctx
-     *          Context of the callback
-     */
-    public void writeTopicPersistenceInfo(ByteString topic, LedgerRanges ranges, Version version,
-                                          Callback<Version> callback, Object ctx);
-
-    /**
-     * Delete persistence info of a specified topic.
-     * Currently used in test cases.
-     *
-     * @param topic
-     *          Topic name
-     * @param version
-     *          Current version of persistence info
-     *          If <code>version</code> is {@link Version#ANY}, delete persistence info no matter its current version.
-     *          Otherwise, the persistence info is deleted only when
-     *          provided version equals to its current version.
-     * @param callback
-     *          Callback return whether the deletion succeed.
-     *          {@link PubSubException.NoTopicPersistenceInfoException} is returned when no persistence
-     *          info found to delete.
-     *          {@link PubSubException.BadVersionException} is returned when version doesn't match.
-     * @param ctx
-     *          Context of the callback
-     */
-    public void deleteTopicPersistenceInfo(ByteString topic, Version version,
-                                           Callback<Void> callback, Object ctx);
-
-}