You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:21 UTC

[11/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java b/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java
deleted file mode 100644
index 45272e2..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.proxy;
-
-import java.io.File;
-import java.lang.Thread.UncaughtExceptionHandler;
-import java.net.InetSocketAddress;
-import java.net.MalformedURLException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.commons.configuration.ConfigurationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.ServerSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.logging.InternalLoggerFactory;
-import org.jboss.netty.logging.Log4JLoggerFactory;
-
-import org.apache.hedwig.client.HedwigClient;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.server.common.TerminateJVMExceptionHandler;
-import org.apache.hedwig.server.handlers.ChannelDisconnectListener;
-import org.apache.hedwig.server.handlers.Handler;
-import org.apache.hedwig.server.netty.PubSubServer;
-import org.apache.hedwig.server.netty.PubSubServerPipelineFactory;
-import org.apache.hedwig.server.netty.UmbrellaHandler;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-public class HedwigProxy {
-    private static final Logger logger = LoggerFactory.getLogger(HedwigProxy.class);
-
-    HedwigClient client;
-    ServerSocketChannelFactory serverSocketChannelFactory;
-    ChannelGroup allChannels;
-    Map<OperationType, Handler> handlers;
-    ProxyConfiguration cfg;
-    ChannelTracker tracker;
-    ThreadGroup tg;
-
-    public HedwigProxy(final ProxyConfiguration cfg, final UncaughtExceptionHandler exceptionHandler) {
-        this.cfg = cfg;
-
-        tg = new ThreadGroup("hedwigproxy") {
-            @Override
-            public void uncaughtException(Thread t, Throwable e) {
-                exceptionHandler.uncaughtException(t, e);
-            }
-        };
-    }
-
-    public HedwigProxy(ProxyConfiguration conf) throws InterruptedException {
-        this(conf, new TerminateJVMExceptionHandler());
-    }
-
-    public void start() throws InterruptedException {
-        final LinkedBlockingQueue<Boolean> queue = new LinkedBlockingQueue<Boolean>();
-
-        new Thread(tg, new Runnable() {
-            @Override
-            public void run() {
-                client = new HedwigClient(cfg);
-                ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
-                serverSocketChannelFactory = new NioServerSocketChannelFactory(
-                        Executors.newCachedThreadPool(tfb.setNameFormat(
-                                "HedwigProxy-NIOBoss-%d").build()),
-                        Executors.newCachedThreadPool(tfb.setNameFormat(
-                                "HedwigProxy-NIOWorker-%d").build()));
-                initializeHandlers();
-                initializeNetty();
-
-                queue.offer(true);
-            }
-        }).start();
-
-        queue.take();
-    }
-
-    // used for testing
-    public ChannelTracker getChannelTracker() {
-        return tracker;
-    }
-
-    protected void initializeHandlers() {
-        handlers = new HashMap<OperationType, Handler>();
-        tracker = new ChannelTracker(client.getSubscriber());
-
-        handlers.put(OperationType.PUBLISH, new ProxyPublishHander(client.getPublisher()));
-        handlers.put(OperationType.SUBSCRIBE, new ProxySubscribeHandler(client.getSubscriber(), tracker));
-        handlers.put(OperationType.UNSUBSCRIBE, new ProxyUnsubscribeHandler(client.getSubscriber(), tracker));
-        handlers.put(OperationType.CONSUME, new ProxyConsumeHandler(client.getSubscriber()));
-        handlers.put(OperationType.STOP_DELIVERY, new ProxyStopDeliveryHandler(client.getSubscriber(), tracker));
-        handlers.put(OperationType.START_DELIVERY, new ProxyStartDeliveryHandler(client.getSubscriber(), tracker));
-        handlers.put(OperationType.CLOSESUBSCRIPTION,
-                     new ProxyCloseSubscriptionHandler(client.getSubscriber(), tracker));
-    }
-
-    protected void initializeNetty() {
-        InternalLoggerFactory.setDefaultFactory(new Log4JLoggerFactory());
-        allChannels = new DefaultChannelGroup("hedwigproxy");
-        ServerBootstrap bootstrap = new ServerBootstrap(serverSocketChannelFactory);
-        ChannelDisconnectListener disconnectListener =
-            (ChannelDisconnectListener) handlers.get(OperationType.SUBSCRIBE);
-        UmbrellaHandler umbrellaHandler =
-            new UmbrellaHandler(allChannels, handlers, disconnectListener, false);
-        PubSubServerPipelineFactory pipeline = new PubSubServerPipelineFactory(umbrellaHandler, null, cfg
-                .getMaximumMessageSize());
-
-        bootstrap.setPipelineFactory(pipeline);
-        bootstrap.setOption("child.tcpNoDelay", true);
-        bootstrap.setOption("child.keepAlive", true);
-        bootstrap.setOption("reuseAddress", true);
-
-        // Bind and start to accept incoming connections.
-        allChannels.add(bootstrap.bind(new InetSocketAddress(cfg.getProxyPort())));
-        logger.info("Going into receive loop");
-    }
-
-    public void shutdown() {
-        allChannels.close().awaitUninterruptibly();
-        client.close();
-        serverSocketChannelFactory.releaseExternalResources();
-    }
-
-    // the following method only exists for unit-testing purposes, should go
-    // away once we make start delivery totally server-side
-    public Handler getStartDeliveryHandler() {
-        return handlers.get(OperationType.START_DELIVERY);
-    }
-
-    public Handler getStopDeliveryHandler() {
-        return handlers.get(OperationType.STOP_DELIVERY);
-    }
-
-    /**
-     * @param args
-     */
-    public static void main(String[] args) {
-
-        logger.info("Attempting to start Hedwig Proxy");
-        ProxyConfiguration conf = new ProxyConfiguration();
-        if (args.length > 0) {
-            String confFile = args[0];
-            try {
-                conf.loadConf(new File(confFile).toURI().toURL());
-            } catch (MalformedURLException e) {
-                String msg = "Could not open configuration file: " + confFile;
-                PubSubServer.errorMsgAndExit(msg, e, PubSubServer.RC_INVALID_CONF_FILE);
-            } catch (ConfigurationException e) {
-                String msg = "Malformed configuration file: " + confFile;
-                PubSubServer.errorMsgAndExit(msg, e, PubSubServer.RC_MISCONFIGURED);
-            }
-            logger.info("Using configuration file " + confFile);
-        }
-        try {
-            new HedwigProxy(conf).start();
-        } catch (Throwable t) {
-            PubSubServer.errorMsgAndExit("Error during startup", t, PubSubServer.RC_OTHER);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyCloseSubscriptionHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyCloseSubscriptionHandler.java b/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyCloseSubscriptionHandler.java
deleted file mode 100644
index 13d3993..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyCloseSubscriptionHandler.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.proxy;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.Channel;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.client.api.Subscriber;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.protoextensions.PubSubResponseUtils;
-import org.apache.hedwig.server.handlers.Handler;
-import org.apache.hedwig.server.netty.UmbrellaHandler;
-import org.apache.hedwig.util.Callback;
-
-public class ProxyCloseSubscriptionHandler implements Handler {
-
-    private static final Logger logger = LoggerFactory.getLogger(ProxyCloseSubscriptionHandler.class);
-
-    Subscriber subscriber;
-    ChannelTracker tracker;
-
-    public ProxyCloseSubscriptionHandler(Subscriber subscriber, ChannelTracker tracker) {
-        this.subscriber = subscriber;
-        this.tracker = tracker;
-    }
-
-    @Override
-    public void handleRequest(final PubSubRequest request, final Channel channel) {
-
-        if (!request.hasCloseSubscriptionRequest()) {
-            UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
-                    "Missing close subscription request data");
-            return;
-        }
-
-        final ByteString topic = request.getTopic();
-        final ByteString subscriberId = request.getCloseSubscriptionRequest().getSubscriberId();
-
-        subscriber.asyncCloseSubscription(topic, subscriberId, new Callback<Void>() {
-            @Override
-            public void operationFailed(Object ctx, PubSubException exception) {
-                channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
-            }
-
-            @Override
-            public void operationFinished(Object ctx, Void result) {
-                tracker.aboutToCloseSubscription(topic, subscriberId);         
-                channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
-            }
-        }, null);
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyConfiguration.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyConfiguration.java b/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyConfiguration.java
deleted file mode 100644
index 051a782..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyConfiguration.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.proxy;
-
-import org.apache.hedwig.client.conf.ClientConfiguration;
-
-public class ProxyConfiguration extends ClientConfiguration {
-
-    protected final static String PROXY_PORT = "proxy_port";
-    protected final static String MAX_MESSAGE_SIZE = "max_message_size";
-
-    public int getProxyPort() {
-        return conf.getInt(PROXY_PORT, 9099);
-    }
-
-    @Override
-    public int getMaximumMessageSize() {
-        return conf.getInt(MAX_MESSAGE_SIZE, 1258291); /* 1.2M */
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyConsumeHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyConsumeHandler.java b/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyConsumeHandler.java
deleted file mode 100644
index c37ac48..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyConsumeHandler.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.proxy;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.Channel;
-
-import org.apache.hedwig.client.api.Subscriber;
-import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
-import org.apache.hedwig.protocol.PubSubProtocol.ConsumeRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.server.handlers.Handler;
-import org.apache.hedwig.server.netty.UmbrellaHandler;
-
-public class ProxyConsumeHandler implements Handler {
-
-    private static final Logger logger = LoggerFactory.getLogger(ProxyConsumeHandler.class);
-
-    Subscriber subscriber;
-
-    public ProxyConsumeHandler(Subscriber subscriber) {
-        this.subscriber = subscriber;
-    }
-
-    @Override
-    public void handleRequest(PubSubRequest request, Channel channel) {
-        if (!request.hasConsumeRequest()) {
-            UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
-                    "Missing consume request data");
-            return;
-        }
-
-        ConsumeRequest consumeRequest = request.getConsumeRequest();
-        try {
-            subscriber.consume(request.getTopic(), consumeRequest.getSubscriberId(), consumeRequest.getMsgId());
-        } catch (ClientNotSubscribedException e) {
-            // ignore
-            logger.warn("Unexpected consume request", e);
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyPublishHander.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyPublishHander.java b/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyPublishHander.java
deleted file mode 100644
index 7ffdb92..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyPublishHander.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.proxy;
-
-import org.jboss.netty.channel.Channel;
-
-import org.apache.hedwig.client.api.Publisher;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.PublishRequest;
-import org.apache.hedwig.protoextensions.PubSubResponseUtils;
-import org.apache.hedwig.server.handlers.Handler;
-import org.apache.hedwig.server.netty.UmbrellaHandler;
-import org.apache.hedwig.util.Callback;
-
-public class ProxyPublishHander implements Handler {
-    Publisher publisher;
-
-    public ProxyPublishHander(Publisher publisher) {
-        this.publisher = publisher;
-    }
-
-    @Override
-    public void handleRequest(final PubSubRequest request, final Channel channel) {
-        if (!request.hasPublishRequest()) {
-            UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
-                    "Missing publish request data");
-            return;
-        }
-
-        final PublishRequest publishRequest = request.getPublishRequest();
-
-        publisher.asyncPublish(request.getTopic(), publishRequest.getMsg(), new Callback<Void>() {
-            @Override
-            public void operationFailed(Object ctx, PubSubException exception) {
-                channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
-            }
-
-            @Override
-            public void operationFinished(Object ctx, Void resultOfOperation) {
-                channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
-            }
-        }, null);
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.java b/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.java
deleted file mode 100644
index d1cbb6b..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.proxy;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.api.Subscriber;
-import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
-import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
-import org.apache.hedwig.protoextensions.PubSubResponseUtils;
-import org.apache.hedwig.server.handlers.Handler;
-import org.apache.hedwig.server.netty.UmbrellaHandler;
-import org.apache.hedwig.util.Callback;
-
-public class ProxyStartDeliveryHandler implements Handler {
-
-    private static final Logger logger = LoggerFactory.getLogger(ProxyStartDeliveryHandler.class);
-
-    Subscriber subscriber;
-    ChannelTracker tracker;
-
-    public ProxyStartDeliveryHandler(Subscriber subscriber, ChannelTracker tracker) {
-        this.subscriber = subscriber;
-        this.tracker = tracker;
-    }
-
-    @Override
-    public void handleRequest(PubSubRequest request, Channel channel) {
-
-        if (!request.hasStartDeliveryRequest()) {
-            UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
-                    "Missing start delivery request data");
-            return;
-        }
-
-        final ByteString topic = request.getTopic();
-        final ByteString subscriberId = request.getStartDeliveryRequest().getSubscriberId();
-
-        synchronized (tracker) {
-            // try {
-            // tracker.checkChannelMatches(topic, subscriberId, channel);
-            // } catch (PubSubException e) {
-            // channel.write(PubSubResponseUtils.getResponseForException(e,
-            // request.getTxnId()));
-            // return;
-            // }
-
-            final Channel subscribedChannel = tracker.getChannel(topic, subscriberId);
-
-            if (subscribedChannel == null) {
-                channel.write(PubSubResponseUtils.getResponseForException(
-                                  new PubSubException.ClientNotSubscribedException("no subscription to start delivery on"),
-                                  request.getTxnId()));
-                return;
-            }
-
-            MessageHandler handler = new MessageHandler() {
-                @Override
-                public void deliver(ByteString topic, ByteString subscriberId, Message msg,
-                final Callback<Void> callback, final Object context) {
-
-                    PubSubResponse response = PubSubResponse.newBuilder().setProtocolVersion(
-                                                  ProtocolVersion.VERSION_ONE).setStatusCode(StatusCode.SUCCESS).setTxnId(0).setMessage(msg)
-                                              .setTopic(topic).setSubscriberId(subscriberId).build();
-
-                    ChannelFuture future = subscribedChannel.write(response);
-
-                    future.addListener(new ChannelFutureListener() {
-                        @Override
-                        public void operationComplete(ChannelFuture future) throws Exception {
-                            if (!future.isSuccess()) {
-                                // ignoring this failure, because this will
-                                // only happen due to channel disconnect.
-                                // Channel disconnect will in turn stop
-                                // delivery, and stop these errors
-                                return;
-                            }
-
-                            // Tell the hedwig client, that it can send me
-                            // more messages
-                            callback.operationFinished(context, null);
-                        }
-                    });
-                }
-            };
-
-            channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
-
-            try {
-                subscriber.startDelivery(topic, subscriberId, handler);
-            } catch (ClientNotSubscribedException e) {
-                // This should not happen, since we already checked the correct
-                // channel and so on
-                logger.error("Unexpected: No subscription when attempting to start delivery", e);
-                throw new RuntimeException(e);
-            } catch (AlreadyStartDeliveryException e) {
-                logger.error("Unexpected: Already start delivery when attempting to start delivery", e);
-                throw new RuntimeException(e);
-            }
-
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStopDeliveryHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStopDeliveryHandler.java b/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStopDeliveryHandler.java
deleted file mode 100644
index f66f9f1..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStopDeliveryHandler.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.proxy;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.Channel;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.client.api.Subscriber;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.server.handlers.Handler;
-import org.apache.hedwig.server.netty.UmbrellaHandler;
-
-public class ProxyStopDeliveryHandler implements Handler {
-
-    private static final Logger logger = LoggerFactory.getLogger(ProxyStopDeliveryHandler.class);
-
-    Subscriber subscriber;
-    ChannelTracker tracker;
-
-    public ProxyStopDeliveryHandler(Subscriber subscriber, ChannelTracker tracker) {
-        this.subscriber = subscriber;
-        this.tracker = tracker;
-    }
-
-    @Override
-    public void handleRequest(PubSubRequest request, Channel channel) {
-        if (!request.hasStopDeliveryRequest()) {
-            UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
-                    "Missing stop delivery request data");
-            return;
-        }
-
-        final ByteString topic = request.getTopic();
-        final ByteString subscriberId = request.getStopDeliveryRequest().getSubscriberId();
-
-        synchronized (tracker) {
-            try {
-                tracker.checkChannelMatches(topic, subscriberId, channel);
-            } catch (PubSubException e) {
-                // intentionally ignore this error, since stop delivery doesn't
-                // send back a response
-                return;
-            }
-
-            try {
-                subscriber.stopDelivery(topic, subscriberId);
-            } catch (ClientNotSubscribedException e) {
-                // This should not happen, since we already checked the correct
-                // channel and so on
-                logger.warn("Unexpected: No subscription when attempting to stop delivery", e);
-            }
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxySubscribeHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxySubscribeHandler.java b/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxySubscribeHandler.java
deleted file mode 100644
index a291dad..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxySubscribeHandler.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.proxy;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.Channel;
-import org.apache.hedwig.client.api.Subscriber;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.exceptions.PubSubException.TopicBusyException;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
-import org.apache.hedwig.protoextensions.PubSubResponseUtils;
-import org.apache.hedwig.server.handlers.ChannelDisconnectListener;
-import org.apache.hedwig.server.handlers.Handler;
-import org.apache.hedwig.server.netty.UmbrellaHandler;
-import org.apache.hedwig.util.Callback;
-
-public class ProxySubscribeHandler implements Handler, ChannelDisconnectListener {
-
-    private static final Logger logger = LoggerFactory.getLogger(ProxySubscribeHandler.class);
-
-    Subscriber subscriber;
-    ChannelTracker tracker;
-
-    public ProxySubscribeHandler(Subscriber subscriber, ChannelTracker tracker) {
-        this.subscriber = subscriber;
-        this.tracker = tracker;
-    }
-
-    @Override
-    public void channelDisconnected(Channel channel) {
-        tracker.channelDisconnected(channel);
-    }
-
-    @Override
-    public void handleRequest(final PubSubRequest request, final Channel channel) {
-        if (!request.hasSubscribeRequest()) {
-            UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
-                    "Missing subscribe request data");
-            return;
-        }
-
-        SubscribeRequest subRequest = request.getSubscribeRequest();
-        final TopicSubscriber topicSubscriber = new TopicSubscriber(request.getTopic(), subRequest.getSubscriberId());
-        SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(subRequest.getCreateOrAttach()).build();
-
-        subscriber.asyncSubscribe(topicSubscriber.getTopic(), subRequest.getSubscriberId(),
-                                  opts, new Callback<Void>() {
-            @Override
-            public void operationFailed(Object ctx, PubSubException exception) {
-                channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
-            }
-
-            @Override
-            public void operationFinished(Object ctx, Void resultOfOperation) {
-                try {
-                    tracker.subscribeSucceeded(topicSubscriber, channel);
-                } catch (TopicBusyException e) {
-                    channel.write(PubSubResponseUtils.getResponseForException(e, request.getTxnId()));
-                    return;
-                }
-                channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
-            }
-        }, null);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyUnsubscribeHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyUnsubscribeHandler.java b/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyUnsubscribeHandler.java
deleted file mode 100644
index f611905..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyUnsubscribeHandler.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.proxy;
-
-import org.jboss.netty.channel.Channel;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.client.api.Subscriber;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.protoextensions.PubSubResponseUtils;
-import org.apache.hedwig.server.handlers.Handler;
-import org.apache.hedwig.server.netty.UmbrellaHandler;
-import org.apache.hedwig.util.Callback;
-
-public class ProxyUnsubscribeHandler implements Handler {
-
-    Subscriber subscriber;
-    ChannelTracker tracker;
-
-    public ProxyUnsubscribeHandler(Subscriber subscriber, ChannelTracker tracker) {
-        this.subscriber = subscriber;
-        this.tracker = tracker;
-    }
-
-    @Override
-    public void handleRequest(final PubSubRequest request, final Channel channel) {
-        if (!request.hasUnsubscribeRequest()) {
-            UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
-                    "Missing unsubscribe request data");
-            return;
-        }
-
-        ByteString topic = request.getTopic();
-        ByteString subscriberId = request.getUnsubscribeRequest().getSubscriberId();
-
-        synchronized (tracker) {
-
-            // Even if unsubscribe fails, the hedwig client closes the channel
-            // on which the subscription is being served. Hence better to tell
-            // the tracker beforehand that this subscription is no longer served
-            tracker.aboutToUnsubscribe(topic, subscriberId);
-
-            subscriber.asyncUnsubscribe(topic, subscriberId, new Callback<Void>() {
-                @Override
-                public void operationFailed(Object ctx, PubSubException exception) {
-                    channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
-                }
-
-                @Override
-                public void operationFinished(Object ctx, Void resultOfOperation) {
-                    channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
-                }
-            }, null);
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClient.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClient.java b/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClient.java
deleted file mode 100644
index 063a99c..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClient.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.regions;
-
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.netty.HedwigClientImpl;
-
-/**
- * This is a hub specific implementation of the HedwigClient. All this does
- * though is to override the HedwigSubscriber with the hub specific child class.
- * Creating this class so we can call the protected method in the parent to set
- * the subscriber since we don't want to expose that API to the public.
- */
-public class HedwigHubClient extends HedwigClientImpl {
-
-    // Constructor when we already have a ChannelFactory instantiated.
-    public HedwigHubClient(ClientConfiguration cfg, ClientSocketChannelFactory channelFactory) {
-        super(cfg, channelFactory);
-        // Override the type of HedwigSubscriber with the hub specific one.
-        setSubscriber(new HedwigHubSubscriber(this));
-    }
-
-    // Constructor when we don't have a ChannelFactory. The super constructor
-    // will create one for us.
-    public HedwigHubClient(ClientConfiguration cfg) {
-        super(cfg);
-        // Override the type of HedwigSubscriber with the hub specific one.
-        setSubscriber(new HedwigHubSubscriber(this));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClientFactory.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClientFactory.java b/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClientFactory.java
deleted file mode 100644
index 68d317e..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClientFactory.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.regions;
-
-import org.apache.commons.configuration.ConfigurationException;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.util.HedwigSocketAddress;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class HedwigHubClientFactory {
-
-    private final ServerConfiguration cfg;
-    private final ClientConfiguration clientConfiguration;
-    private final ClientSocketChannelFactory channelFactory;
-    private static final Logger logger = LoggerFactory.getLogger(HedwigHubClientFactory.class);
-
-    // Constructor that takes in a ServerConfiguration, ClientConfiguration and a ChannelFactory
-    // so we can reuse it for all Clients created here.
-    public HedwigHubClientFactory(ServerConfiguration cfg, ClientConfiguration clientConfiguration,
-                                  ClientSocketChannelFactory channelFactory) {
-        this.cfg = cfg;
-        this.clientConfiguration = clientConfiguration;
-        this.channelFactory = channelFactory;
-    }
-
-    /**
-     * Manufacture a hub client whose default server to connect to is the input
-     * HedwigSocketAddress hub.
-     *
-     * @param hub
-     *            The hub in another region to connect to.
-     */
-    HedwigHubClient create(final HedwigSocketAddress hub) {
-        // Create a hub specific version of the client to use
-        ClientConfiguration hubClientConfiguration = new ClientConfiguration() {
-            @Override
-            protected HedwigSocketAddress getDefaultServerHedwigSocketAddress() {
-                return hub;
-            }
-
-            @Override
-            public boolean isSSLEnabled() {
-                return cfg.isInterRegionSSLEnabled() || clientConfiguration.isSSLEnabled();
-            }
-        };
-        try {
-            hubClientConfiguration.addConf(this.clientConfiguration.getConf());
-        } catch (ConfigurationException e) {
-            String msg = "Configuration exception while loading the client configuration for the region manager.";
-            logger.error(msg);
-            throw new RuntimeException(msg);
-        }
-        return new HedwigHubClient(hubClientConfiguration, channelFactory);
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubSubscriber.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubSubscriber.java b/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubSubscriber.java
deleted file mode 100644
index 7055251..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubSubscriber.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.regions;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.client.exceptions.InvalidSubscriberIdException;
-import org.apache.hedwig.client.netty.HedwigClientImpl;
-import org.apache.hedwig.client.netty.HedwigSubscriber;
-import org.apache.hedwig.exceptions.PubSubException.ClientAlreadySubscribedException;
-import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
-import org.apache.hedwig.exceptions.PubSubException.CouldNotConnectException;
-import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
-import org.apache.hedwig.util.Callback;
-
-/**
- * This is a hub specific child class of the HedwigSubscriber. The main thing is
- * does is wrap the public subscribe/unsubscribe methods by calling the
- * overloaded protected ones passing in a true value for the input boolean
- * parameter isHub. That will just make sure we validate the subscriberId
- * passed, ensuring it is of the right format either for a local or hub
- * subscriber.
- */
-public class HedwigHubSubscriber extends HedwigSubscriber {
-
-    public HedwigHubSubscriber(HedwigClientImpl client) {
-        super(client);
-    }
-
-    @Override
-    public void subscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode)
-            throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
-        InvalidSubscriberIdException {
-        SubscriptionOptions options = SubscriptionOptions.newBuilder().setCreateOrAttach(mode).build();
-        subscribe(topic, subscriberId, options);
-    }
-
-    @Override
-    public void asyncSubscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode, Callback<Void> callback,
-                               Object context) {
-        SubscriptionOptions options = SubscriptionOptions.newBuilder().setCreateOrAttach(mode).build();
-        asyncSubscribe(topic, subscriberId, options, callback, context);
-    }
-
-    @Override
-    public void subscribe(ByteString topic, ByteString subscriberId, SubscriptionOptions options)
-            throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
-        InvalidSubscriberIdException {
-        subscribe(topic, subscriberId, options, true);
-    }
-
-    @Override
-    public void asyncSubscribe(ByteString topic, ByteString subscriberId,
-                               SubscriptionOptions options, Callback<Void> callback, Object context) {
-        asyncSubscribe(topic, subscriberId, options, callback, context, true);
-    }
-
-    @Override
-    public void unsubscribe(ByteString topic, ByteString subscriberId) throws CouldNotConnectException,
-        ClientNotSubscribedException, ServiceDownException, InvalidSubscriberIdException {
-        unsubscribe(topic, subscriberId, true);
-    }
-
-    @Override
-    public void asyncUnsubscribe(final ByteString topic, final ByteString subscriberId, final Callback<Void> callback,
-                                 final Object context) {
-        asyncUnsubscribe(topic, subscriberId, callback, context, true);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java
deleted file mode 100644
index bae960b..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java
+++ /dev/null
@@ -1,355 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.regions;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.CountDownLatch;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.ZooKeeper;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
-import org.apache.hedwig.client.netty.HedwigSubscriber;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.RegionSpecificSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
-import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.common.TopicOpQueuer;
-import org.apache.hedwig.server.persistence.PersistRequest;
-import org.apache.hedwig.server.persistence.PersistenceManager;
-import org.apache.hedwig.server.subscriptions.SubscriptionEventListener;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.CallbackUtils;
-import org.apache.hedwig.util.HedwigSocketAddress;
-
-public class RegionManager implements SubscriptionEventListener {
-
-    protected static final Logger LOGGER = LoggerFactory.getLogger(RegionManager.class);
-
-    private final ByteString mySubId;
-    private final PersistenceManager pm;
-    private final ArrayList<HedwigHubClient> clients = new ArrayList<HedwigHubClient>();
-    private final TopicOpQueuer queue;
-    private final String myRegion;
-    // Timer for running a retry thread task to retry remote-subscription in asynchronous mode.
-    private final Timer timer = new Timer(true);
-    private final HashMap<HedwigHubClient, Set<ByteString>> retryMap =
-            new HashMap<HedwigHubClient, Set<ByteString>>();
-    // map used to track whether a topic is remote subscribed or not
-    private final ConcurrentMap<ByteString, Boolean> topicStatuses =
-            new ConcurrentHashMap<ByteString, Boolean>();
-
-    /**
-     * This is the Timer Task for retrying subscribing to remote regions
-     */
-    class RetrySubscribeTask extends TimerTask {
-
-        @Override
-        public void run() {
-            Set<HedwigHubClient> hubClients = new HashSet<HedwigHubClient>();
-            synchronized (retryMap) {
-                hubClients.addAll(retryMap.keySet());
-            }
-            if (hubClients.isEmpty()) {
-                if (LOGGER.isDebugEnabled()) {
-                    LOGGER.debug("[" + myRegion + "] There is no hub client needs to retry subscriptions.");
-                }
-                return;
-            }
-            for (HedwigHubClient client : hubClients) {
-                Set<ByteString> topics = null;
-                synchronized (retryMap) {
-                    topics = retryMap.remove(client);
-                }
-                if (null == topics || topics.isEmpty()) {
-                    continue;
-                }
-                final CountDownLatch done = new CountDownLatch(1);
-                Callback<Void> postCb = new Callback<Void>() {
-                    @Override
-                    public void operationFinished(Object ctx,
-                            Void resultOfOperation) {
-                        finish();
-                    }
-                    @Override
-                    public void operationFailed(Object ctx,
-                            PubSubException exception) {
-                        finish();
-                    }
-                    void finish() {
-                        done.countDown();
-                    }
-                };
-                Callback<Void> mcb = CallbackUtils.multiCallback(topics.size(), postCb, null);
-                for (ByteString topic : topics) {
-                    Boolean doRemoteSubscribe = topicStatuses.get(topic);
-                    // topic has been removed, no retry again
-                    if (null == doRemoteSubscribe) {
-                        mcb.operationFinished(null, null);
-                        continue;
-                    }
-                    retrySubscribe(client, topic, mcb);
-                }
-                try {
-                    done.await();
-                } catch (InterruptedException e) {
-                    LOGGER.warn("Exception during retrying remote subscriptions : ", e);
-                }
-            }
-        }
-
-    }
-
-    public RegionManager(final PersistenceManager pm, final ServerConfiguration cfg, final ZooKeeper zk,
-                         ScheduledExecutorService scheduler, HedwigHubClientFactory hubClientFactory) {
-        this.pm = pm;
-        mySubId = ByteString.copyFromUtf8(SubscriptionStateUtils.HUB_SUBSCRIBER_PREFIX + cfg.getMyRegion());
-        queue = new TopicOpQueuer(scheduler);
-        for (final String hub : cfg.getRegions()) {
-            clients.add(hubClientFactory.create(new HedwigSocketAddress(hub)));
-        }
-        myRegion = cfg.getMyRegionByteString().toStringUtf8();
-        if (cfg.getRetryRemoteSubscribeThreadRunInterval() > 0) {
-            timer.schedule(new RetrySubscribeTask(), 0, cfg.getRetryRemoteSubscribeThreadRunInterval());
-        }
-    }
-
-    private void putTopicInRetryMap(HedwigHubClient client, ByteString topic) {
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("[" + myRegion + "] Put topic in retry map : " + topic.toStringUtf8());
-        }
-        synchronized (retryMap) {
-            Set<ByteString> topics = retryMap.get(client);
-            if (null == topics) {
-                topics = new HashSet<ByteString>();
-                retryMap.put(client, topics);
-            }
-            topics.add(topic);
-        }
-    }
-    
-    /**
-     * Do remote subscribe for a specified topic.
-     *
-     * @param client
-     *          Hedwig Hub Client to subscribe remote topic.
-     * @param topic
-     *          Topic to subscribe.
-     * @param synchronous
-     *          Whether to wait for the callback of subscription.
-     * @param mcb
-     *          Callback to trigger after subscription is done.
-     * @param contex
-     *          Callback context
-     */
-    private void doRemoteSubscribe(final HedwigHubClient client, final ByteString topic, final boolean synchronous,
-                                   final Callback<Void> mcb, final Object context) {
-        final HedwigSubscriber sub = client.getSubscriber();
-        try {
-            if (sub.hasSubscription(topic, mySubId)) {
-                if (LOGGER.isDebugEnabled()) {
-                    LOGGER.debug("[" + myRegion + "] cross-region subscription for topic "
-                                 + topic.toStringUtf8() + " has existed before.");
-                }
-                mcb.operationFinished(null, null);
-                return;
-            }
-        } catch (PubSubException e) {
-            LOGGER.error("[" + myRegion + "] checking cross-region subscription for topic "
-                         + topic.toStringUtf8() + " failed (this is should not happen): ", e);
-            mcb.operationFailed(context, e);
-            return;
-        }
-        sub.asyncSubscribe(topic, mySubId, CreateOrAttach.CREATE_OR_ATTACH, new Callback<Void>() {
-            @Override
-            public void operationFinished(Object ctx, Void resultOfOperation) {
-                if (LOGGER.isDebugEnabled())
-                    LOGGER.debug("[" + myRegion + "] cross-region subscription done for topic " + topic.toStringUtf8());
-                try {
-                    sub.startDelivery(topic, mySubId, new MessageHandler() {
-                        @Override
-                        public void deliver(final ByteString topic, ByteString subscriberId, Message msg,
-                        final Callback<Void> callback, final Object context) {
-                            // When messages are first published
-                            // locally, the PublishHandler sets the
-                            // source region in the Message.
-                            if (msg.hasSrcRegion()) {
-                                Message.newBuilder(msg).setMsgId(
-                                    MessageSeqId.newBuilder(msg.getMsgId()).addRemoteComponents(
-                                        RegionSpecificSeqId.newBuilder().setRegion(
-                                            msg.getSrcRegion()).setSeqId(
-                                            msg.getMsgId().getLocalComponent())));
-                            }
-                            pm.persistMessage(new PersistRequest(topic, msg, new Callback<MessageSeqId>() {
-                                @Override
-                                public void operationFinished(Object ctx, MessageSeqId resultOfOperation) {
-                                    if (LOGGER.isDebugEnabled())
-                                        LOGGER.debug("[" + myRegion + "] cross-region recv-fwd succeeded for topic "
-                                                     + topic.toStringUtf8());
-                                    callback.operationFinished(context, null);
-                                }
-
-                                @Override
-                                public void operationFailed(Object ctx, PubSubException exception) {
-                                    if (LOGGER.isDebugEnabled())
-                                        LOGGER.error("[" + myRegion + "] cross-region recv-fwd failed for topic "
-                                                     + topic.toStringUtf8(), exception);
-                                    callback.operationFailed(context, exception);
-                                }
-                            }, null));
-                        }
-                    });
-                    if (LOGGER.isDebugEnabled())
-                        LOGGER.debug("[" + myRegion + "] cross-region start-delivery succeeded for topic "
-                                     + topic.toStringUtf8());
-                    mcb.operationFinished(ctx, null);
-                } catch (PubSubException ex) {
-                    if (LOGGER.isDebugEnabled())
-                        LOGGER.error(
-                                "[" + myRegion + "] cross-region start-delivery failed for topic " + topic.toStringUtf8(), ex);
-                    mcb.operationFailed(ctx, ex);
-                } catch (AlreadyStartDeliveryException ex) {
-                    LOGGER.error("[" + myRegion + "] cross-region start-delivery failed for topic "
-                               + topic.toStringUtf8(), ex);
-                    mcb.operationFailed(ctx, new PubSubException.UnexpectedConditionException("cross-region start-delivery failed : " + ex.getMessage()));
-                }
-            }
-
-            @Override
-            public void operationFailed(Object ctx, PubSubException exception) {
-                if (LOGGER.isDebugEnabled())
-                    LOGGER.error("[" + myRegion + "] cross-region subscribe failed for topic " + topic.toStringUtf8(),
-                                 exception);
-                if (!synchronous) {
-                    putTopicInRetryMap(client, topic);
-                }
-                mcb.operationFailed(ctx, exception);
-            }
-        }, null);
-    }
-
-    private void retrySubscribe(final HedwigHubClient client, final ByteString topic, final Callback<Void> cb) {
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("[" + myRegion + "] Retry remote subscribe topic : " + topic.toStringUtf8());
-        }
-        queue.pushAndMaybeRun(topic, queue.new AsynchronousOp<Void>(topic, cb, null) {
-            @Override
-            public void run() {
-                Boolean doRemoteSubscribe = topicStatuses.get(topic);
-                // topic has been removed, no retry again
-                if (null == doRemoteSubscribe) {
-                    cb.operationFinished(ctx, null);
-                    return;
-                }
-                doRemoteSubscribe(client, topic, false, cb, ctx);
-            }
-        });
-    }
-
-    @Override
-    public void onFirstLocalSubscribe(final ByteString topic, final boolean synchronous, final Callback<Void> cb) {
-        topicStatuses.put(topic, true);
-        // Whenever we acquire a topic due to a (local) subscribe, subscribe on
-        // it to all the other regions (currently using simple all-to-all
-        // topology).
-        queue.pushAndMaybeRun(topic, queue.new AsynchronousOp<Void>(topic, cb, null) {
-            @Override
-            public void run() {
-                Callback<Void> postCb = synchronous ? cb : CallbackUtils.logger(LOGGER, 
-                        "[" + myRegion + "] all cross-region subscriptions succeeded", 
-                        "[" + myRegion + "] at least one cross-region subscription failed");
-                final Callback<Void> mcb = CallbackUtils.multiCallback(clients.size(), postCb, ctx);
-                for (final HedwigHubClient client : clients) {
-                    doRemoteSubscribe(client, topic, synchronous, mcb, ctx);
-                }
-                if (!synchronous)
-                    cb.operationFinished(null, null);
-            }
-        });
-
-    }
-
-    @Override
-    public void onLastLocalUnsubscribe(final ByteString topic) {
-        topicStatuses.remove(topic);
-        // TODO may want to ease up on the eager unsubscribe; this is dropping
-        // cross-region subscriptions ASAP
-        queue.pushAndMaybeRun(topic, queue.new AsynchronousOp<Void>(topic, new Callback<Void>() {
-
-            @Override
-            public void operationFinished(Object ctx, Void result) {
-                if (LOGGER.isDebugEnabled())
-                    LOGGER.debug("[" + myRegion + "] cross-region unsubscribes succeeded for topic " + topic.toStringUtf8());
-            }
-
-            @Override
-            public void operationFailed(Object ctx, PubSubException exception) {
-                if (LOGGER.isDebugEnabled())
-                    LOGGER.error("[" + myRegion + "] cross-region unsubscribes failed for topic " + topic.toStringUtf8(), exception);
-            }
-
-        }, null) {
-            @Override
-            public void run() {
-                Callback<Void> mcb = CallbackUtils.multiCallback(clients.size(), cb, ctx);
-                for (final HedwigHubClient client : clients) {
-                    final HedwigSubscriber sub = client.getSubscriber();
-                    try {
-                        if (!sub.hasSubscription(topic, mySubId)) {
-                            if (LOGGER.isDebugEnabled()) {
-                                LOGGER.debug("[" + myRegion + "] cross-region subscription for topic "
-                                             + topic.toStringUtf8() + " has existed before.");
-                            }
-                            mcb.operationFinished(null, null);
-                            continue;
-                        }
-                    } catch (PubSubException e) {
-                        LOGGER.error("[" + myRegion + "] checking cross-region subscription for topic "
-                                     + topic.toStringUtf8() + " failed (this is should not happen): ", e);
-                        mcb.operationFailed(ctx, e);
-                        continue;
-                    }
-                    sub.asyncUnsubscribe(topic, mySubId, mcb, null);
-                }
-            }
-        });
-    }
-
-    // Method to shutdown and stop all of the cross-region Hedwig clients.
-    public void stop() {
-        timer.cancel();
-        for (HedwigHubClient client : clients) {
-            client.close();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/ssl/SslServerContextFactory.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/ssl/SslServerContextFactory.java b/hedwig-server/src/main/java/org/apache/hedwig/server/ssl/SslServerContextFactory.java
deleted file mode 100644
index 83d6961..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/ssl/SslServerContextFactory.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.ssl;
-
-import java.security.KeyStore;
-
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-
-import org.apache.hedwig.client.ssl.SslContextFactory;
-import org.apache.hedwig.server.common.ServerConfiguration;
-
-public class SslServerContextFactory extends SslContextFactory {
-
-    public SslServerContextFactory(ServerConfiguration cfg) {
-        try {
-            // Load our Java key store.
-            KeyStore ks = KeyStore.getInstance("pkcs12");
-            ks.load(cfg.getCertStream(), cfg.getPassword().toCharArray());
-
-            // Like ssh-agent.
-            KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
-            kmf.init(ks, cfg.getPassword().toCharArray());
-
-            // Create the SSL context.
-            ctx = SSLContext.getInstance("TLS");
-            ctx.init(kmf.getKeyManagers(), getTrustManagers(), null);
-        } catch (Exception ex) {
-            throw new RuntimeException(ex);
-        }
-    }
-
-    @Override
-    protected boolean isClient() {
-        return false;
-    }
-
-}