You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:21 UTC
[11/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java b/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java
deleted file mode 100644
index 45272e2..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.proxy;
-
-import java.io.File;
-import java.lang.Thread.UncaughtExceptionHandler;
-import java.net.InetSocketAddress;
-import java.net.MalformedURLException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.commons.configuration.ConfigurationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.ServerSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.logging.InternalLoggerFactory;
-import org.jboss.netty.logging.Log4JLoggerFactory;
-
-import org.apache.hedwig.client.HedwigClient;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.server.common.TerminateJVMExceptionHandler;
-import org.apache.hedwig.server.handlers.ChannelDisconnectListener;
-import org.apache.hedwig.server.handlers.Handler;
-import org.apache.hedwig.server.netty.PubSubServer;
-import org.apache.hedwig.server.netty.PubSubServerPipelineFactory;
-import org.apache.hedwig.server.netty.UmbrellaHandler;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-public class HedwigProxy {
- private static final Logger logger = LoggerFactory.getLogger(HedwigProxy.class);
-
- HedwigClient client;
- ServerSocketChannelFactory serverSocketChannelFactory;
- ChannelGroup allChannels;
- Map<OperationType, Handler> handlers;
- ProxyConfiguration cfg;
- ChannelTracker tracker;
- ThreadGroup tg;
-
- public HedwigProxy(final ProxyConfiguration cfg, final UncaughtExceptionHandler exceptionHandler) {
- this.cfg = cfg;
-
- tg = new ThreadGroup("hedwigproxy") {
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- exceptionHandler.uncaughtException(t, e);
- }
- };
- }
-
- public HedwigProxy(ProxyConfiguration conf) throws InterruptedException {
- this(conf, new TerminateJVMExceptionHandler());
- }
-
- public void start() throws InterruptedException {
- final LinkedBlockingQueue<Boolean> queue = new LinkedBlockingQueue<Boolean>();
-
- new Thread(tg, new Runnable() {
- @Override
- public void run() {
- client = new HedwigClient(cfg);
- ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
- serverSocketChannelFactory = new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(tfb.setNameFormat(
- "HedwigProxy-NIOBoss-%d").build()),
- Executors.newCachedThreadPool(tfb.setNameFormat(
- "HedwigProxy-NIOWorker-%d").build()));
- initializeHandlers();
- initializeNetty();
-
- queue.offer(true);
- }
- }).start();
-
- queue.take();
- }
-
- // used for testing
- public ChannelTracker getChannelTracker() {
- return tracker;
- }
-
- protected void initializeHandlers() {
- handlers = new HashMap<OperationType, Handler>();
- tracker = new ChannelTracker(client.getSubscriber());
-
- handlers.put(OperationType.PUBLISH, new ProxyPublishHander(client.getPublisher()));
- handlers.put(OperationType.SUBSCRIBE, new ProxySubscribeHandler(client.getSubscriber(), tracker));
- handlers.put(OperationType.UNSUBSCRIBE, new ProxyUnsubscribeHandler(client.getSubscriber(), tracker));
- handlers.put(OperationType.CONSUME, new ProxyConsumeHandler(client.getSubscriber()));
- handlers.put(OperationType.STOP_DELIVERY, new ProxyStopDeliveryHandler(client.getSubscriber(), tracker));
- handlers.put(OperationType.START_DELIVERY, new ProxyStartDeliveryHandler(client.getSubscriber(), tracker));
- handlers.put(OperationType.CLOSESUBSCRIPTION,
- new ProxyCloseSubscriptionHandler(client.getSubscriber(), tracker));
- }
-
- protected void initializeNetty() {
- InternalLoggerFactory.setDefaultFactory(new Log4JLoggerFactory());
- allChannels = new DefaultChannelGroup("hedwigproxy");
- ServerBootstrap bootstrap = new ServerBootstrap(serverSocketChannelFactory);
- ChannelDisconnectListener disconnectListener =
- (ChannelDisconnectListener) handlers.get(OperationType.SUBSCRIBE);
- UmbrellaHandler umbrellaHandler =
- new UmbrellaHandler(allChannels, handlers, disconnectListener, false);
- PubSubServerPipelineFactory pipeline = new PubSubServerPipelineFactory(umbrellaHandler, null, cfg
- .getMaximumMessageSize());
-
- bootstrap.setPipelineFactory(pipeline);
- bootstrap.setOption("child.tcpNoDelay", true);
- bootstrap.setOption("child.keepAlive", true);
- bootstrap.setOption("reuseAddress", true);
-
- // Bind and start to accept incoming connections.
- allChannels.add(bootstrap.bind(new InetSocketAddress(cfg.getProxyPort())));
- logger.info("Going into receive loop");
- }
-
- public void shutdown() {
- allChannels.close().awaitUninterruptibly();
- client.close();
- serverSocketChannelFactory.releaseExternalResources();
- }
-
- // the following method only exists for unit-testing purposes, should go
- // away once we make start delivery totally server-side
- public Handler getStartDeliveryHandler() {
- return handlers.get(OperationType.START_DELIVERY);
- }
-
- public Handler getStopDeliveryHandler() {
- return handlers.get(OperationType.STOP_DELIVERY);
- }
-
- /**
- * @param args
- */
- public static void main(String[] args) {
-
- logger.info("Attempting to start Hedwig Proxy");
- ProxyConfiguration conf = new ProxyConfiguration();
- if (args.length > 0) {
- String confFile = args[0];
- try {
- conf.loadConf(new File(confFile).toURI().toURL());
- } catch (MalformedURLException e) {
- String msg = "Could not open configuration file: " + confFile;
- PubSubServer.errorMsgAndExit(msg, e, PubSubServer.RC_INVALID_CONF_FILE);
- } catch (ConfigurationException e) {
- String msg = "Malformed configuration file: " + confFile;
- PubSubServer.errorMsgAndExit(msg, e, PubSubServer.RC_MISCONFIGURED);
- }
- logger.info("Using configuration file " + confFile);
- }
- try {
- new HedwigProxy(conf).start();
- } catch (Throwable t) {
- PubSubServer.errorMsgAndExit("Error during startup", t, PubSubServer.RC_OTHER);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyCloseSubscriptionHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyCloseSubscriptionHandler.java b/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyCloseSubscriptionHandler.java
deleted file mode 100644
index 13d3993..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyCloseSubscriptionHandler.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.proxy;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.Channel;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.client.api.Subscriber;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.protoextensions.PubSubResponseUtils;
-import org.apache.hedwig.server.handlers.Handler;
-import org.apache.hedwig.server.netty.UmbrellaHandler;
-import org.apache.hedwig.util.Callback;
-
-public class ProxyCloseSubscriptionHandler implements Handler {
-
- private static final Logger logger = LoggerFactory.getLogger(ProxyCloseSubscriptionHandler.class);
-
- Subscriber subscriber;
- ChannelTracker tracker;
-
- public ProxyCloseSubscriptionHandler(Subscriber subscriber, ChannelTracker tracker) {
- this.subscriber = subscriber;
- this.tracker = tracker;
- }
-
- @Override
- public void handleRequest(final PubSubRequest request, final Channel channel) {
-
- if (!request.hasCloseSubscriptionRequest()) {
- UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
- "Missing close subscription request data");
- return;
- }
-
- final ByteString topic = request.getTopic();
- final ByteString subscriberId = request.getCloseSubscriptionRequest().getSubscriberId();
-
- subscriber.asyncCloseSubscription(topic, subscriberId, new Callback<Void>() {
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
- }
-
- @Override
- public void operationFinished(Object ctx, Void result) {
- tracker.aboutToCloseSubscription(topic, subscriberId);
- channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
- }
- }, null);
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyConfiguration.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyConfiguration.java b/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyConfiguration.java
deleted file mode 100644
index 051a782..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyConfiguration.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.proxy;
-
-import org.apache.hedwig.client.conf.ClientConfiguration;
-
-public class ProxyConfiguration extends ClientConfiguration {
-
- protected final static String PROXY_PORT = "proxy_port";
- protected final static String MAX_MESSAGE_SIZE = "max_message_size";
-
- public int getProxyPort() {
- return conf.getInt(PROXY_PORT, 9099);
- }
-
- @Override
- public int getMaximumMessageSize() {
- return conf.getInt(MAX_MESSAGE_SIZE, 1258291); /* 1.2M */
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyConsumeHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyConsumeHandler.java b/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyConsumeHandler.java
deleted file mode 100644
index c37ac48..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyConsumeHandler.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.proxy;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.Channel;
-
-import org.apache.hedwig.client.api.Subscriber;
-import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
-import org.apache.hedwig.protocol.PubSubProtocol.ConsumeRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.server.handlers.Handler;
-import org.apache.hedwig.server.netty.UmbrellaHandler;
-
-public class ProxyConsumeHandler implements Handler {
-
- private static final Logger logger = LoggerFactory.getLogger(ProxyConsumeHandler.class);
-
- Subscriber subscriber;
-
- public ProxyConsumeHandler(Subscriber subscriber) {
- this.subscriber = subscriber;
- }
-
- @Override
- public void handleRequest(PubSubRequest request, Channel channel) {
- if (!request.hasConsumeRequest()) {
- UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
- "Missing consume request data");
- return;
- }
-
- ConsumeRequest consumeRequest = request.getConsumeRequest();
- try {
- subscriber.consume(request.getTopic(), consumeRequest.getSubscriberId(), consumeRequest.getMsgId());
- } catch (ClientNotSubscribedException e) {
- // ignore
- logger.warn("Unexpected consume request", e);
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyPublishHander.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyPublishHander.java b/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyPublishHander.java
deleted file mode 100644
index 7ffdb92..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyPublishHander.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.proxy;
-
-import org.jboss.netty.channel.Channel;
-
-import org.apache.hedwig.client.api.Publisher;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.PublishRequest;
-import org.apache.hedwig.protoextensions.PubSubResponseUtils;
-import org.apache.hedwig.server.handlers.Handler;
-import org.apache.hedwig.server.netty.UmbrellaHandler;
-import org.apache.hedwig.util.Callback;
-
-public class ProxyPublishHander implements Handler {
- Publisher publisher;
-
- public ProxyPublishHander(Publisher publisher) {
- this.publisher = publisher;
- }
-
- @Override
- public void handleRequest(final PubSubRequest request, final Channel channel) {
- if (!request.hasPublishRequest()) {
- UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
- "Missing publish request data");
- return;
- }
-
- final PublishRequest publishRequest = request.getPublishRequest();
-
- publisher.asyncPublish(request.getTopic(), publishRequest.getMsg(), new Callback<Void>() {
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
- }
-
- @Override
- public void operationFinished(Object ctx, Void resultOfOperation) {
- channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
- }
- }, null);
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.java b/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.java
deleted file mode 100644
index d1cbb6b..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.proxy;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.api.Subscriber;
-import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
-import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
-import org.apache.hedwig.protoextensions.PubSubResponseUtils;
-import org.apache.hedwig.server.handlers.Handler;
-import org.apache.hedwig.server.netty.UmbrellaHandler;
-import org.apache.hedwig.util.Callback;
-
-public class ProxyStartDeliveryHandler implements Handler {
-
- private static final Logger logger = LoggerFactory.getLogger(ProxyStartDeliveryHandler.class);
-
- Subscriber subscriber;
- ChannelTracker tracker;
-
- public ProxyStartDeliveryHandler(Subscriber subscriber, ChannelTracker tracker) {
- this.subscriber = subscriber;
- this.tracker = tracker;
- }
-
- @Override
- public void handleRequest(PubSubRequest request, Channel channel) {
-
- if (!request.hasStartDeliveryRequest()) {
- UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
- "Missing start delivery request data");
- return;
- }
-
- final ByteString topic = request.getTopic();
- final ByteString subscriberId = request.getStartDeliveryRequest().getSubscriberId();
-
- synchronized (tracker) {
- // try {
- // tracker.checkChannelMatches(topic, subscriberId, channel);
- // } catch (PubSubException e) {
- // channel.write(PubSubResponseUtils.getResponseForException(e,
- // request.getTxnId()));
- // return;
- // }
-
- final Channel subscribedChannel = tracker.getChannel(topic, subscriberId);
-
- if (subscribedChannel == null) {
- channel.write(PubSubResponseUtils.getResponseForException(
- new PubSubException.ClientNotSubscribedException("no subscription to start delivery on"),
- request.getTxnId()));
- return;
- }
-
- MessageHandler handler = new MessageHandler() {
- @Override
- public void deliver(ByteString topic, ByteString subscriberId, Message msg,
- final Callback<Void> callback, final Object context) {
-
- PubSubResponse response = PubSubResponse.newBuilder().setProtocolVersion(
- ProtocolVersion.VERSION_ONE).setStatusCode(StatusCode.SUCCESS).setTxnId(0).setMessage(msg)
- .setTopic(topic).setSubscriberId(subscriberId).build();
-
- ChannelFuture future = subscribedChannel.write(response);
-
- future.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- // ignoring this failure, because this will
- // only happen due to channel disconnect.
- // Channel disconnect will in turn stop
- // delivery, and stop these errors
- return;
- }
-
- // Tell the hedwig client, that it can send me
- // more messages
- callback.operationFinished(context, null);
- }
- });
- }
- };
-
- channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
-
- try {
- subscriber.startDelivery(topic, subscriberId, handler);
- } catch (ClientNotSubscribedException e) {
- // This should not happen, since we already checked the correct
- // channel and so on
- logger.error("Unexpected: No subscription when attempting to start delivery", e);
- throw new RuntimeException(e);
- } catch (AlreadyStartDeliveryException e) {
- logger.error("Unexpected: Already start delivery when attempting to start delivery", e);
- throw new RuntimeException(e);
- }
-
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStopDeliveryHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStopDeliveryHandler.java b/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStopDeliveryHandler.java
deleted file mode 100644
index f66f9f1..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStopDeliveryHandler.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.proxy;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.Channel;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.client.api.Subscriber;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.server.handlers.Handler;
-import org.apache.hedwig.server.netty.UmbrellaHandler;
-
-public class ProxyStopDeliveryHandler implements Handler {
-
- private static final Logger logger = LoggerFactory.getLogger(ProxyStopDeliveryHandler.class);
-
- Subscriber subscriber;
- ChannelTracker tracker;
-
- public ProxyStopDeliveryHandler(Subscriber subscriber, ChannelTracker tracker) {
- this.subscriber = subscriber;
- this.tracker = tracker;
- }
-
- @Override
- public void handleRequest(PubSubRequest request, Channel channel) {
- if (!request.hasStopDeliveryRequest()) {
- UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
- "Missing stop delivery request data");
- return;
- }
-
- final ByteString topic = request.getTopic();
- final ByteString subscriberId = request.getStopDeliveryRequest().getSubscriberId();
-
- synchronized (tracker) {
- try {
- tracker.checkChannelMatches(topic, subscriberId, channel);
- } catch (PubSubException e) {
- // intentionally ignore this error, since stop delivery doesn't
- // send back a response
- return;
- }
-
- try {
- subscriber.stopDelivery(topic, subscriberId);
- } catch (ClientNotSubscribedException e) {
- // This should not happen, since we already checked the correct
- // channel and so on
- logger.warn("Unexpected: No subscription when attempting to stop delivery", e);
- }
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxySubscribeHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxySubscribeHandler.java b/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxySubscribeHandler.java
deleted file mode 100644
index a291dad..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxySubscribeHandler.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.proxy;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.Channel;
-import org.apache.hedwig.client.api.Subscriber;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.exceptions.PubSubException.TopicBusyException;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
-import org.apache.hedwig.protoextensions.PubSubResponseUtils;
-import org.apache.hedwig.server.handlers.ChannelDisconnectListener;
-import org.apache.hedwig.server.handlers.Handler;
-import org.apache.hedwig.server.netty.UmbrellaHandler;
-import org.apache.hedwig.util.Callback;
-
-public class ProxySubscribeHandler implements Handler, ChannelDisconnectListener {
-
- private static final Logger logger = LoggerFactory.getLogger(ProxySubscribeHandler.class);
-
- Subscriber subscriber;
- ChannelTracker tracker;
-
- public ProxySubscribeHandler(Subscriber subscriber, ChannelTracker tracker) {
- this.subscriber = subscriber;
- this.tracker = tracker;
- }
-
- @Override
- public void channelDisconnected(Channel channel) {
- tracker.channelDisconnected(channel);
- }
-
- @Override
- public void handleRequest(final PubSubRequest request, final Channel channel) {
- if (!request.hasSubscribeRequest()) {
- UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
- "Missing subscribe request data");
- return;
- }
-
- SubscribeRequest subRequest = request.getSubscribeRequest();
- final TopicSubscriber topicSubscriber = new TopicSubscriber(request.getTopic(), subRequest.getSubscriberId());
- SubscriptionOptions opts = SubscriptionOptions.newBuilder()
- .setCreateOrAttach(subRequest.getCreateOrAttach()).build();
-
- subscriber.asyncSubscribe(topicSubscriber.getTopic(), subRequest.getSubscriberId(),
- opts, new Callback<Void>() {
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
- }
-
- @Override
- public void operationFinished(Object ctx, Void resultOfOperation) {
- try {
- tracker.subscribeSucceeded(topicSubscriber, channel);
- } catch (TopicBusyException e) {
- channel.write(PubSubResponseUtils.getResponseForException(e, request.getTxnId()));
- return;
- }
- channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
- }
- }, null);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyUnsubscribeHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyUnsubscribeHandler.java b/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyUnsubscribeHandler.java
deleted file mode 100644
index f611905..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyUnsubscribeHandler.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.proxy;
-
-import org.jboss.netty.channel.Channel;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.client.api.Subscriber;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.protoextensions.PubSubResponseUtils;
-import org.apache.hedwig.server.handlers.Handler;
-import org.apache.hedwig.server.netty.UmbrellaHandler;
-import org.apache.hedwig.util.Callback;
-
-public class ProxyUnsubscribeHandler implements Handler {
-
- Subscriber subscriber;
- ChannelTracker tracker;
-
- public ProxyUnsubscribeHandler(Subscriber subscriber, ChannelTracker tracker) {
- this.subscriber = subscriber;
- this.tracker = tracker;
- }
-
- @Override
- public void handleRequest(final PubSubRequest request, final Channel channel) {
- if (!request.hasUnsubscribeRequest()) {
- UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
- "Missing unsubscribe request data");
- return;
- }
-
- ByteString topic = request.getTopic();
- ByteString subscriberId = request.getUnsubscribeRequest().getSubscriberId();
-
- synchronized (tracker) {
-
- // Even if unsubscribe fails, the hedwig client closes the channel
- // on which the subscription is being served. Hence better to tell
- // the tracker beforehand that this subscription is no longer served
- tracker.aboutToUnsubscribe(topic, subscriberId);
-
- subscriber.asyncUnsubscribe(topic, subscriberId, new Callback<Void>() {
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
- }
-
- @Override
- public void operationFinished(Object ctx, Void resultOfOperation) {
- channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
- }
- }, null);
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClient.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClient.java b/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClient.java
deleted file mode 100644
index 063a99c..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClient.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.regions;
-
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.netty.HedwigClientImpl;
-
-/**
- * This is a hub specific implementation of the HedwigClient. All this does
- * though is to override the HedwigSubscriber with the hub specific child class.
- * Creating this class so we can call the protected method in the parent to set
- * the subscriber since we don't want to expose that API to the public.
- */
-public class HedwigHubClient extends HedwigClientImpl {
-
- // Constructor when we already have a ChannelFactory instantiated.
- public HedwigHubClient(ClientConfiguration cfg, ClientSocketChannelFactory channelFactory) {
- super(cfg, channelFactory);
- // Override the type of HedwigSubscriber with the hub specific one.
- setSubscriber(new HedwigHubSubscriber(this));
- }
-
- // Constructor when we don't have a ChannelFactory. The super constructor
- // will create one for us.
- public HedwigHubClient(ClientConfiguration cfg) {
- super(cfg);
- // Override the type of HedwigSubscriber with the hub specific one.
- setSubscriber(new HedwigHubSubscriber(this));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClientFactory.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClientFactory.java b/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClientFactory.java
deleted file mode 100644
index 68d317e..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClientFactory.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.regions;
-
-import org.apache.commons.configuration.ConfigurationException;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.util.HedwigSocketAddress;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class HedwigHubClientFactory {
-
- private final ServerConfiguration cfg;
- private final ClientConfiguration clientConfiguration;
- private final ClientSocketChannelFactory channelFactory;
- private static final Logger logger = LoggerFactory.getLogger(HedwigHubClientFactory.class);
-
- // Constructor that takes in a ServerConfiguration, ClientConfiguration and a ChannelFactory
- // so we can reuse it for all Clients created here.
- public HedwigHubClientFactory(ServerConfiguration cfg, ClientConfiguration clientConfiguration,
- ClientSocketChannelFactory channelFactory) {
- this.cfg = cfg;
- this.clientConfiguration = clientConfiguration;
- this.channelFactory = channelFactory;
- }
-
- /**
- * Manufacture a hub client whose default server to connect to is the input
- * HedwigSocketAddress hub.
- *
- * @param hub
- * The hub in another region to connect to.
- */
- HedwigHubClient create(final HedwigSocketAddress hub) {
- // Create a hub specific version of the client to use
- ClientConfiguration hubClientConfiguration = new ClientConfiguration() {
- @Override
- protected HedwigSocketAddress getDefaultServerHedwigSocketAddress() {
- return hub;
- }
-
- @Override
- public boolean isSSLEnabled() {
- return cfg.isInterRegionSSLEnabled() || clientConfiguration.isSSLEnabled();
- }
- };
- try {
- hubClientConfiguration.addConf(this.clientConfiguration.getConf());
- } catch (ConfigurationException e) {
- String msg = "Configuration exception while loading the client configuration for the region manager.";
- logger.error(msg);
- throw new RuntimeException(msg);
- }
- return new HedwigHubClient(hubClientConfiguration, channelFactory);
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubSubscriber.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubSubscriber.java b/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubSubscriber.java
deleted file mode 100644
index 7055251..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubSubscriber.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.regions;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.client.exceptions.InvalidSubscriberIdException;
-import org.apache.hedwig.client.netty.HedwigClientImpl;
-import org.apache.hedwig.client.netty.HedwigSubscriber;
-import org.apache.hedwig.exceptions.PubSubException.ClientAlreadySubscribedException;
-import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
-import org.apache.hedwig.exceptions.PubSubException.CouldNotConnectException;
-import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
-import org.apache.hedwig.util.Callback;
-
-/**
- * This is a hub specific child class of the HedwigSubscriber. The main thing is
- * does is wrap the public subscribe/unsubscribe methods by calling the
- * overloaded protected ones passing in a true value for the input boolean
- * parameter isHub. That will just make sure we validate the subscriberId
- * passed, ensuring it is of the right format either for a local or hub
- * subscriber.
- */
-public class HedwigHubSubscriber extends HedwigSubscriber {
-
- public HedwigHubSubscriber(HedwigClientImpl client) {
- super(client);
- }
-
- @Override
- public void subscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode)
- throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
- InvalidSubscriberIdException {
- SubscriptionOptions options = SubscriptionOptions.newBuilder().setCreateOrAttach(mode).build();
- subscribe(topic, subscriberId, options);
- }
-
- @Override
- public void asyncSubscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode, Callback<Void> callback,
- Object context) {
- SubscriptionOptions options = SubscriptionOptions.newBuilder().setCreateOrAttach(mode).build();
- asyncSubscribe(topic, subscriberId, options, callback, context);
- }
-
- @Override
- public void subscribe(ByteString topic, ByteString subscriberId, SubscriptionOptions options)
- throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
- InvalidSubscriberIdException {
- subscribe(topic, subscriberId, options, true);
- }
-
- @Override
- public void asyncSubscribe(ByteString topic, ByteString subscriberId,
- SubscriptionOptions options, Callback<Void> callback, Object context) {
- asyncSubscribe(topic, subscriberId, options, callback, context, true);
- }
-
- @Override
- public void unsubscribe(ByteString topic, ByteString subscriberId) throws CouldNotConnectException,
- ClientNotSubscribedException, ServiceDownException, InvalidSubscriberIdException {
- unsubscribe(topic, subscriberId, true);
- }
-
- @Override
- public void asyncUnsubscribe(final ByteString topic, final ByteString subscriberId, final Callback<Void> callback,
- final Object context) {
- asyncUnsubscribe(topic, subscriberId, callback, context, true);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java
deleted file mode 100644
index bae960b..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java
+++ /dev/null
@@ -1,355 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.regions;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.CountDownLatch;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.ZooKeeper;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
-import org.apache.hedwig.client.netty.HedwigSubscriber;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.RegionSpecificSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
-import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.common.TopicOpQueuer;
-import org.apache.hedwig.server.persistence.PersistRequest;
-import org.apache.hedwig.server.persistence.PersistenceManager;
-import org.apache.hedwig.server.subscriptions.SubscriptionEventListener;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.CallbackUtils;
-import org.apache.hedwig.util.HedwigSocketAddress;
-
-public class RegionManager implements SubscriptionEventListener {
-
- protected static final Logger LOGGER = LoggerFactory.getLogger(RegionManager.class);
-
- private final ByteString mySubId;
- private final PersistenceManager pm;
- private final ArrayList<HedwigHubClient> clients = new ArrayList<HedwigHubClient>();
- private final TopicOpQueuer queue;
- private final String myRegion;
- // Timer for running a retry thread task to retry remote-subscription in asynchronous mode.
- private final Timer timer = new Timer(true);
- private final HashMap<HedwigHubClient, Set<ByteString>> retryMap =
- new HashMap<HedwigHubClient, Set<ByteString>>();
- // map used to track whether a topic is remote subscribed or not
- private final ConcurrentMap<ByteString, Boolean> topicStatuses =
- new ConcurrentHashMap<ByteString, Boolean>();
-
- /**
- * This is the Timer Task for retrying subscribing to remote regions
- */
- class RetrySubscribeTask extends TimerTask {
-
- @Override
- public void run() {
- Set<HedwigHubClient> hubClients = new HashSet<HedwigHubClient>();
- synchronized (retryMap) {
- hubClients.addAll(retryMap.keySet());
- }
- if (hubClients.isEmpty()) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("[" + myRegion + "] There is no hub client needs to retry subscriptions.");
- }
- return;
- }
- for (HedwigHubClient client : hubClients) {
- Set<ByteString> topics = null;
- synchronized (retryMap) {
- topics = retryMap.remove(client);
- }
- if (null == topics || topics.isEmpty()) {
- continue;
- }
- final CountDownLatch done = new CountDownLatch(1);
- Callback<Void> postCb = new Callback<Void>() {
- @Override
- public void operationFinished(Object ctx,
- Void resultOfOperation) {
- finish();
- }
- @Override
- public void operationFailed(Object ctx,
- PubSubException exception) {
- finish();
- }
- void finish() {
- done.countDown();
- }
- };
- Callback<Void> mcb = CallbackUtils.multiCallback(topics.size(), postCb, null);
- for (ByteString topic : topics) {
- Boolean doRemoteSubscribe = topicStatuses.get(topic);
- // topic has been removed, no retry again
- if (null == doRemoteSubscribe) {
- mcb.operationFinished(null, null);
- continue;
- }
- retrySubscribe(client, topic, mcb);
- }
- try {
- done.await();
- } catch (InterruptedException e) {
- LOGGER.warn("Exception during retrying remote subscriptions : ", e);
- }
- }
- }
-
- }
-
- public RegionManager(final PersistenceManager pm, final ServerConfiguration cfg, final ZooKeeper zk,
- ScheduledExecutorService scheduler, HedwigHubClientFactory hubClientFactory) {
- this.pm = pm;
- mySubId = ByteString.copyFromUtf8(SubscriptionStateUtils.HUB_SUBSCRIBER_PREFIX + cfg.getMyRegion());
- queue = new TopicOpQueuer(scheduler);
- for (final String hub : cfg.getRegions()) {
- clients.add(hubClientFactory.create(new HedwigSocketAddress(hub)));
- }
- myRegion = cfg.getMyRegionByteString().toStringUtf8();
- if (cfg.getRetryRemoteSubscribeThreadRunInterval() > 0) {
- timer.schedule(new RetrySubscribeTask(), 0, cfg.getRetryRemoteSubscribeThreadRunInterval());
- }
- }
-
- private void putTopicInRetryMap(HedwigHubClient client, ByteString topic) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("[" + myRegion + "] Put topic in retry map : " + topic.toStringUtf8());
- }
- synchronized (retryMap) {
- Set<ByteString> topics = retryMap.get(client);
- if (null == topics) {
- topics = new HashSet<ByteString>();
- retryMap.put(client, topics);
- }
- topics.add(topic);
- }
- }
-
- /**
- * Do remote subscribe for a specified topic.
- *
- * @param client
- * Hedwig Hub Client to subscribe remote topic.
- * @param topic
- * Topic to subscribe.
- * @param synchronous
- * Whether to wait for the callback of subscription.
- * @param mcb
- * Callback to trigger after subscription is done.
- * @param contex
- * Callback context
- */
- private void doRemoteSubscribe(final HedwigHubClient client, final ByteString topic, final boolean synchronous,
- final Callback<Void> mcb, final Object context) {
- final HedwigSubscriber sub = client.getSubscriber();
- try {
- if (sub.hasSubscription(topic, mySubId)) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("[" + myRegion + "] cross-region subscription for topic "
- + topic.toStringUtf8() + " has existed before.");
- }
- mcb.operationFinished(null, null);
- return;
- }
- } catch (PubSubException e) {
- LOGGER.error("[" + myRegion + "] checking cross-region subscription for topic "
- + topic.toStringUtf8() + " failed (this is should not happen): ", e);
- mcb.operationFailed(context, e);
- return;
- }
- sub.asyncSubscribe(topic, mySubId, CreateOrAttach.CREATE_OR_ATTACH, new Callback<Void>() {
- @Override
- public void operationFinished(Object ctx, Void resultOfOperation) {
- if (LOGGER.isDebugEnabled())
- LOGGER.debug("[" + myRegion + "] cross-region subscription done for topic " + topic.toStringUtf8());
- try {
- sub.startDelivery(topic, mySubId, new MessageHandler() {
- @Override
- public void deliver(final ByteString topic, ByteString subscriberId, Message msg,
- final Callback<Void> callback, final Object context) {
- // When messages are first published
- // locally, the PublishHandler sets the
- // source region in the Message.
- if (msg.hasSrcRegion()) {
- Message.newBuilder(msg).setMsgId(
- MessageSeqId.newBuilder(msg.getMsgId()).addRemoteComponents(
- RegionSpecificSeqId.newBuilder().setRegion(
- msg.getSrcRegion()).setSeqId(
- msg.getMsgId().getLocalComponent())));
- }
- pm.persistMessage(new PersistRequest(topic, msg, new Callback<MessageSeqId>() {
- @Override
- public void operationFinished(Object ctx, MessageSeqId resultOfOperation) {
- if (LOGGER.isDebugEnabled())
- LOGGER.debug("[" + myRegion + "] cross-region recv-fwd succeeded for topic "
- + topic.toStringUtf8());
- callback.operationFinished(context, null);
- }
-
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- if (LOGGER.isDebugEnabled())
- LOGGER.error("[" + myRegion + "] cross-region recv-fwd failed for topic "
- + topic.toStringUtf8(), exception);
- callback.operationFailed(context, exception);
- }
- }, null));
- }
- });
- if (LOGGER.isDebugEnabled())
- LOGGER.debug("[" + myRegion + "] cross-region start-delivery succeeded for topic "
- + topic.toStringUtf8());
- mcb.operationFinished(ctx, null);
- } catch (PubSubException ex) {
- if (LOGGER.isDebugEnabled())
- LOGGER.error(
- "[" + myRegion + "] cross-region start-delivery failed for topic " + topic.toStringUtf8(), ex);
- mcb.operationFailed(ctx, ex);
- } catch (AlreadyStartDeliveryException ex) {
- LOGGER.error("[" + myRegion + "] cross-region start-delivery failed for topic "
- + topic.toStringUtf8(), ex);
- mcb.operationFailed(ctx, new PubSubException.UnexpectedConditionException("cross-region start-delivery failed : " + ex.getMessage()));
- }
- }
-
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- if (LOGGER.isDebugEnabled())
- LOGGER.error("[" + myRegion + "] cross-region subscribe failed for topic " + topic.toStringUtf8(),
- exception);
- if (!synchronous) {
- putTopicInRetryMap(client, topic);
- }
- mcb.operationFailed(ctx, exception);
- }
- }, null);
- }
-
- private void retrySubscribe(final HedwigHubClient client, final ByteString topic, final Callback<Void> cb) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("[" + myRegion + "] Retry remote subscribe topic : " + topic.toStringUtf8());
- }
- queue.pushAndMaybeRun(topic, queue.new AsynchronousOp<Void>(topic, cb, null) {
- @Override
- public void run() {
- Boolean doRemoteSubscribe = topicStatuses.get(topic);
- // topic has been removed, no retry again
- if (null == doRemoteSubscribe) {
- cb.operationFinished(ctx, null);
- return;
- }
- doRemoteSubscribe(client, topic, false, cb, ctx);
- }
- });
- }
-
- @Override
- public void onFirstLocalSubscribe(final ByteString topic, final boolean synchronous, final Callback<Void> cb) {
- topicStatuses.put(topic, true);
- // Whenever we acquire a topic due to a (local) subscribe, subscribe on
- // it to all the other regions (currently using simple all-to-all
- // topology).
- queue.pushAndMaybeRun(topic, queue.new AsynchronousOp<Void>(topic, cb, null) {
- @Override
- public void run() {
- Callback<Void> postCb = synchronous ? cb : CallbackUtils.logger(LOGGER,
- "[" + myRegion + "] all cross-region subscriptions succeeded",
- "[" + myRegion + "] at least one cross-region subscription failed");
- final Callback<Void> mcb = CallbackUtils.multiCallback(clients.size(), postCb, ctx);
- for (final HedwigHubClient client : clients) {
- doRemoteSubscribe(client, topic, synchronous, mcb, ctx);
- }
- if (!synchronous)
- cb.operationFinished(null, null);
- }
- });
-
- }
-
- @Override
- public void onLastLocalUnsubscribe(final ByteString topic) {
- topicStatuses.remove(topic);
- // TODO may want to ease up on the eager unsubscribe; this is dropping
- // cross-region subscriptions ASAP
- queue.pushAndMaybeRun(topic, queue.new AsynchronousOp<Void>(topic, new Callback<Void>() {
-
- @Override
- public void operationFinished(Object ctx, Void result) {
- if (LOGGER.isDebugEnabled())
- LOGGER.debug("[" + myRegion + "] cross-region unsubscribes succeeded for topic " + topic.toStringUtf8());
- }
-
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- if (LOGGER.isDebugEnabled())
- LOGGER.error("[" + myRegion + "] cross-region unsubscribes failed for topic " + topic.toStringUtf8(), exception);
- }
-
- }, null) {
- @Override
- public void run() {
- Callback<Void> mcb = CallbackUtils.multiCallback(clients.size(), cb, ctx);
- for (final HedwigHubClient client : clients) {
- final HedwigSubscriber sub = client.getSubscriber();
- try {
- if (!sub.hasSubscription(topic, mySubId)) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("[" + myRegion + "] cross-region subscription for topic "
- + topic.toStringUtf8() + " has existed before.");
- }
- mcb.operationFinished(null, null);
- continue;
- }
- } catch (PubSubException e) {
- LOGGER.error("[" + myRegion + "] checking cross-region subscription for topic "
- + topic.toStringUtf8() + " failed (this is should not happen): ", e);
- mcb.operationFailed(ctx, e);
- continue;
- }
- sub.asyncUnsubscribe(topic, mySubId, mcb, null);
- }
- }
- });
- }
-
- // Method to shutdown and stop all of the cross-region Hedwig clients.
- public void stop() {
- timer.cancel();
- for (HedwigHubClient client : clients) {
- client.close();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/ssl/SslServerContextFactory.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/ssl/SslServerContextFactory.java b/hedwig-server/src/main/java/org/apache/hedwig/server/ssl/SslServerContextFactory.java
deleted file mode 100644
index 83d6961..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/ssl/SslServerContextFactory.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.ssl;
-
-import java.security.KeyStore;
-
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-
-import org.apache.hedwig.client.ssl.SslContextFactory;
-import org.apache.hedwig.server.common.ServerConfiguration;
-
-public class SslServerContextFactory extends SslContextFactory {
-
- public SslServerContextFactory(ServerConfiguration cfg) {
- try {
- // Load our Java key store.
- KeyStore ks = KeyStore.getInstance("pkcs12");
- ks.load(cfg.getCertStream(), cfg.getPassword().toCharArray());
-
- // Like ssh-agent.
- KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
- kmf.init(ks, cfg.getPassword().toCharArray());
-
- // Create the SSL context.
- ctx = SSLContext.getInstance("TLS");
- ctx.init(kmf.getKeyManagers(), getTrustManagers(), null);
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- }
-
- @Override
- protected boolean isClient() {
- return false;
- }
-
-}