You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:19 UTC

[09/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubInfo.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubInfo.java b/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubInfo.java
deleted file mode 100644
index 9a4cb3d..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubInfo.java
+++ /dev/null
@@ -1,162 +0,0 @@
-package org.apache.hedwig.server.topics;
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.StringReader;
-
-import org.apache.hedwig.protocol.PubSubProtocol.HubInfoData;
-import org.apache.hedwig.util.HedwigSocketAddress;
-
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.TextFormat;
-
-/**
- * Info identifies a hub server.
- */
-public class HubInfo {
-
-    public static class InvalidHubInfoException extends Exception {
-        public InvalidHubInfoException(String msg) {
-            super(msg);
-        }
-
-        public InvalidHubInfoException(String msg, Throwable t) {
-            super(msg, t);
-        }
-    }
-
-    // address identify a hub server
-    final HedwigSocketAddress addr;
-    // its znode czxid
-    final long czxid;
-    // protobuf encoded hub info data to be serialized
-    HubInfoData hubInfoData;
-
-    public HubInfo(HedwigSocketAddress addr, long czxid) {
-        this(addr, czxid, null);
-    }
-
-    protected HubInfo(HedwigSocketAddress addr, long czxid,
-                      HubInfoData data) {
-        this.addr = addr;
-        this.czxid = czxid;
-        this.hubInfoData = data;
-    }
-
-    public HedwigSocketAddress getAddress() {
-        return addr;
-    }
-
-    public long getZxid() {
-        return czxid;
-    }
-
-    private synchronized HubInfoData getHubInfoData() {
-        if (null == hubInfoData) {
-            hubInfoData = HubInfoData.newBuilder().setHostname(addr.toString())
-                                     .setCzxid(czxid).build();
-        }
-        return hubInfoData;
-    }
-
-    @Override
-    public String toString() {
-        return TextFormat.printToString(getHubInfoData());
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (null == o) {
-            return false;
-        }
-        if (!(o instanceof HubInfo)) {
-            return false;
-        }
-        HubInfo other = (HubInfo)o;
-        if (null == addr) {
-            if (null == other.addr) {
-                return true;
-            } else {
-                return czxid == other.czxid;
-            }
-        } else {
-            if (addr.equals(other.addr)) {
-                return czxid == other.czxid;
-            } else {
-                return false;
-            }
-        }
-    }
-
-    @Override
-    public int hashCode() {
-        return addr.hashCode();
-    }
-
-    /**
-     * Parse hub info from a string.
-     *
-     * @param hubInfoStr
-     *          String representation of hub info
-     * @return hub info
-     * @throws InvalidHubInfoException when <code>hubInfoStr</code> is not a valid
-     *         string representation of hub info.
-     */
-    public static HubInfo parse(String hubInfoStr) throws InvalidHubInfoException {
-        // it is not protobuf encoded hub info, it might be generated by ZkTopicManager
-        if (!hubInfoStr.startsWith("hostname")) {
-            final HedwigSocketAddress owner;
-            try {
-                owner = new HedwigSocketAddress(hubInfoStr);
-            } catch (Exception e) {
-                throw new InvalidHubInfoException("Corrupted hub server address : " + hubInfoStr, e);
-            }
-            return new HubInfo(owner, 0L);
-        }
-
-        // it is a protobuf encoded hub info.
-        HubInfoData hubInfoData;
-
-        try {
-            BufferedReader reader = new BufferedReader(
-                new StringReader(hubInfoStr));
-            HubInfoData.Builder dataBuilder = HubInfoData.newBuilder();
-            TextFormat.merge(reader, dataBuilder);
-            hubInfoData = dataBuilder.build();
-        } catch (InvalidProtocolBufferException ipbe) {
-            throw new InvalidHubInfoException("Corrupted hub info : " + hubInfoStr, ipbe);
-        } catch (IOException ie) {
-            throw new InvalidHubInfoException("Corrupted hub info : " + hubInfoStr, ie);
-        }
-
-        final HedwigSocketAddress owner;
-        try {
-            owner = new HedwigSocketAddress(hubInfoData.getHostname().trim());
-        } catch (Exception e) {
-            throw new InvalidHubInfoException("Corrupted hub server address : " + hubInfoData.getHostname(), e);
-        }
-        long ownerZxid = hubInfoData.getCzxid();
-        return new HubInfo(owner, ownerZxid, hubInfoData);
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubLoad.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubLoad.java b/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubLoad.java
deleted file mode 100644
index 2f76020..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubLoad.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.hedwig.server.topics;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.StringReader;
-
-import org.apache.hedwig.protocol.PubSubProtocol.HubLoadData;
-
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.TextFormat;
-
-/**
- * This class encapsulates metrics for determining the load on a hub server.
- */
-public class HubLoad implements Comparable<HubLoad> {
-
-    public static final HubLoad MAX_LOAD = new HubLoad(Long.MAX_VALUE);
-    public static final HubLoad MIN_LOAD = new HubLoad(0);
-
-    public static class InvalidHubLoadException extends Exception {
-        private static final long serialVersionUID = 5870487176956413387L;
-
-        public InvalidHubLoadException(String msg) {
-            super(msg);
-        }
-
-        public InvalidHubLoadException(String msg, Throwable t) {
-            super(msg, t);
-        }
-    }
-
-    // how many topics that a hub server serves
-    long numTopics;
-
-    public HubLoad(long num) {
-        this.numTopics = num;
-    }
-
-    public HubLoad(HubLoadData data) {
-        this.numTopics = data.getNumTopics();
-    }
-
-    // TODO: Make this threadsafe (BOOKKEEPER-379)
-    public HubLoad setNumTopics(long numTopics) {
-        this.numTopics = numTopics;
-        return this;
-    }
-
-    public long getNumTopics() {
-        return this.numTopics;
-    }
-
-    public HubLoadData toHubLoadData() {
-        return HubLoadData.newBuilder().setNumTopics(numTopics).build();
-    }
-
-    @Override
-    public String toString() {
-        return TextFormat.printToString(toHubLoadData());
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (null == o ||
-            !(o instanceof HubLoad)) {
-            return false;
-        }
-        return 0 == compareTo((HubLoad)o);
-    }
-
-    @Override
-    public int compareTo(HubLoad other) {
-        return numTopics > other.numTopics ?
-               1 : (numTopics < other.numTopics ? -1 : 0);
-    }
-
-    @Override
-    public int hashCode() {
-        return (int)numTopics;
-    }
-
-    /**
-     * Parse hub load from a string.
-     *
-     * @param hubLoadStr
-     *          String representation of hub load
-     * @return hub load
-     * @throws InvalidHubLoadException when <code>hubLoadStr</code> is not a valid
-     *         string representation of hub load.
-     */
-    public static HubLoad parse(String hubLoadStr) throws InvalidHubLoadException {
-        // it is no protobuf encoded hub info, it might be generated by ZkTopicManager
-        if (!hubLoadStr.startsWith("numTopics")) {
-            try {
-                long numTopics = Long.parseLong(hubLoadStr, 10);
-                return new HubLoad(numTopics);
-            } catch (NumberFormatException nfe) {
-                throw new InvalidHubLoadException("Corrupted hub load data : " + hubLoadStr, nfe);
-            }
-        }
-        // it it a protobuf encoded hub load data.
-        HubLoadData hubLoadData;
-        try {
-            BufferedReader reader = new BufferedReader(
-                new StringReader(hubLoadStr));
-            HubLoadData.Builder dataBuilder = HubLoadData.newBuilder();
-            TextFormat.merge(reader, dataBuilder);
-            hubLoadData = dataBuilder.build();
-        } catch (InvalidProtocolBufferException ipbe) {
-            throw new InvalidHubLoadException("Corrupted hub load data : " + hubLoadStr, ipbe);
-        } catch (IOException ie) {
-            throw new InvalidHubLoadException("Corrupted hub load data : " + hubLoadStr, ie);
-        }
-
-        return new HubLoad(hubLoadData);
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubServerManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubServerManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubServerManager.java
deleted file mode 100644
index 12524c9..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubServerManager.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.topics;
-
-import java.io.IOException;
-
-import org.apache.hedwig.util.Callback;
-
-/**
- * The HubServerManager class manages info about hub servers.
- */
-interface HubServerManager {
-
-    static interface ManagerListener {
-
-        /**
-         * Server manager is suspended if encountering some transient errors.
-         * {@link #onResume()} would be called if those errors could be fixed.
-         * {@link #onShutdown()} would be called if those errors could not be fixed.
-         */
-        public void onSuspend();
-
-        /**
-         * Server manager is resumed after fixing some transient errors.
-         */
-        public void onResume();
-
-        /**
-         * Server manager had to shutdown due to unrecoverable errors.
-         */
-        public void onShutdown();
-    }
-
-    /**
-     * Register a listener to listen events of server manager
-     *
-     * @param listener
-     *          Server Manager Listener
-     */
-    public void registerListener(ManagerListener listener);
-
-    /**
-     * Register itself to the cluster.
-     *
-     * @param selfLoad
-     *          Self load data
-     * @param callback
-     *          Callback when itself registered.
-     * @param ctx
-     *          Callback context.
-     */
-    public void registerSelf(HubLoad selfLoad, Callback<HubInfo> callback, Object ctx);
-
-    /**
-     * Unregister itself from the cluster.
-     */
-    public void unregisterSelf() throws IOException;
-
-    /**
-     * Uploading self server load data.
-     *
-     * It is an asynchrounous call which should not block other operations.
-     * Currently we don't need to care about whether it succeed or not.
-     *
-     * @param selfLoad
-     *          Hub server load data.
-     */
-    public void uploadSelfLoadData(HubLoad selfLoad);
-
-    /**
-     * Check whether a hub server is alive as the id
-     *
-     * @param hub
-     *          Hub id to identify a lifecycle of a hub server
-     * @param callback
-     *          Callback of check result. If the hub server is still
-     *          alive as the provided id <code>hub</code>, return true.
-     *          Otherwise return false.
-     * @param ctx
-     *          Callback context
-     */
-    public void isHubAlive(HubInfo hub, Callback<Boolean> callback, Object ctx);
-
-    /**
-     * Choose a least loaded hub server from available hub servers.
-     *
-     * @param callback
-     *          Callback to return least loaded hub server.
-     * @param ctx
-     *          Callback context.
-     */
-    public void chooseLeastLoadedHub(Callback<HubInfo> callback, Object ctx);
-
-    /**
-     * Try to rebalance the load within the cluster. This function will get
-     * the {@link HubLoad} from all available hubs within the cluster, and then
-     * shed additional load.
-     *
-     * @param tolerancePercentage
-     *          the percentage of load above average that is permissible.
-     * @param maxLoadToShed
-     *          the maximum amount of load to shed per call.
-     * @param callback
-     *          Callback indicating whether we reduced load or not.
-     * @param ctx
-     */
-    public void rebalanceCluster(double tolerancePercentage, HubLoad maxLoadToShed,
-                                 Callback<Boolean> callback, Object ctx);
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java
deleted file mode 100644
index 65cc9c4..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java
+++ /dev/null
@@ -1,359 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.hedwig.server.topics;
-
-import java.net.UnknownHostException;
-import java.io.IOException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.SynchronousQueue;
-
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.meta.MetadataManagerFactory;
-import org.apache.hedwig.server.meta.TopicOwnershipManager;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.ConcurrencyUtils;
-import org.apache.hedwig.util.Either;
-import org.apache.hedwig.util.HedwigSocketAddress;
-import org.apache.zookeeper.ZooKeeper;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.ByteString;
-/**
- * TopicOwnershipManager based topic manager
- */
-public class MMTopicManager extends AbstractTopicManager implements TopicManager {
-
-    private static final Logger logger = LoggerFactory.getLogger(MMTopicManager.class);
-
-    // topic ownership manager
-    private final TopicOwnershipManager mm;
-    // hub server manager
-    private final HubServerManager hubManager;
-
-    private final HubInfo myHubInfo;
-    private final HubLoad myHubLoad;
-
-    // Boolean flag indicating if we should suspend activity. If this is true,
-    // all of the Ops put into the queuer will fail automatically.
-    protected volatile boolean isSuspended = false;
-
-    public MMTopicManager(ServerConfiguration cfg, ZooKeeper zk,
-                          MetadataManagerFactory mmFactory,
-                          ScheduledExecutorService scheduler)
-            throws UnknownHostException, PubSubException {
-        super(cfg, scheduler);
-        // initialize topic ownership manager
-        this.mm = mmFactory.newTopicOwnershipManager();
-        this.hubManager = new ZkHubServerManager(cfg, zk, addr, this);
-
-        final SynchronousQueue<Either<HubInfo, PubSubException>> queue =
-            new SynchronousQueue<Either<HubInfo, PubSubException>>();
-
-        myHubLoad = new HubLoad(topics.size());
-        this.hubManager.registerListener(new HubServerManager.ManagerListener() {
-            @Override
-            public void onSuspend() {
-                isSuspended = true;
-            }
-            @Override
-            public void onResume() {
-                isSuspended = false;
-            }
-            @Override
-            public void onShutdown() {
-                // if hub server manager can't work, we had to quit
-                Runtime.getRuntime().exit(1);
-            }
-        });
-        this.hubManager.registerSelf(myHubLoad, new Callback<HubInfo>() {
-            @Override
-            public void operationFinished(final Object ctx, final HubInfo resultOfOperation) {
-                logger.info("Successfully registered hub {} with zookeeper", resultOfOperation);
-                ConcurrencyUtils.put(queue, Either.of(resultOfOperation, (PubSubException) null));
-            }
-            @Override
-            public void operationFailed(Object ctx, PubSubException exception) {
-                logger.error("Failed to register hub with zookeeper", exception);
-                ConcurrencyUtils.put(queue, Either.of((HubInfo)null, exception));
-            }
-        }, null);
-        Either<HubInfo, PubSubException> result = ConcurrencyUtils.take(queue);
-        PubSubException pse = result.right();
-        if (pse != null) {
-            throw pse;
-        }
-        myHubInfo = result.left();
-        logger.info("Start metadata manager based topic manager with hub id : " + myHubInfo);
-    }
-
-    @Override
-    protected void realGetOwner(final ByteString topic, final boolean shouldClaim,
-                                final Callback<HedwigSocketAddress> cb, final Object ctx) {
-        // If operations are suspended due to a ZK client disconnect, just error
-        // out this call and return.
-        if (isSuspended) {
-            cb.operationFailed(ctx, new PubSubException.ServiceDownException(
-                                    "MMTopicManager service is temporarily suspended!"));
-            return;
-        }
-
-        TopicStats stats = topics.getIfPresent(topic);
-        if (null != stats) {
-            cb.operationFinished(ctx, addr);
-            return;
-        }
-
-        new MMGetOwnerOp(topic, cb, ctx).read();
-    }
-
-    /**
-     * MetadataManager do topic ledger election using versioned writes.
-     */
-    class MMGetOwnerOp {
-        ByteString topic;
-        Callback<HedwigSocketAddress> cb;
-        Object ctx;
-
-        public MMGetOwnerOp(ByteString topic,
-                            Callback<HedwigSocketAddress> cb, Object ctx) {
-            this.topic = topic;
-            this.cb = cb;
-            this.ctx = ctx;
-        }
-
-        protected void read() {
-            mm.readOwnerInfo(topic, new Callback<Versioned<HubInfo>>() {
-                @Override
-                public void operationFinished(final Object ctx, final Versioned<HubInfo> owner) {
-                    if (null == owner) {
-                        logger.info("{} : No owner found for topic {}",
-                                    new Object[] { addr, topic.toStringUtf8() });
-                        // no data found
-                        choose(Version.NEW);
-                        return;
-                    }
-                    final Version ownerVersion = owner.getVersion();
-                    if (null == owner.getValue()) {
-                        logger.info("{} : Invalid owner found for topic {}",
-                                    new Object[] { addr, topic.toStringUtf8() });
-                        choose(ownerVersion);
-                        return;
-                    }
-                    final HubInfo hub = owner.getValue();
-                    logger.info("{} : Read owner of topic {} : {}",
-                                new Object[] { addr, topic.toStringUtf8(), hub });
-
-                    logger.info("{}, {}", new Object[] { hub, myHubInfo });
-
-                    if (hub.getAddress().equals(addr)) {
-                        if (myHubInfo.getZxid() == hub.getZxid()) {
-                            claimTopic(ctx);
-                            return;
-                        } else {
-                            choose(ownerVersion);
-                            return;
-                        }
-                    }
-
-                    logger.info("{} : Check whether owner {} for topic {} is still alive.",
-                                new Object[] { addr, hub, topic.toStringUtf8() });
-                    hubManager.isHubAlive(hub, new Callback<Boolean>() {
-                        @Override
-                        public void operationFinished(Object ctx, Boolean isAlive) {
-                            if (isAlive) {
-                                cb.operationFinished(ctx, hub.getAddress());
-                            } else {
-                                choose(ownerVersion);
-                            }
-                        }
-                        @Override
-                        public void operationFailed(Object ctx, PubSubException pse) {
-                            cb.operationFailed(ctx, pse);
-                        }
-                    }, ctx);
-                }
-
-                @Override
-                public void operationFailed(Object ctx, PubSubException exception) {
-                    cb.operationFailed(ctx, new PubSubException.ServiceDownException(
-                                       "Could not read ownership for topic " + topic.toStringUtf8() + " : "
-                                       + exception.getMessage()));
-                }
-            }, ctx);
-        }
-
-        public void claim(final Version prevOwnerVersion) {
-            logger.info("{} : claiming topic {} 's owner to be {}",
-                        new Object[] { addr, topic.toStringUtf8(), myHubInfo });
-            mm.writeOwnerInfo(topic, myHubInfo, prevOwnerVersion, new Callback<Version>() {
-                @Override
-                public void operationFinished(Object ctx, Version newVersion) {
-                    claimTopic(ctx);
-                }
-                @Override
-                public void operationFailed(Object ctx, PubSubException exception) {
-                    if (exception instanceof PubSubException.NoTopicOwnerInfoException ||
-                        exception instanceof PubSubException.BadVersionException) {
-                        // some one has updated the owner
-                        logger.info("{} : Some one has claimed topic {} 's owner. Try to read the owner again.",
-                                    new Object[] { addr, topic.toStringUtf8() });
-                        read();
-                        return;
-                    }
-                    cb.operationFailed(ctx, new PubSubException.ServiceDownException(
-                                       "Exception when writing owner info to claim ownership of topic "
-                                       + topic.toStringUtf8() + " : " + exception.getMessage()));
-                }
-            }, ctx);
-        }
-
-        protected void claimTopic(Object ctx) {
-            logger.info("{} : claimed topic {} 's owner to be {}",
-                        new Object[] { addr, topic.toStringUtf8(), myHubInfo });
-            notifyListenersAndAddToOwnedTopics(topic, cb, ctx);
-            hubManager.uploadSelfLoadData(myHubLoad.setNumTopics(topics.size()));
-        }
-
-        public void choose(final Version prevOwnerVersion) {
-            hubManager.chooseLeastLoadedHub(new Callback<HubInfo>() {
-                @Override
-                public void operationFinished(Object ctx, HubInfo owner) {
-                    logger.info("{} : Least loaded owner {} is chosen for topic {}",
-                                new Object[] { addr, owner, topic.toStringUtf8() });
-                    if (owner.getAddress().equals(addr)) {
-                        claim(prevOwnerVersion);
-                    } else {
-                        setOwner(owner, prevOwnerVersion);
-                    }
-                }
-                @Override
-                public void operationFailed(Object ctx, PubSubException pse) {
-                    logger.error("Failed to choose least loaded hub server for topic "
-                               + topic.toStringUtf8() + " : ", pse);
-                    cb.operationFailed(ctx, pse);
-                }
-            }, null);
-        }
-
-        public void setOwner(final HubInfo ownerHubInfo, final Version prevOwnerVersion) {
-            logger.info("{} : setting topic {} 's owner to be {}",
-                        new Object[] { addr, topic.toStringUtf8(), ownerHubInfo });
-            mm.writeOwnerInfo(topic, ownerHubInfo, prevOwnerVersion, new Callback<Version>() {
-                @Override
-                public void operationFinished(Object ctx, Version newVersion) {
-                    logger.info("{} : Set topic {} 's owner to be {}",
-                                new Object[] { addr, topic.toStringUtf8(), ownerHubInfo });
-                    cb.operationFinished(ctx, ownerHubInfo.getAddress());
-                }
-                @Override
-                public void operationFailed(Object ctx, PubSubException exception) {
-                    if (exception instanceof PubSubException.NoTopicOwnerInfoException ||
-                        exception instanceof PubSubException.BadVersionException) {
-                        // some one has updated the owner
-                        logger.info("{} : Some one has set topic {} 's owner. Try to read the owner again.",
-                                    new Object[] { addr, topic.toStringUtf8() });
-                        read();
-                        return;
-                    }
-                    cb.operationFailed(ctx, new PubSubException.ServiceDownException(
-                                       "Exception when writing owner info to claim ownership of topic "
-                                       + topic.toStringUtf8() + " : " + exception.getMessage()));
-                }
-            }, ctx);
-        }
-    }
-
-    @Override
-    protected void postReleaseCleanup(final ByteString topic,
-                                      final Callback<Void> cb, final Object ctx) {
-
-        // Reduce load. We've removed the topic from our topic set, so do this as well.
-        // When we reclaim the topic, we will increment the load again.
-        hubManager.uploadSelfLoadData(myHubLoad.setNumTopics(topics.size()));
-
-        mm.readOwnerInfo(topic, new Callback<Versioned<HubInfo>>() {
-            @Override
-            public void operationFinished(Object ctx, Versioned<HubInfo> owner) {
-                if (null == owner) {
-                    // Node has somehow disappeared from under us, live with it
-                    logger.warn("No owner info found when cleaning up topic " + topic.toStringUtf8());
-                    cb.operationFinished(ctx, null);
-                    return;
-                }
-                // no valid hub info found, just return
-                if (null == owner.getValue()) {
-                    logger.warn("No valid owner info found when cleaning up topic " + topic.toStringUtf8());
-                    cb.operationFinished(ctx, null);
-                    return;
-                }
-                HedwigSocketAddress ownerAddr = owner.getValue().getAddress();
-                if (!ownerAddr.equals(addr)) {
-                    logger.warn("Wanted to clean up self owner info for topic " + topic.toStringUtf8()
-                                + " but owner " + owner + " found, leaving untouched");
-                    // Not our node, someone else's, leave it alone
-                    cb.operationFinished(ctx, null);
-                    return;
-                }
-
-                mm.deleteOwnerInfo(topic, owner.getVersion(), new Callback<Void>() {
-                    @Override
-                    public void operationFinished(Object ctx, Void result) {
-                        cb.operationFinished(ctx, null);
-                    }
-                    @Override
-                    public void operationFailed(Object ctx, PubSubException exception) {
-                        if (exception instanceof PubSubException.NoTopicOwnerInfoException) {
-                            logger.warn("Wanted to clean up self owner info for topic " + topic.toStringUtf8()
-                                      + " but it has been removed.");
-                            cb.operationFinished(ctx, null);
-                            return;
-                        }
-                        logger.error("Exception when deleting self-ownership metadata for topic "
-                                     + topic.toStringUtf8() + " : ", exception);
-                        cb.operationFailed(ctx, new PubSubException.ServiceDownException(exception));
-                    }
-                }, ctx);
-            }
-            @Override
-            public void operationFailed(Object ctx, PubSubException exception) {
-                logger.error("Exception when cleaning up owner info of topic " + topic.toStringUtf8() + " : ", exception);
-                cb.operationFailed(ctx, new PubSubException.ServiceDownException(exception));
-            }
-        }, ctx);
-    }
-
-    @Override
-    public void stop() {
-        // we just unregister it with zookeeper to make it unavailable from hub servers list
-        try {
-            hubManager.unregisterSelf();
-        } catch (IOException e) {
-            logger.error("Error unregistering hub server " + myHubInfo + " : ", e);
-        }
-        super.stop();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicBasedLoadShedder.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicBasedLoadShedder.java b/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicBasedLoadShedder.java
deleted file mode 100644
index 2a0dcc0..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicBasedLoadShedder.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.topics;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.util.Callback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.ByteString;
-
-/**
- * Shed load by releasing topics.
- */
-public class TopicBasedLoadShedder {
-    private static final Logger logger = LoggerFactory.getLogger(TopicBasedLoadShedder.class);
-    private final double tolerancePercentage;
-    private final long maxLoadToShed;
-    private final TopicManager tm;
-    private final List<ByteString> topicList;
-
-    /**
-     * @param tm The topic manager used to handle load shedding
-     * @param tolerancePercentage The tolerance percentage for shedding load
-     * @param maxLoadToShed The maximum amoung of load to shed in one call.
-     */
-    public TopicBasedLoadShedder(TopicManager tm, double tolerancePercentage,
-                                 HubLoad maxLoadToShed) {
-        // Make sure that all functions in this class have a consistent view
-        // of the load. So, we use the same topic list throughout.
-        this(tm, tm.getTopicList(), tolerancePercentage, maxLoadToShed);
-    }
-
-    /**
-     * This is public because it makes testing easier.
-     * @param tm The topic manager used to handle load shedding
-     * @param topicList The topic list representing topics owned by this hub.
-     * @param tolerancePercentage The tolerance percentage for shedding load
-     * @param maxLoadToShed The maximum amoung of load to shed in one call.
-     */
-    TopicBasedLoadShedder(TopicManager tm, List<ByteString> topicList,
-                          double tolerancePercentage,
-                          HubLoad maxLoadToShed) {
-        this.tolerancePercentage = tolerancePercentage;
-        this.maxLoadToShed = maxLoadToShed.getNumTopics();
-        this.tm = tm;
-        this.topicList = topicList;
-    }
-
-    /**
-     * Reduce the load on the current hub so that it reaches the target load.
-     * We reduce load by releasing topics using the {@link TopicManager} passed
-     * to the constructor. We use {@link TopicManager#releaseTopics(int, org.apache.hedwig.util.Callback, Object)}
-     * to actually release topics.
-     *
-     * @param targetLoad
-     * @param callback
-     *              a Callback<Long> that indicates how many topics we tried to release.
-     * @param ctx
-     */
-    public void reduceLoadTo(HubLoad targetLoad, final Callback<Long> callback, final Object ctx) {
-        int targetTopics = (int)targetLoad.toHubLoadData().getNumTopics();
-        int numTopicsToRelease = topicList.size() - targetTopics;
-
-        // The number of topics we own is less than the target topic size. We don't release
-        // any topics in this case.
-        if (numTopicsToRelease <= 0) {
-            callback.operationFinished(ctx, 0L);
-            return;
-        }
-        // Call releaseTopics() on the topic manager to do this. We let the manager handle the release
-        // policy.
-        tm.releaseTopics(numTopicsToRelease, callback, ctx);
-    }
-
-    /**
-     * Calculate the average number of topics on the currently active hubs and release topics
-     * if required.
-     * We shed topics if we currently hold topics greater than average + average * tolerancePercentage/100.0
-     * We shed a maximum of maxLoadToShed topics
-     * We also hold on to at least one topic.
-     * @param loadMap
-     * @param callback
-     *          A return value of true means we tried to rebalance. False means that there was
-     *          no need to rebalance.
-     * @param ctx
-     */
-    public void shedLoad(final Map<HubInfo, HubLoad> loadMap, final Callback<Boolean> callback,
-                         final Object ctx) {
-
-        long totalTopics = 0L;
-        long myTopics = topicList.size();
-        for (Map.Entry<HubInfo, HubLoad> entry : loadMap.entrySet()) {
-            if (null == entry.getKey() || null == entry.getValue()) {
-                continue;
-            }
-            totalTopics += entry.getValue().toHubLoadData().getNumTopics();
-        }
-
-        double averageTopics = (double)totalTopics/loadMap.size();
-        logger.info("Total topics in the cluster : {}. Average : {}.", totalTopics, averageTopics);
-
-        // Handle the case when averageTopics == 0. We hold on to at least 1 topic.
-        long permissibleTopics =
-            Math.max(1L, (long) Math.ceil(averageTopics + averageTopics * tolerancePercentage / 100.0));
-        logger.info("Permissible topics : {}. Number of topics this hub holds : {}.", permissibleTopics, myTopics);
-        if (myTopics <= permissibleTopics) {
-            // My owned topics are less than those permitted by the current tolerance level. No need to release
-            // any topics.
-            callback.operationFinished(ctx, false);
-            return;
-        }
-
-        // The number of topics I own is more than what I should be holding. We shall now attempt to shed some load.
-        // We shed at most maxLoadToShed number of topics. We also hold on to at least 1 topic.
-        long targetNumTopics = Math.max(1L, Math.max((long)Math.ceil(averageTopics), myTopics - maxLoadToShed));
-
-        // Reduce the load on the current hub to the target load we calculated above.
-        logger.info("Reducing load on this hub to {} topics.", targetNumTopics);
-        reduceLoadTo(new HubLoad(targetNumTopics), new Callback<Long>() {
-            @Override
-            public void operationFinished(Object ctx, Long numReleased) {
-                logger.info("Released {} topics to shed load.", numReleased);
-                callback.operationFinished(ctx, true);
-            }
-
-            @Override
-            public void operationFailed(Object ctx, PubSubException e) {
-                callback.operationFailed(ctx, e);
-            }
-        }, ctx);
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java
deleted file mode 100644
index 4ed2e59..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.topics;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
-import org.apache.hedwig.server.persistence.PersistenceManager;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.HedwigSocketAddress;
-
-import java.util.List;
-
-/**
- * An implementor of this interface is basically responsible for ensuring that
- * there is at most a single host responsible for a given topic at a given time.
- * Also, it is desirable that on a host failure, some other hosts in the cluster
- * claim responsibilities for the topics that were at the failed host. On
- * claiming responsibility for a topic, a host should call its
- * {@link TopicOwnershipChangeListener}.
- *
- */
-
-public interface TopicManager {
-    /**
-     * Get the name of the host responsible for the given topic.
-     *
-     * @param topic
-     *            The topic whose owner to get.
-     * @param cb
-     *            Callback.
-     * @return The name of host responsible for the given topic
-     * @throws ServiceDownException
-     *             If there is an error looking up the information
-     */
-    public void getOwner(ByteString topic, boolean shouldClaim,
-                         Callback<HedwigSocketAddress> cb, Object ctx);
-
-    /**
-     * Increment the number of access times for a given <code>topic</code>.
-     *
-     * @param topic
-     *          Topic Name.
-     */
-    public void incrementTopicAccessTimes(ByteString topic);
-
-    /**
-     * Whenever the topic manager finds out that the set of topics owned by this
-     * node has changed, it can notify a set of
-     * {@link TopicOwnershipChangeListener} objects. Any component of the system
-     * (e.g., the {@link PersistenceManager}) can listen for such changes by
-     * implementing the {@link TopicOwnershipChangeListener} interface and
-     * registering themselves with the {@link TopicManager} using this method.
-     * It is important that the {@link TopicOwnershipChangeListener} reacts
-     * immediately to such notifications, and with no blocking (because multiple
-     * listeners might need to be informed and they are all informed by the same
-     * thread).
-     *
-     * @param listener
-     */
-    public void addTopicOwnershipChangeListener(TopicOwnershipChangeListener listener);
-
-    /**
-     * Give up ownership of a topic. If I don't own it, do nothing.
-     *
-     * @throws ServiceDownException
-     *             If there is an error in claiming responsibility for the topic
-     */
-    public void releaseTopic(ByteString topic, Callback<Void> cb, Object ctx);
-
-    /**
-     * Release numTopics topics. If you hold fewer, release all.
-     * @param numTopics
-     *          Number of topics to release.
-     * @param callback
-     *          The callback should be invoked with the number of topics the hub
-     *          released successfully.
-     * @param ctx
-     */
-    public void releaseTopics(int numTopics, Callback<Long> callback, Object ctx);
-
-    /**
-     * Get the list of topics this hub believes it is responsible for.
-     * @return
-     */
-    public List<ByteString> getTopicList();
-
-    /**
-     * Stop topic manager
-     */
-    public void stop();
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicOwnershipChangeListener.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicOwnershipChangeListener.java b/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicOwnershipChangeListener.java
deleted file mode 100644
index b0fe2c9..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicOwnershipChangeListener.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.topics;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.util.Callback;
-
-public interface TopicOwnershipChangeListener {
-
-    public void acquiredTopic(ByteString topic, Callback<Void> callback, Object ctx);
-
-    public void lostTopic(ByteString topic);
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TrivialOwnAllTopicManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TrivialOwnAllTopicManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TrivialOwnAllTopicManager.java
deleted file mode 100644
index 6b3a417..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TrivialOwnAllTopicManager.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.topics;
-
-import java.net.UnknownHostException;
-import java.util.concurrent.ScheduledExecutorService;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.HedwigSocketAddress;
-
-public class TrivialOwnAllTopicManager extends AbstractTopicManager {
-
-    public TrivialOwnAllTopicManager(ServerConfiguration cfg, ScheduledExecutorService scheduler)
-            throws UnknownHostException {
-        super(cfg, scheduler);
-    }
-
-    @Override
-    protected void realGetOwner(ByteString topic, boolean shouldClaim,
-                                Callback<HedwigSocketAddress> cb, Object ctx) {
-
-        TopicStats stats = topics.getIfPresent(topic);
-        if (null != stats) {
-            cb.operationFinished(ctx, addr);
-            return;
-        }
-
-        notifyListenersAndAddToOwnedTopics(topic, cb, ctx);
-    }
-
-    @Override
-    protected void postReleaseCleanup(ByteString topic, Callback<Void> cb, Object ctx) {
-        // No cleanup to do
-        cb.operationFinished(ctx, null);
-    }
-
-    @Override
-    public void stop() {
-        // do nothing
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkHubServerManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkHubServerManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkHubServerManager.java
deleted file mode 100644
index 9651058..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkHubServerManager.java
+++ /dev/null
@@ -1,470 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.topics;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.HedwigSocketAddress;
-import org.apache.hedwig.zookeeper.SafeAsyncZKCallback;
-import org.apache.hedwig.zookeeper.SafeAsyncZKCallback.StatCallback;
-import org.apache.hedwig.zookeeper.ZkUtils;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * ZooKeeper based hub server manager.
- */
-class ZkHubServerManager implements HubServerManager {
-
-    static Logger logger = LoggerFactory.getLogger(ZkHubServerManager.class);
-
-    final Random rand = new Random();
-
-    private final ServerConfiguration conf;
-    private final ZooKeeper zk;
-    private final HedwigSocketAddress addr;
-    private final TopicManager tm;
-    private final String ephemeralNodePath;
-    private final String hubNodesPath;
-
-    // hub info structure represent itself
-    protected HubInfo myHubInfo;
-    protected volatile boolean isSuspended = false;
-    protected ManagerListener listener = null;
-    protected final ScheduledExecutorService executor;
-
-    // upload hub server load to zookeeper
-    StatCallback loadReportingStatCallback = new StatCallback() {
-        @Override
-        public void safeProcessResult(int rc, String path, Object ctx, Stat stat) {
-            if (rc != KeeperException.Code.OK.intValue()) {
-                logger.warn("Failed to update load information of hub {} in zk", myHubInfo);
-            }
-        }
-    };
-
-    /**
-     * Watcher to monitor available hub server list.
-     */
-    class ZkHubsWatcher implements Watcher {
-        @Override
-        public void process(WatchedEvent event) {
-            if (event.getType().equals(Watcher.Event.EventType.None)) {
-                if (event.getState().equals(
-                        Watcher.Event.KeeperState.Disconnected)) {
-                    logger.warn("ZK client has been disconnected to the ZK server!");
-                    isSuspended = true;
-                    if (null != listener) {
-                        listener.onSuspend();
-                    }
-                } else if (event.getState().equals(
-                        Watcher.Event.KeeperState.SyncConnected)) {
-                    if (isSuspended) {
-                        logger.info("ZK client has been reconnected to the ZK server!");
-                    }
-                    isSuspended = false;
-                    if (null != listener) {
-                        listener.onResume();
-                    }
-                }
-            }
-            if (event.getState().equals(Watcher.Event.KeeperState.Expired)) {
-                logger.error("ZK client connection to the ZK server has expired.!");
-                if (null != listener) {
-                    // Shutdown our executor NOW!
-                    executor.shutdownNow();
-                    listener.onShutdown();
-                }
-            }
-        }
-    }
-
-    class RebalanceRunnable implements Runnable {
-        private final double tolerancePercentage;
-        private final HubLoad maxLoadToShed;
-        private final long delaySeconds;
-
-        public RebalanceRunnable(double tolerancePercentage,
-                                 HubLoad maxLoadToShed,
-                                 long delaySeconds) {
-            this.tolerancePercentage = tolerancePercentage;
-            this.maxLoadToShed = maxLoadToShed;
-            this.delaySeconds = delaySeconds;
-        }
-
-        @Override
-        public void run() {
-            // If we are in suspended state, don't attempt a rebalance.
-            if (isSuspended) {
-                executor.schedule(this, delaySeconds, TimeUnit.SECONDS);
-                return;
-            }
-            // We should attempt a rebalance. We reschedule the job at the tail so that
-            // two rebalances don't happen simultaneously.
-            rebalanceCluster(tolerancePercentage, maxLoadToShed, new Callback<Boolean>() {
-                private void reschedule(Runnable task) {
-                    executor.schedule(task, delaySeconds, TimeUnit.SECONDS);
-                }
-
-                @Override
-                public void operationFinished(Object ctx, Boolean didRebalance) {
-                    if (didRebalance == true) {
-                        logger.info("The attempt to rebalance the cluster was successful");
-                    } else {
-                        logger.info("There was no need to rebalance.");
-                    }
-                    // Our original runnable was passed as the context.
-                    reschedule((Runnable)ctx);
-                }
-
-                @Override
-                public void operationFailed(Object ctx, PubSubException e) {
-                    logger.error("The attempt to rebalance the cluster did not succeed.", e);
-                    // Reschedule the job
-                    reschedule((Runnable)ctx);
-                }
-            }, this);
-        }
-
-        public void start() {
-            // Initiate only if delaySeconds > 0
-            if (delaySeconds > 0) {
-                logger.info("Starting the rebalancer thread with tolerance={}, maxLoadToShed={} and delay={}",
-                    new Object[] { tolerancePercentage, maxLoadToShed.getNumTopics(), delaySeconds });
-                executor.schedule(this, delaySeconds, TimeUnit.SECONDS);
-            }
-        }
-    }
-
-    public ZkHubServerManager(ServerConfiguration conf,
-                              ZooKeeper zk,
-                              HedwigSocketAddress addr,
-                              TopicManager tm) {
-        this.conf = conf;
-        this.zk = zk;
-        this.addr = addr;
-        this.tm = tm;
-        // znode path to store all available hub servers
-        this.hubNodesPath = this.conf.getZkHostsPrefix(new StringBuilder()).toString();
-        // the node's ephemeral node path
-        this.ephemeralNodePath = getHubZkNodePath(addr);
-        this.executor = Executors.newSingleThreadScheduledExecutor();
-        // register available hub servers list watcher
-        zk.register(new ZkHubsWatcher());
-
-        // Start the rebalancer here.
-        new RebalanceRunnable(conf.getRebalanceTolerance(), conf.getRebalanceMaxShed(),
-                              conf.getRebalanceInterval()).start();
-    }
-
-    @Override
-    public void registerListener(ManagerListener listener) {
-        this.listener = listener;
-    }
-
-    /**
-     * Get the znode path identifying a hub server.
-     *
-     * @param node
-     *          Hub Server Address
-     * @return znode path identifying the hub server.
-     */
-    private String getHubZkNodePath(HedwigSocketAddress node) {
-        String nodePath = this.conf.getZkHostsPrefix(new StringBuilder())
-                          .append("/").append(node).toString();
-        return nodePath;
-    }
-
-    @Override
-    public void registerSelf(final HubLoad selfData, final Callback<HubInfo> callback, Object ctx) {
-        byte[] loadDataBytes = selfData.toString().getBytes(UTF_8);
-        ZkUtils.createFullPathOptimistic(zk, ephemeralNodePath, loadDataBytes, Ids.OPEN_ACL_UNSAFE,
-                                         CreateMode.EPHEMERAL, new SafeAsyncZKCallback.StringCallback() {
-            @Override
-            public void safeProcessResult(int rc, String path, Object ctx, String name) {
-                if (rc == Code.OK.intValue()) {
-                    // now we are here
-                    zk.exists(ephemeralNodePath, false, new SafeAsyncZKCallback.StatCallback() {
-                        @Override
-                        public void safeProcessResult(int rc, String path, Object ctx, Stat stat) {
-                            if (rc == Code.OK.intValue()) {
-                                myHubInfo = new HubInfo(addr, stat.getCzxid());
-                                callback.operationFinished(ctx, myHubInfo);
-                                return;
-                            } else {
-                                callback.operationFailed(ctx,
-                                        new PubSubException.ServiceDownException(
-                                        "I can't state my hub node after I created it : "
-                                        + ephemeralNodePath));
-                                return;
-                            }
-                        }
-                    }, ctx);
-                    return;
-                }
-                if (rc != Code.NODEEXISTS.intValue()) {
-                    KeeperException ke = ZkUtils.logErrorAndCreateZKException(
-                            "Could not create ephemeral node to register hub", ephemeralNodePath, rc);
-                    callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
-                    return;
-                }
-
-                logger.info("Found stale ephemeral node while registering hub with ZK, deleting it");
-
-                // Node exists, lets try to delete it and retry
-                zk.delete(ephemeralNodePath, -1, new SafeAsyncZKCallback.VoidCallback() {
-                    @Override
-                    public void safeProcessResult(int rc, String path, Object ctx) {
-                        if (rc == Code.OK.intValue() || rc == Code.NONODE.intValue()) {
-                            registerSelf(selfData, callback, ctx);
-                            return;
-                        }
-                        KeeperException ke = ZkUtils.logErrorAndCreateZKException(
-                                "Could not delete stale ephemeral node to register hub", ephemeralNodePath, rc);
-                        callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
-                        return;
-                    }
-                }, ctx);
-            }
-        }, ctx);
-    }
-
-    @Override
-    public void unregisterSelf() throws IOException {
-        try {
-            zk.delete(ephemeralNodePath, -1);
-        } catch (InterruptedException e) {
-            throw new IOException(e);
-        } catch (KeeperException e) {
-            throw new IOException(e);
-        }
-    }
-
-
-    @Override
-    public void uploadSelfLoadData(HubLoad selfLoad) {
-        logger.debug("Reporting hub load of {} : {}", myHubInfo, selfLoad);
-        byte[] loadDataBytes = selfLoad.toString().getBytes(UTF_8);
-        zk.setData(ephemeralNodePath, loadDataBytes, -1,
-                   loadReportingStatCallback, null);
-    }
-
-    @Override
-    public void isHubAlive(final HubInfo hub, final Callback<Boolean> callback, Object ctx) {
-        zk.exists(getHubZkNodePath(hub.getAddress()), false, new SafeAsyncZKCallback.StatCallback() {
-            @Override
-            public void safeProcessResult(int rc, String path, Object ctx, Stat stat) {
-                if (rc == Code.NONODE.intValue()) {
-                    callback.operationFinished(ctx, false);
-                } else if (rc == Code.OK.intValue()) {
-                    if (hub.getZxid() == stat.getCzxid()) {
-                        callback.operationFinished(ctx, true);
-                    } else {
-                        callback.operationFinished(ctx, false);
-                    }
-                } else {
-                    callback.operationFailed(ctx, new PubSubException.ServiceDownException(
-                        "Failed to check whether hub server " + hub + " is alive!"));
-                }
-            }
-        }, ctx);
-    }
-
-    @Override
-    public void chooseLeastLoadedHub(final Callback<HubInfo> callback, Object ctx) {
-        // Get the list of existing hosts
-        zk.getChildren(hubNodesPath, false, new SafeAsyncZKCallback.ChildrenCallback() {
-            @Override
-            public void safeProcessResult(int rc, String path, Object ctx,
-                                          List<String> children) {
-                if (rc != Code.OK.intValue()) {
-                    KeeperException e = ZkUtils.logErrorAndCreateZKException(
-                        "Could not get list of available hubs", path, rc);
-                    callback.operationFailed(ctx, new PubSubException.ServiceDownException(e));
-                    return;
-                }
-                chooseLeastLoadedNode(children, callback, ctx);
-            }
-        }, ctx);
-    }
-
-    /**
-     * Picks the least loaded hub among {@code children} by reading each hub's load znode.
-     *
-     * <p>One asynchronous {@code getData} is issued per child. Children whose read
-     * fails or whose load data cannot be parsed are skipped. When two hubs compare
-     * equal, {@code rand.nextBoolean()} decides whether the later response replaces
-     * the current pick.
-     *
-     * @param children names of the hub znodes; each name is also used as the hub's
-     *                 address string when building the result
-     * @param callback receives the chosen {@link HubInfo}, or a
-     *                 {@code ServiceDownException} when no usable hub was found
-     * @param ctx      caller context handed back to {@code callback} unchanged
-     */
-    private void chooseLeastLoadedNode(final List<String> children,
-                                       final Callback<HubInfo> callback, final Object ctx) {
-        // With no children no getData() is issued, so the data callback below would
-        // never run and the caller would wait forever. Fail fast instead.
-        if (children.isEmpty()) {
-            callback.operationFailed(ctx,
-                new PubSubException.ServiceDownException("No hub available"));
-            return;
-        }
-
-        SafeAsyncZKCallback.DataCallback dataCallback = new SafeAsyncZKCallback.DataCallback() {
-            int numResponses = 0;
-            HubLoad minLoad = HubLoad.MAX_LOAD;
-            String leastLoaded = null;
-            long leastLoadedCzxid = 0;
-
-            // The per-request context is the child (hub) name. It is deliberately not
-            // named "ctx" so that it cannot shadow the caller's context, which is what
-            // must be handed back to the callback.
-            @Override
-            public void safeProcessResult(int rc, String path, Object hubName,
-                                          byte[] data, Stat stat) {
-                synchronized (this) {
-                    if (rc == KeeperException.Code.OK.intValue()) {
-                        try {
-                            HubLoad load = HubLoad.parse(new String(data, UTF_8));
-                            logger.debug("Found server {} with load: {}", hubName, load);
-                            int compareRes = load.compareTo(minLoad);
-                            if (compareRes < 0 || (compareRes == 0 && rand.nextBoolean())) {
-                                minLoad = load;
-                                leastLoaded = (String) hubName;
-                                leastLoadedCzxid = stat.getCzxid();
-                            }
-                        } catch (HubLoad.InvalidHubLoadException e) {
-                            logger.warn("Corrupted load information from hub : " + hubName);
-                            // some corrupted data, we'll just ignore this hub
-                        }
-                    }
-                    numResponses++;
-
-                    // Only decide once every child has answered (successfully or not).
-                    if (numResponses == children.size()) {
-                        if (leastLoaded == null) {
-                            callback.operationFailed(ctx,
-                                new PubSubException.ServiceDownException("No hub available"));
-                            return;
-                        }
-                        try {
-                            HedwigSocketAddress owner = new HedwigSocketAddress(leastLoaded);
-                            callback.operationFinished(ctx, new HubInfo(owner, leastLoadedCzxid));
-                        } catch (Throwable t) {
-                            // Keep the cause in the log; the exception type used here only
-                            // carries a message.
-                            logger.error("Least loaded hub server " + leastLoaded + " is invalid.", t);
-                            callback.operationFailed(ctx,
-                                new PubSubException.ServiceDownException("Least loaded hub server "
-                                                                       + leastLoaded + " is invalid."));
-                        }
-                    }
-                }
-            }
-        };
-
-        for (String child : children) {
-            zk.getData(conf.getZkHostsPrefix(new StringBuilder()).append("/").append(child).toString(), false,
-                       dataCallback, child);
-        }
-    }
-
-    /**
-     * Get a map of all currently active hubs with their advertised load.
-     *
-     * <p>The children of {@code hubNodesPath} are listed first, then the data of every
-     * child is fetched; all of it asynchronously. The operation succeeds only when
-     * every listed child yields a usable entry; otherwise the callback is failed with
-     * an {@code UnexpectedConditionException}. When no child is listed at all, an
-     * empty map is reported.
-     *
-     * @param callback    receives the hub-to-load map, or the failure
-     * @param originalCtx caller context handed back to {@code callback}
-     */
-    private void getActiveHubsInfoWithLoad(final Callback<Map<HubInfo, HubLoad>> callback,
-                                           final Object originalCtx) {
-        // Get the list of children and then for each child, get the data. All asynchronously.
-        zk.getChildren(hubNodesPath, false, new SafeAsyncZKCallback.ChildrenCallback() {
-            @Override
-            public void safeProcessResult(int rc, String path, Object ctx, final List<String> children) {
-                if (rc != Code.OK.intValue()) {
-                    KeeperException e = ZkUtils.logErrorAndCreateZKException(
-                            "Could not get children for given path", path, rc);
-                    callback.operationFailed(ctx, new PubSubException.ServiceDownException(e));
-                    return;
-                }
-
-                // No children means no getData() call and therefore no data callback:
-                // complete right away instead of leaving the caller waiting forever.
-                if (children.isEmpty()) {
-                    callback.operationFinished(originalCtx, new HashMap<HubInfo, HubLoad>());
-                    return;
-                }
-
-                // The data callback for every child node
-                SafeAsyncZKCallback.DataCallback dataCallback = new SafeAsyncZKCallback.DataCallback() {
-                    Map<HubInfo, HubLoad> loadMap = new HashMap<HubInfo, HubLoad>();
-                    int numResponse = 0;
-                    @Override
-                    public void safeProcessResult(int rc, String path, Object dataCtx,
-                                                  byte[] data, Stat stat) {
-                        synchronized (this) {
-                            if (rc == Code.OK.intValue()) {
-                                // Put this load in the map. dataCtx is actually the child string which is the
-                                // IP:PORT:SSL representation of the hub.
-                                try {
-                                    HubInfo hubInfo =
-                                        new HubInfo(new HedwigSocketAddress((String)dataCtx), stat.getCzxid());
-                                    HubLoad hubLoad = HubLoad.parse(new String(data, UTF_8));
-                                    this.loadMap.put(hubInfo, hubLoad);
-                                } catch (HubLoad.InvalidHubLoadException e) {
-                                    logger.warn("Corrupt data found for hub " + dataCtx + ". Ignoring.", e);
-                                } catch (RuntimeException e) {
-                                    // A malformed hub name or missing data must still count as a
-                                    // response, otherwise numResponse never reaches children.size()
-                                    // and the callback is never completed.
-                                    logger.warn("Corrupt data found for hub " + dataCtx + ". Ignoring.", e);
-                                }
-                            }
-                            numResponse++;
-                            if (numResponse == children.size()) {
-                                // We got fewer valid responses than the number of hubs we saw
-                                // when listing the children. Signal an error.
-                                if (loadMap.size() != numResponse) {
-                                    callback.operationFailed(originalCtx,
-                                        new PubSubException.UnexpectedConditionException(
-                                           "Fewer OK responses than the number of active hubs seen previously."));
-                                    return;
-                                }
-                                // We've seen all responses. All OK.
-                                callback.operationFinished(originalCtx, loadMap);
-                            }
-                        }
-                    }
-                };
-
-                for (String child : children) {
-                    String znode = conf.getZkHostsPrefix(new StringBuilder()).append("/").append(child).toString();
-                    zk.getData(znode, false, dataCallback, child);
-                }
-            }
-        }, originalCtx);
-    }
-
-    /**
-     * Fetches the load of all active hubs and, when a topic manager is set, hands
-     * the load map to a {@link TopicBasedLoadShedder} to shed load.
-     *
-     * @param tolerancePercentage passed through to the load shedder
-     * @param maxLoadToShed       passed through to the load shedder
-     * @param callback            receives {@code false} when there is no topic manager,
-     *                            otherwise whatever the load shedder reports; failed
-     *                            when the hub loads could not be fetched
-     * @param ctx                 caller context used for the hub load lookup
-     */
-    @Override
-    public void rebalanceCluster(final double tolerancePercentage, final HubLoad maxLoadToShed,
-                                 final Callback<Boolean> callback, final Object ctx) {
-        final Callback<Map<HubInfo, HubLoad>> onHubLoads = new Callback<Map<HubInfo, HubLoad>>() {
-            @Override
-            public void operationFailed(Object failedCtx, PubSubException cause) {
-                // Without the hub loads there is nothing to rebalance against.
-                logger.error("Failed to get active hubs. Cannot attempt a rebalance.");
-                callback.operationFailed(failedCtx, cause);
-            }
-
-            @Override
-            public void operationFinished(Object loadCtx, Map<HubInfo, HubLoad> hubLoads) {
-                if (tm != null) {
-                    TopicBasedLoadShedder shedder = new TopicBasedLoadShedder(tm,
-                            tolerancePercentage, maxLoadToShed);
-                    shedder.shedLoad(hubLoads, callback, loadCtx);
-                } else {
-                    // No topic manager means this hub has no load it could shed.
-                    callback.operationFinished(loadCtx, false);
-                }
-            }
-        };
-
-        // Everything else happens in the callback once the loads are known.
-        getActiveHubsInfoWithLoad(onHubLoads, ctx);
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java
deleted file mode 100644
index 2424d27..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java
+++ /dev/null
@@ -1,345 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.topics;
-
-import java.net.UnknownHostException;
-import java.io.IOException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.SynchronousQueue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.Stat;
-
-import static com.google.common.base.Charsets.UTF_8;
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.ConcurrencyUtils;
-import org.apache.hedwig.util.Either;
-import org.apache.hedwig.util.HedwigSocketAddress;
-import org.apache.hedwig.zookeeper.SafeAsyncZKCallback;
-import org.apache.hedwig.zookeeper.ZkUtils;
-
-/**
- * Topics are operated on in parallel as they are independent.
- *
- */
-public class ZkTopicManager extends AbstractTopicManager implements TopicManager {
-
-    private static final Logger logger = LoggerFactory.getLogger(ZkTopicManager.class);
-
-    /**
-     * Persistent storage for topic metadata.
-     */
-    private ZooKeeper zk;
-
-    // hub server manager
-    private final HubServerManager hubManager;
-
-    private final HubInfo myHubInfo;
-    private final HubLoad myHubLoad;
-
-    // Boolean flag indicating if we should suspend activity. If this is true,
-    // all of the Ops put into the queuer will fail automatically.
-    protected volatile boolean isSuspended = false;
-
-    /**
-     * Create a new topic manager. Pass in an active ZooKeeper client object.
-     *
-     * @param zk
-     */
-    public ZkTopicManager(final ZooKeeper zk, final ServerConfiguration cfg, ScheduledExecutorService scheduler)
-            throws UnknownHostException, PubSubException {
-
-        super(cfg, scheduler);
-        this.zk = zk;
-        this.hubManager = new ZkHubServerManager(cfg, zk, addr, this);
-
-        myHubLoad = new HubLoad(topics.size());
-        this.hubManager.registerListener(new HubServerManager.ManagerListener() {
-            @Override
-            public void onSuspend() {
-                isSuspended = true;
-            }
-            @Override
-            public void onResume() {
-                isSuspended = false;
-            }
-            @Override
-            public void onShutdown() {
-                // if hub server manager can't work, we had to quit
-                Runtime.getRuntime().exit(1);
-            }
-        });
-
-        final SynchronousQueue<Either<HubInfo, PubSubException>> queue =
-            new SynchronousQueue<Either<HubInfo, PubSubException>>();
-        this.hubManager.registerSelf(myHubLoad, new Callback<HubInfo>() {
-            @Override
-            public void operationFinished(final Object ctx, final HubInfo resultOfOperation) {
-                logger.info("Successfully registered hub {} with zookeeper", resultOfOperation);
-                ConcurrencyUtils.put(queue, Either.of(resultOfOperation, (PubSubException) null));
-            }
-            @Override
-            public void operationFailed(Object ctx, PubSubException exception) {
-                logger.error("Failed to register hub with zookeeper", exception);
-                ConcurrencyUtils.put(queue, Either.of((HubInfo)null, exception));
-            }
-        }, null);
-
-        Either<HubInfo, PubSubException> result = ConcurrencyUtils.take(queue);
-        PubSubException pse = result.right();
-        if (pse != null) {
-            throw pse;
-        }
-        myHubInfo = result.left();
-    }
-
-    String hubPath(ByteString topic) {
-        return cfg.getZkTopicPath(new StringBuilder(), topic).append("/hub").toString();
-    }
-
-    @Override
-    protected void realGetOwner(final ByteString topic, final boolean shouldClaim,
-                                final Callback<HedwigSocketAddress> cb, final Object ctx) {
-        // If operations are suspended due to a ZK client disconnect, just error
-        // out this call and return.
-        if (isSuspended) {
-            cb.operationFailed(ctx, new PubSubException.ServiceDownException(
-                                   "ZKTopicManager service is temporarily suspended!"));
-            return;
-        }
-
-        TopicStats stats = topics.getIfPresent(topic);
-        if (null != stats) {
-            cb.operationFinished(ctx, addr);
-            return;
-        }
-
-        new ZkGetOwnerOp(topic, shouldClaim, cb, ctx).read();
-    }
-
-    // Recursively call each other.
-    class ZkGetOwnerOp {
-        ByteString topic;
-        boolean shouldClaim;
-        Callback<HedwigSocketAddress> cb;
-        Object ctx;
-        String hubPath;
-
-        public ZkGetOwnerOp(ByteString topic, boolean shouldClaim, Callback<HedwigSocketAddress> cb, Object ctx) {
-            this.topic = topic;
-            this.shouldClaim = shouldClaim;
-            this.cb = cb;
-            this.ctx = ctx;
-            hubPath = hubPath(topic);
-
-        }
-
-        public void choose() {
-            hubManager.chooseLeastLoadedHub(new Callback<HubInfo>() {
-                @Override
-                public void operationFinished(Object ctx, HubInfo owner) {
-                    logger.info("{} : Least loaded owner {} is chosen for topic {}",
-                                new Object[] { addr, owner, topic.toStringUtf8() });
-                    if (owner.getAddress().equals(addr)) {
-                        claim();
-                    } else {
-                        cb.operationFinished(ZkGetOwnerOp.this.ctx, owner.getAddress());
-                    }
-                }
-                @Override
-                public void operationFailed(Object ctx, PubSubException pse) {
-                    logger.error("Failed to choose least loaded hub server for topic "
-                               + topic.toStringUtf8() + " : ", pse);
-                    cb.operationFailed(ctx, pse);
-                }
-            }, null);
-        }
-
-        public void claimOrChoose() {
-            if (shouldClaim)
-                claim();
-            else
-                choose();
-        }
-
-        public void read() {
-            zk.getData(hubPath, false, new SafeAsyncZKCallback.DataCallback() {
-                @Override
-                public void safeProcessResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
-
-                    if (rc == Code.NONODE.intValue()) {
-                        claimOrChoose();
-                        return;
-                    }
-
-                    if (rc != Code.OK.intValue()) {
-                        KeeperException e = ZkUtils.logErrorAndCreateZKException("Could not read ownership for topic: "
-                                            + topic.toStringUtf8(), path, rc);
-                        cb.operationFailed(ctx, new PubSubException.ServiceDownException(e));
-                        return;
-                    }
-
-                    // successfully did a read
-                    try {
-                        HubInfo ownerHubInfo = HubInfo.parse(new String(data, UTF_8));
-                        HedwigSocketAddress owner = ownerHubInfo.getAddress();
-                        if (!owner.equals(addr)) {
-                            if (logger.isDebugEnabled()) {
-                                logger.debug("topic: " + topic.toStringUtf8() + " belongs to someone else: " + owner);
-                            }
-                            cb.operationFinished(ctx, owner);
-                            return;
-                        }
-                        logger.info("Discovered stale self-node for topic: " + topic.toStringUtf8() + ", will delete it");
-                    } catch (HubInfo.InvalidHubInfoException ihie) {
-                        logger.info("Discovered invalid hub info for topic: " + topic.toStringUtf8() + ", will delete it : ", ihie);
-                    }
-
-                    // we must have previously failed and left a
-                    // residual ephemeral node here, so we must
-                    // delete it (clean it up) and then
-                    // re-create/re-acquire the topic.
-                    zk.delete(hubPath, stat.getVersion(), new VoidCallback() {
-                        @Override
-                        public void processResult(int rc, String path, Object ctx) {
-                            if (Code.OK.intValue() == rc || Code.NONODE.intValue() == rc) {
-                                claimOrChoose();
-                            } else {
-                                KeeperException e = ZkUtils.logErrorAndCreateZKException(
-                                                        "Could not delete self node for topic: " + topic.toStringUtf8(), path, rc);
-                                cb.operationFailed(ctx, new PubSubException.ServiceDownException(e));
-                            }
-                        }
-                    }, ctx);
-                }
-            }, ctx);
-        }
-
-        public void claim() {
-            if (logger.isDebugEnabled()) {
-                logger.debug("claiming topic: " + topic.toStringUtf8());
-            }
-
-            ZkUtils.createFullPathOptimistic(zk, hubPath,
-                    myHubInfo.toString().getBytes(UTF_8), Ids.OPEN_ACL_UNSAFE,
-                    CreateMode.EPHEMERAL, new SafeAsyncZKCallback.StringCallback() {
-
-                @Override
-                public void safeProcessResult(int rc, String path, Object ctx, String name) {
-                    if (rc == Code.OK.intValue()) {
-                        if (logger.isDebugEnabled()) {
-                            logger.debug("claimed topic: " + topic.toStringUtf8());
-                        }
-                        notifyListenersAndAddToOwnedTopics(topic, cb, ctx);
-                        hubManager.uploadSelfLoadData(myHubLoad.setNumTopics(topics.size()));
-                    } else if (rc == Code.NODEEXISTS.intValue()) {
-                        read();
-                    } else {
-                        KeeperException e = ZkUtils.logErrorAndCreateZKException(
-                                                "Failed to create ephemeral node to claim ownership of topic: "
-                                                + topic.toStringUtf8(), path, rc);
-                        cb.operationFailed(ctx, new PubSubException.ServiceDownException(e));
-                    }
-                }
-            }, ctx);
-        }
-
-    }
-
-    @Override
-    protected void postReleaseCleanup(final ByteString topic, final Callback<Void> cb, Object ctx) {
-
-        // Reduce load. We've removed the topic from our topic set, so do this as well.
-        // When we reclaim the topic, we will increment the load again.
-        hubManager.uploadSelfLoadData(myHubLoad.setNumTopics(topics.size()));
-
-        zk.getData(hubPath(topic), false, new SafeAsyncZKCallback.DataCallback() {
-            @Override
-            public void safeProcessResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
-                if (rc == Code.NONODE.intValue()) {
-                    // Node has somehow disappeared from under us, live with it
-                    // since its a transient node
-                    logger.warn("While deleting self-node for topic: " + topic.toStringUtf8() + ", node not found");
-                    cb.operationFinished(ctx, null);
-                    return;
-                }
-
-                if (rc != Code.OK.intValue()) {
-                    KeeperException e = ZkUtils.logErrorAndCreateZKException(
-                                            "Failed to delete self-ownership node for topic: " + topic.toStringUtf8(), path, rc);
-                    cb.operationFailed(ctx, new PubSubException.ServiceDownException(e));
-                    return;
-                }
-
-                String hubInfoStr = new String(data, UTF_8);
-                try {
-                    HubInfo ownerHubInfo = HubInfo.parse(hubInfoStr);
-                    HedwigSocketAddress owner = ownerHubInfo.getAddress();
-                    if (!owner.equals(addr)) {
-                        logger.warn("Wanted to delete self-node for topic: " + topic.toStringUtf8() + " but node for "
-                                    + owner + " found, leaving untouched");
-                        // Not our node, someone else's, leave it alone
-                        cb.operationFinished(ctx, null);
-                        return;
-                    }
-                } catch (HubInfo.InvalidHubInfoException ihie) {
-                    logger.info("Invalid hub info " + hubInfoStr + " found when release topic "
-                              + topic.toStringUtf8() + ". Leaving untouched until next acquire action.");
-                    cb.operationFinished(ctx, null);
-                    return;
-                }
-
-                zk.delete(path, stat.getVersion(), new SafeAsyncZKCallback.VoidCallback() {
-                    @Override
-                    public void safeProcessResult(int rc, String path, Object ctx) {
-                        if (rc != Code.OK.intValue() && rc != Code.NONODE.intValue()) {
-                            KeeperException e = ZkUtils
-                                                .logErrorAndCreateZKException("Failed to delete self-ownership node for topic: "
-                                                        + topic.toStringUtf8(), path, rc);
-                            cb.operationFailed(ctx, new PubSubException.ServiceDownException(e));
-                            return;
-                        }
-
-                        cb.operationFinished(ctx, null);
-                    }
-                }, ctx);
-            }
-        }, ctx);
-    }
-
-    @Override
-    public void stop() {
-        // we just unregister it with zookeeper to make it unavailable from hub servers list
-        try {
-            hubManager.unregisterSelf();
-        } catch (IOException e) {
-            logger.error("Error unregistering hub server :", e);
-        }
-        super.stop();
-    }
-
-}