You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2011/09/05 19:39:03 UTC
svn commit: r1165369 [5/9] - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/
bookkeeper-server/src/main/java/org/apache/bookkeeper/clie...
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeReconnectCallback.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeReconnectCallback.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeReconnectCallback.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeReconnectCallback.java Mon Sep 5 17:38:57 2011
@@ -36,7 +36,7 @@ import org.apache.hedwig.util.Callback;
* the topic is completed, we need to restart delivery for that topic if that
* was the case before the original channel got disconnected. This async
* callback will be the hook for this.
- *
+ *
*/
public class SubscribeReconnectCallback implements Callback<Void> {
@@ -83,8 +83,8 @@ public class SubscribeReconnectCallback
// This exception should never be thrown here but just in case,
// log an error and just keep retrying the subscribe request.
logger.error("Subscribe was successful but error starting delivery for topic: "
- + origSubData.topic.toStringUtf8() + ", subscriberId: "
- + origSubData.subscriberId.toStringUtf8(), e);
+ + origSubData.topic.toStringUtf8() + ", subscriberId: "
+ + origSubData.subscriberId.toStringUtf8(), e);
retrySubscribeRequest();
}
}
@@ -108,6 +108,6 @@ public class SubscribeReconnectCallback
// Retry the subscribe request but only after waiting for a
// preconfigured amount of time.
client.getClientTimer().schedule(new SubscribeReconnectRetryTask(),
- client.getConfiguration().getSubscribeReconnectRetryWaitTime());
+ client.getConfiguration().getSubscribeReconnectRetryWaitTime());
}
}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java Mon Sep 5 17:38:57 2011
@@ -93,7 +93,7 @@ public class SubscribeResponseHandler {
if (logger.isDebugEnabled())
logger.debug("Handling a Subscribe response: " + response + ", pubSubData: " + pubSubData + ", host: "
- + HedwigClient.getHostFromChannel(channel));
+ + HedwigClient.getHostFromChannel(channel));
switch (response.getStatusCode()) {
case SUCCESS:
// For successful Subscribe requests, store this Channel locally
@@ -122,7 +122,7 @@ public class SubscribeResponseHandler {
// because when that happens, things are slow already and piling
// up on the client app side to consume messages.
outstandingMsgSet = new HashSet<Message>(
- responseHandler.getConfiguration().getMaximumOutstandingMessages(), 1.0f);
+ responseHandler.getConfiguration().getMaximumOutstandingMessages(), 1.0f);
// Response was success so invoke the callback's operationFinished
// method.
pubSubData.callback.operationFinished(pubSubData.context, null);
@@ -131,14 +131,14 @@ public class SubscribeResponseHandler {
// For Subscribe requests, the server says that the client is
// already subscribed to it.
pubSubData.callback.operationFailed(pubSubData.context, new ClientAlreadySubscribedException(
- "Client is already subscribed for topic: " + pubSubData.topic.toStringUtf8() + ", subscriberId: "
- + pubSubData.subscriberId.toStringUtf8()));
+ "Client is already subscribed for topic: " + pubSubData.topic.toStringUtf8() + ", subscriberId: "
+ + pubSubData.subscriberId.toStringUtf8()));
break;
case SERVICE_DOWN:
// Response was service down failure so just invoke the callback's
// operationFailed method.
pubSubData.callback.operationFailed(pubSubData.context, new ServiceDownException(
- "Server responded with a SERVICE_DOWN status"));
+ "Server responded with a SERVICE_DOWN status"));
break;
case NOT_RESPONSIBLE_FOR_TOPIC:
// Redirect response so we'll need to repost the original Subscribe
@@ -150,7 +150,7 @@ public class SubscribeResponseHandler {
// cases.
logger.error("Unexpected error response from server for PubSubResponse: " + response);
pubSubData.callback.operationFailed(pubSubData.context, new ServiceDownException(
- "Server responded with a status code of: " + response.getStatusCode()));
+ "Server responded with a status code of: " + response.getStatusCode()));
break;
}
}
@@ -160,7 +160,7 @@ public class SubscribeResponseHandler {
public void handleSubscribeMessage(PubSubResponse response) {
if (logger.isDebugEnabled())
logger.debug("Handling a Subscribe message in response: " + response + ", topic: "
- + origSubData.topic.toStringUtf8() + ", subscriberId: " + origSubData.subscriberId.toStringUtf8());
+ + origSubData.topic.toStringUtf8() + ", subscriberId: " + origSubData.subscriberId.toStringUtf8());
Message message = response.getMessage();
synchronized (this) {
@@ -182,8 +182,8 @@ public class SubscribeResponseHandler {
subscribeMsgQueue = new LinkedList<Message>();
if (logger.isDebugEnabled())
logger
- .debug("Message has arrived but Subscribe channel does not have a registered MessageHandler yet so queueing up the message: "
- + message);
+ .debug("Message has arrived but Subscribe channel does not have a registered MessageHandler yet so queueing up the message: "
+ + message);
subscribeMsgQueue.add(message);
}
}
@@ -193,15 +193,15 @@ public class SubscribeResponseHandler {
* Method called when a message arrives for a subscribe Channel and we want
* to consume it asynchronously via the registered MessageHandler (should
* not be null when called here).
- *
+ *
* @param message
* Message from Subscribe Channel we want to consume.
*/
protected void asyncMessageConsume(Message message) {
if (logger.isDebugEnabled())
logger.debug("Call the client app's MessageHandler asynchronously to consume the message: " + message
- + ", topic: " + origSubData.topic.toStringUtf8() + ", subscriberId: "
- + origSubData.subscriberId.toStringUtf8());
+ + ", topic: " + origSubData.topic.toStringUtf8() + ", subscriberId: "
+ + origSubData.subscriberId.toStringUtf8());
// Add this "pending to be consumed" message to the outstandingMsgSet.
outstandingMsgSet.add(message);
// Check if we've exceeded the max size for the outstanding message set.
@@ -211,13 +211,13 @@ public class SubscribeResponseHandler {
// Channel to not be readable.
if (logger.isDebugEnabled())
logger.debug("Too many outstanding messages (" + outstandingMsgSet.size()
- + ") so throttling the subscribe netty Channel");
+ + ") so throttling the subscribe netty Channel");
subscribeChannel.setReadable(false);
}
MessageConsumeData messageConsumeData = new MessageConsumeData(origSubData.topic, origSubData.subscriberId,
message);
messageHandler.consume(origSubData.topic, origSubData.subscriberId, message, responseHandler.getClient()
- .getConsumeCallback(), messageConsumeData);
+ .getConsumeCallback(), messageConsumeData);
}
/**
@@ -230,7 +230,7 @@ public class SubscribeResponseHandler {
* same order. To make this thread safe, since multiple outstanding messages
* could be consumed by the client app and then called back to here, make
* this method synchronized.
- *
+ *
* @param message
* Message sent from server for topic subscription that has been
* consumed by the client.
@@ -238,8 +238,8 @@ public class SubscribeResponseHandler {
protected synchronized void messageConsumed(Message message) {
if (logger.isDebugEnabled())
logger.debug("Message has been successfully consumed by the client app for message: " + message
- + ", topic: " + origSubData.topic.toStringUtf8() + ", subscriberId: "
- + origSubData.subscriberId.toStringUtf8());
+ + ", topic: " + origSubData.topic.toStringUtf8() + ", subscriberId: "
+ + origSubData.subscriberId.toStringUtf8());
// Update the consumed messages buffer variables
if (responseHandler.getConfiguration().isAutoSendConsumeMessageEnabled()) {
// Update these variables only if we are auto-sending consume
@@ -265,8 +265,8 @@ public class SubscribeResponseHandler {
// subscribe request for the TopicSubscriber.
if (logger.isDebugEnabled())
logger
- .debug("Consumed message buffer limit reached so send the Consume Request to the server with lastMessageSeqId: "
- + lastMessageSeqId);
+ .debug("Consumed message buffer limit reached so send the Consume Request to the server with lastMessageSeqId: "
+ + lastMessageSeqId);
responseHandler.getSubscriber().doConsume(origSubData, subscribeChannel, lastMessageSeqId);
numConsumedMessagesInBuffer = 0;
lastMessageSeqId = null;
@@ -279,10 +279,10 @@ public class SubscribeResponseHandler {
if (!subscribeChannel.isReadable() && outstandingMsgSet.size() == 0) {
if (logger.isDebugEnabled())
logger
- .debug("Message consumption has caught up so okay to turn off throttling of messages on the subscribe channel for topic: "
- + origSubData.topic.toStringUtf8()
- + ", subscriberId: "
- + origSubData.subscriberId.toStringUtf8());
+ .debug("Message consumption has caught up so okay to turn off throttling of messages on the subscribe channel for topic: "
+ + origSubData.topic.toStringUtf8()
+ + ", subscriberId: "
+ + origSubData.subscriberId.toStringUtf8());
subscribeChannel.setReadable(true);
}
}
@@ -291,14 +291,14 @@ public class SubscribeResponseHandler {
* Setter used for Subscribe flows when delivery for the subscription is
* started. This is used to register the MessageHandler needed to consume
* the subscribed messages for the topic.
- *
+ *
* @param messageHandler
* MessageHandler to register for this ResponseHandler instance.
*/
public void setMessageHandler(MessageHandler messageHandler) {
if (logger.isDebugEnabled())
logger.debug("Setting the messageHandler for topic: " + origSubData.topic.toStringUtf8()
- + ", subscriberId: " + origSubData.subscriberId.toStringUtf8());
+ + ", subscriberId: " + origSubData.subscriberId.toStringUtf8());
synchronized (this) {
this.messageHandler = messageHandler;
// Once the MessageHandler is registered, see if we have any queued up
@@ -309,8 +309,8 @@ public class SubscribeResponseHandler {
if (messageHandler != null && subscribeMsgQueue != null && subscribeMsgQueue.size() > 0) {
if (logger.isDebugEnabled())
logger.debug("Consuming " + subscribeMsgQueue.size() + " queued up messages for topic: "
- + origSubData.topic.toStringUtf8() + ", subscriberId: "
- + origSubData.subscriberId.toStringUtf8());
+ + origSubData.topic.toStringUtf8() + ", subscriberId: "
+ + origSubData.subscriberId.toStringUtf8());
for (Message message : subscribeMsgQueue) {
asyncMessageConsume(message);
}
@@ -323,7 +323,7 @@ public class SubscribeResponseHandler {
/**
* Getter for the MessageHandler that is set for this subscribe channel.
- *
+ *
* @return The MessageHandler for consuming messages
*/
public MessageHandler getMessageHandler() {
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java Mon Sep 5 17:38:57 2011
@@ -42,7 +42,7 @@ public class UnsubscribeResponseHandler
throws Exception {
if (logger.isDebugEnabled())
logger.debug("Handling an Unsubscribe response: " + response + ", pubSubData: " + pubSubData + ", host: "
- + HedwigClient.getHostFromChannel(channel));
+ + HedwigClient.getHostFromChannel(channel));
switch (response.getStatusCode()) {
case SUCCESS:
// For successful Unsubscribe requests, we can now safely close the
@@ -56,14 +56,14 @@ public class UnsubscribeResponseHandler
// For Unsubscribe requests, the server says that the client was
// never subscribed to the topic.
pubSubData.callback.operationFailed(pubSubData.context, new ClientNotSubscribedException(
- "Client was never subscribed to topic: " + pubSubData.topic.toStringUtf8() + ", subscriberId: "
- + pubSubData.subscriberId.toStringUtf8()));
+ "Client was never subscribed to topic: " + pubSubData.topic.toStringUtf8() + ", subscriberId: "
+ + pubSubData.subscriberId.toStringUtf8()));
break;
case SERVICE_DOWN:
// Response was service down failure so just invoke the callback's
// operationFailed method.
pubSubData.callback.operationFailed(pubSubData.context, new ServiceDownException(
- "Server responded with a SERVICE_DOWN status"));
+ "Server responded with a SERVICE_DOWN status"));
break;
case NOT_RESPONSIBLE_FOR_TOPIC:
// Redirect response so we'll need to repost the original
@@ -75,7 +75,7 @@ public class UnsubscribeResponseHandler
// cases.
logger.error("Unexpected error response from server for PubSubResponse: " + response);
pubSubData.callback.operationFailed(pubSubData.context, new ServiceDownException(
- "Server responded with a status code of: " + response.getStatusCode()));
+ "Server responded with a status code of: " + response.getStatusCode()));
break;
}
}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ClientChannelPipelineFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ClientChannelPipelineFactory.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ClientChannelPipelineFactory.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ClientChannelPipelineFactory.java Mon Sep 5 17:38:57 2011
@@ -40,12 +40,12 @@ public class ClientChannelPipelineFactor
public ChannelPipeline getPipeline() throws Exception {
// Create a new ChannelPipeline using the factory method from the
// Channels helper class.
- ChannelPipeline pipeline = Channels.pipeline();
+ ChannelPipeline pipeline = Channels.pipeline();
if (client.getSslFactory() != null) {
pipeline.addLast("ssl", new SslHandler(client.getSslFactory().getEngine()));
- }
+ }
pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(client.getConfiguration()
- .getMaximumMessageSize(), 0, 4, 0, 4));
+ .getMaximumMessageSize(), 0, 4, 0, 4));
pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
pipeline.addLast("protobufdecoder", new ProtobufDecoder(PubSubProtocol.PubSubResponse.getDefaultInstance()));
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ConnectCallback.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ConnectCallback.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ConnectCallback.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ConnectCallback.java Mon Sep 5 17:38:57 2011
@@ -70,7 +70,7 @@ public class ConnectCallback implements
// invoke the operationFailed callback.
logger.error("Error connecting to host more than once so just invoke the operationFailed callback!");
pubSubData.callback.operationFailed(pubSubData.context, new CouldNotConnectException(
- "Could not connect to host: " + host));
+ "Could not connect to host: " + host));
} else {
if (logger.isDebugEnabled())
logger.debug("Try to connect to server: " + host + " again for pubSubData: " + pubSubData);
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClient.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClient.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClient.java Mon Sep 5 17:38:57 2011
@@ -44,7 +44,7 @@ import org.apache.hedwig.exceptions.PubS
/**
* This is a top level Hedwig Client class that encapsulates the common
* functionality needed for both Publish and Subscribe operations.
- *
+ *
*/
public class HedwigClient {
@@ -182,7 +182,7 @@ public class HedwigClient {
}
private void checkPubSubDataToTimeOut(PubSubData pubSubData, ResponseHandler responseHandler, long curTime,
- long timeoutInterval) {
+ long timeoutInterval) {
if (curTime > pubSubData.requestWriteTime + timeoutInterval) {
// Current PubSubRequest has timed out so remove it from the
// ResponseHandler's map and invoke the VoidCallback's
@@ -190,7 +190,7 @@ public class HedwigClient {
logger.error("Current PubSubRequest has timed out for pubSubData: " + pubSubData);
responseHandler.txn2PubSubData.remove(pubSubData.txnId);
pubSubData.callback.operationFailed(pubSubData.context, new UncertainStateException(
- "Server ack response never received so PubSubRequest has timed out!"));
+ "Server ack response never received so PubSubRequest has timed out!"));
}
}
}
@@ -236,7 +236,7 @@ public class HedwigClient {
* to the one that is responsible for the topic. Once the connect to the
* server is done, we will perform the corresponding PubSub write on that
* channel.
- *
+ *
* @param pubSubData
* PubSub call's data wrapper object
* @param serverHost
@@ -261,7 +261,7 @@ public class HedwigClient {
* Helper method to store the topic2Host mapping in the HedwigClient cache
* map. This method is assumed to be called when we've done a successful
* connection to the correct server topic master.
- *
+ *
* @param pubSubData
* PubSub wrapper data
* @param channel
@@ -282,7 +282,7 @@ public class HedwigClient {
// Store the relevant mappings for this topic and host combination.
if (logger.isDebugEnabled())
logger.debug("Storing info for topic: " + pubSubData.topic.toStringUtf8() + ", old host: "
- + topic2Host.get(pubSubData.topic) + ", new host: " + host);
+ + topic2Host.get(pubSubData.topic) + ", new host: " + host);
topic2Host.put(pubSubData.topic, host);
if (host2Topics.containsKey(host)) {
host2Topics.get(host).add(pubSubData.topic);
@@ -297,7 +297,7 @@ public class HedwigClient {
* Helper static method to get the String Hostname:Port from a netty
* Channel. Assumption is that the netty Channel was originally created with
* an InetSocketAddress. This is true with the Hedwig netty implementation.
- *
+ *
* @param channel
* Netty channel to extract the hostname and port from.
* @return String representation of the Hostname:Port from the Netty Channel
@@ -310,7 +310,7 @@ public class HedwigClient {
* Helper static method to get the ResponseHandler instance from a Channel
* via the ChannelPipeline it is associated with. The assumption is that the
* last ChannelHandler tied to the ChannelPipeline is the ResponseHandler.
- *
+ *
* @param channel
* Channel we are retrieving the ResponseHandler instance for
* @return ResponseHandler Instance tied to the Channel's Pipeline
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java Mon Sep 5 17:38:57 2011
@@ -42,7 +42,7 @@ import org.apache.hedwig.util.Callback;
/**
* This is the Hedwig Netty specific implementation of the Publisher interface.
- *
+ *
*/
public class HedwigPublisher implements Publisher {
@@ -143,7 +143,7 @@ public class HedwigPublisher implements
/**
* This is a helper method to write the actual publish message once the
* client is connected to the server and a Channel is available.
- *
+ *
* @param pubSubData
* Publish call's data wrapper object
* @param channel
@@ -183,7 +183,7 @@ public class HedwigPublisher implements
// Finally, write the Publish request through the Channel.
if (logger.isDebugEnabled())
logger.debug("Writing a Publish request to host: " + HedwigClient.getHostFromChannel(channel)
- + " for pubSubData: " + pubSubData);
+ + " for pubSubData: " + pubSubData);
ChannelFuture future = channel.write(pubsubRequestBuilder.build());
future.addListener(new WriteCallback(pubSubData, client));
}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java Mon Sep 5 17:38:57 2011
@@ -53,7 +53,7 @@ import org.apache.hedwig.util.Callback;
/**
* This is the Hedwig Netty specific implementation of the Subscriber interface.
- *
+ *
*/
public class HedwigSubscriber implements Subscriber {
@@ -79,12 +79,12 @@ public class HedwigSubscriber implements
// two flows are very similar. The assumption is that the input
// OperationType is either SUBSCRIBE or UNSUBSCRIBE.
private void subUnsub(ByteString topic, ByteString subscriberId, OperationType operationType,
- CreateOrAttach createOrAttach) throws CouldNotConnectException, ClientAlreadySubscribedException,
- ClientNotSubscribedException, ServiceDownException {
+ CreateOrAttach createOrAttach) throws CouldNotConnectException, ClientAlreadySubscribedException,
+ ClientNotSubscribedException, ServiceDownException {
if (logger.isDebugEnabled())
logger.debug("Calling a sync subUnsub request for topic: " + topic.toStringUtf8() + ", subscriberId: "
- + subscriberId.toStringUtf8() + ", operationType: " + operationType + ", createOrAttach: "
- + createOrAttach);
+ + subscriberId.toStringUtf8() + ", operationType: " + operationType + ", createOrAttach: "
+ + createOrAttach);
PubSubData pubSubData = new PubSubData(topic, null, subscriberId, operationType, createOrAttach, null, null);
synchronized (pubSubData) {
PubSubCallback pubSubCallback = new PubSubCallback(pubSubData);
@@ -132,15 +132,15 @@ public class HedwigSubscriber implements
// flows are very similar. The assumption is that the input OperationType is
// either SUBSCRIBE or UNSUBSCRIBE.
private void asyncSubUnsub(ByteString topic, ByteString subscriberId, Callback<Void> callback, Object context,
- OperationType operationType, CreateOrAttach createOrAttach) {
+ OperationType operationType, CreateOrAttach createOrAttach) {
if (logger.isDebugEnabled())
logger.debug("Calling an async subUnsub request for topic: " + topic.toStringUtf8() + ", subscriberId: "
- + subscriberId.toStringUtf8() + ", operationType: " + operationType + ", createOrAttach: "
- + createOrAttach);
+ + subscriberId.toStringUtf8() + ", operationType: " + operationType + ", createOrAttach: "
+ + createOrAttach);
// Check if we know which server host is the master for the topic we are
// subscribing to.
PubSubData pubSubData = new PubSubData(topic, null, subscriberId, operationType, createOrAttach, callback,
- context);
+ context);
if (client.topic2Host.containsKey(topic)) {
InetSocketAddress host = client.topic2Host.get(topic);
if (operationType.equals(OperationType.UNSUBSCRIBE) && client.getPublisher().host2Channel.containsKey(host)) {
@@ -173,18 +173,18 @@ public class HedwigSubscriber implements
public void subscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode)
throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
- InvalidSubscriberIdException {
+ InvalidSubscriberIdException {
subscribe(topic, subscriberId, mode, false);
}
protected void subscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode, boolean isHub)
throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
- InvalidSubscriberIdException {
+ InvalidSubscriberIdException {
// Validate that the format of the subscriberId is valid either as a
// local or hub subscriber.
if (!isValidSubscriberId(subscriberId, isHub)) {
throw new InvalidSubscriberIdException("SubscriberId passed is not valid: " + subscriberId.toStringUtf8()
- + ", isHub: " + isHub);
+ + ", isHub: " + isHub);
}
try {
subUnsub(topic, subscriberId, OperationType.SUBSCRIBE, mode);
@@ -198,35 +198,35 @@ public class HedwigSubscriber implements
}
public void asyncSubscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode, Callback<Void> callback,
- Object context) {
+ Object context) {
asyncSubscribe(topic, subscriberId, mode, callback, context, false);
}
protected void asyncSubscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode,
- Callback<Void> callback, Object context, boolean isHub) {
+ Callback<Void> callback, Object context, boolean isHub) {
// Validate that the format of the subscriberId is valid either as a
// local or hub subscriber.
if (!isValidSubscriberId(subscriberId, isHub)) {
callback.operationFailed(context, new ServiceDownException(new InvalidSubscriberIdException(
- "SubscriberId passed is not valid: " + subscriberId.toStringUtf8() + ", isHub: " + isHub)));
+ "SubscriberId passed is not valid: " + subscriberId.toStringUtf8() + ", isHub: " + isHub)));
return;
}
asyncSubUnsub(topic, subscriberId, callback, context, OperationType.SUBSCRIBE, mode);
}
public void unsubscribe(ByteString topic, ByteString subscriberId) throws CouldNotConnectException,
- ClientNotSubscribedException, ServiceDownException, InvalidSubscriberIdException {
+ ClientNotSubscribedException, ServiceDownException, InvalidSubscriberIdException {
unsubscribe(topic, subscriberId, false);
}
protected void unsubscribe(ByteString topic, ByteString subscriberId, boolean isHub)
throws CouldNotConnectException, ClientNotSubscribedException, ServiceDownException,
- InvalidSubscriberIdException {
+ InvalidSubscriberIdException {
// Validate that the format of the subscriberId is valid either as a
// local or hub subscriber.
if (!isValidSubscriberId(subscriberId, isHub)) {
throw new InvalidSubscriberIdException("SubscriberId passed is not valid: " + subscriberId.toStringUtf8()
- + ", isHub: " + isHub);
+ + ", isHub: " + isHub);
}
// Synchronously close the subscription on the client side. Even
// if the unsubscribe request to the server errors out, we won't be
@@ -246,17 +246,17 @@ public class HedwigSubscriber implements
}
public void asyncUnsubscribe(final ByteString topic, final ByteString subscriberId, final Callback<Void> callback,
- final Object context) {
+ final Object context) {
asyncUnsubscribe(topic, subscriberId, callback, context, false);
}
protected void asyncUnsubscribe(final ByteString topic, final ByteString subscriberId,
- final Callback<Void> callback, final Object context, boolean isHub) {
+ final Callback<Void> callback, final Object context, boolean isHub) {
// Validate that the format of the subscriberId is valid either as a
// local or hub subscriber.
if (!isValidSubscriberId(subscriberId, isHub)) {
callback.operationFailed(context, new ServiceDownException(new InvalidSubscriberIdException(
- "SubscriberId passed is not valid: " + subscriberId.toStringUtf8() + ", isHub: " + isHub)));
+ "SubscriberId passed is not valid: " + subscriberId.toStringUtf8() + ", isHub: " + isHub)));
return;
}
// Asynchronously close the subscription. On the callback to that
@@ -288,13 +288,13 @@ public class HedwigSubscriber implements
throws ClientNotSubscribedException {
if (logger.isDebugEnabled())
logger.debug("Calling consume for topic: " + topic.toStringUtf8() + ", subscriberId: "
- + subscriberId.toStringUtf8() + ", messageSeqId: " + messageSeqId);
+ + subscriberId.toStringUtf8() + ", messageSeqId: " + messageSeqId);
TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
// Check that this topic subscription on the client side exists.
if (!topicSubscriber2Channel.containsKey(topicSubscriber)) {
throw new ClientNotSubscribedException(
- "Cannot send consume message since client is not subscribed to topic: " + topic.toStringUtf8()
- + ", subscriberId: " + subscriberId.toStringUtf8());
+ "Cannot send consume message since client is not subscribed to topic: " + topic.toStringUtf8()
+ + ", subscriberId: " + subscriberId.toStringUtf8());
}
PubSubData pubSubData = new PubSubData(topic, null, subscriberId, OperationType.CONSUME, null, null, null);
// Send the consume message to the server using the same subscribe
@@ -305,7 +305,7 @@ public class HedwigSubscriber implements
/**
* This is a helper method to write the actual subscribe/unsubscribe message
* once the client is connected to the server and a Channel is available.
- *
+ *
* @param pubSubData
* Subscribe/Unsubscribe call's data wrapper object. We assume
* that the operationType field is either SUBSCRIBE or
@@ -360,7 +360,7 @@ public class HedwigSubscriber implements
// Finally, write the Subscribe request through the Channel.
if (logger.isDebugEnabled())
logger.debug("Writing a SubUnsub request to host: " + HedwigClient.getHostFromChannel(channel)
- + " for pubSubData: " + pubSubData);
+ + " for pubSubData: " + pubSubData);
ChannelFuture future = channel.write(pubsubRequestBuilder.build());
future.addListener(new WriteCallback(pubSubData, client));
}
@@ -369,7 +369,7 @@ public class HedwigSubscriber implements
* This is a helper method to write a consume message to the server after a
* subscribe Channel connection is made to the server and messages are being
* consumed by the client.
- *
+ *
* @param pubSubData
* Consume call's data wrapper object. We assume that the
* operationType field is CONSUME.
@@ -405,14 +405,14 @@ public class HedwigSubscriber implements
// message if there was a problem writing the consume request.
if (logger.isDebugEnabled())
logger.debug("Writing a Consume request to host: " + HedwigClient.getHostFromChannel(channel)
- + " with messageSeqId: " + messageSeqId + " for pubSubData: " + pubSubData);
+ + " with messageSeqId: " + messageSeqId + " for pubSubData: " + pubSubData);
ChannelFuture future = channel.write(pubsubRequestBuilder.build());
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
logger.error("Error writing a Consume request to host: " + HedwigClient.getHostFromChannel(channel)
- + " with messageSeqId: " + messageSeqId + " for pubSubData: " + pubSubData);
+ + " with messageSeqId: " + messageSeqId + " for pubSubData: " + pubSubData);
}
}
});
@@ -420,7 +420,7 @@ public class HedwigSubscriber implements
}
public boolean hasSubscription(ByteString topic, ByteString subscriberId) throws CouldNotConnectException,
- ServiceDownException {
+ ServiceDownException {
// The subscription type of info should be stored on the server end, not
// the client side. Eventually, the server will have the Subscription
// Manager part that ties into Zookeeper to manage this info.
@@ -433,7 +433,7 @@ public class HedwigSubscriber implements
}
public List<ByteString> getSubscriptionList(ByteString subscriberId) throws CouldNotConnectException,
- ServiceDownException {
+ ServiceDownException {
// Same as the previous hasSubscription method, this data should reside
// on the server end, not the client side.
return null;
@@ -443,7 +443,7 @@ public class HedwigSubscriber implements
throws ClientNotSubscribedException {
if (logger.isDebugEnabled())
logger.debug("Starting delivery for topic: " + topic.toStringUtf8() + ", subscriberId: "
- + subscriberId.toStringUtf8());
+ + subscriberId.toStringUtf8());
TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
// Make sure we know about this topic subscription on the client side
// exists. The assumption is that the client should have in memory the
@@ -451,16 +451,16 @@ public class HedwigSubscriber implements
// an ack response to the initial subscribe request.
if (!topicSubscriber2Channel.containsKey(topicSubscriber)) {
logger.error("Client is not yet subscribed to topic: " + topic.toStringUtf8() + ", subscriberId: "
- + subscriberId.toStringUtf8());
+ + subscriberId.toStringUtf8());
throw new ClientNotSubscribedException("Client is not yet subscribed to topic: " + topic.toStringUtf8()
- + ", subscriberId: " + subscriberId.toStringUtf8());
+ + ", subscriberId: " + subscriberId.toStringUtf8());
}
// Register the MessageHandler with the subscribe Channel's
// Response Handler.
Channel topicSubscriberChannel = topicSubscriber2Channel.get(topicSubscriber);
HedwigClient.getResponseHandlerFromChannel(topicSubscriberChannel).getSubscribeResponseHandler()
- .setMessageHandler(messageHandler);
+ .setMessageHandler(messageHandler);
// Now make the TopicSubscriber Channel readable (it is set to not be
// readable when the initial subscription is done). Note that this is an
// asynchronous call. If this fails (not likely), the futureListener
@@ -471,7 +471,7 @@ public class HedwigSubscriber implements
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
logger.error("Unable to make subscriber Channel readable in startDelivery call for topic: "
- + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
+ + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
}
}
});
@@ -480,7 +480,7 @@ public class HedwigSubscriber implements
public void stopDelivery(final ByteString topic, final ByteString subscriberId) throws ClientNotSubscribedException {
if (logger.isDebugEnabled())
logger.debug("Stopping delivery for topic: " + topic.toStringUtf8() + ", subscriberId: "
- + subscriberId.toStringUtf8());
+ + subscriberId.toStringUtf8());
TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
// Make sure we know that this topic subscription on the client side
// exists. The assumption is that the client should have in memory the
@@ -488,16 +488,16 @@ public class HedwigSubscriber implements
// an ack response to the initial subscribe request.
if (!topicSubscriber2Channel.containsKey(topicSubscriber)) {
logger.error("Client is not yet subscribed to topic: " + topic.toStringUtf8() + ", subscriberId: "
- + subscriberId.toStringUtf8());
+ + subscriberId.toStringUtf8());
throw new ClientNotSubscribedException("Client is not yet subscribed to topic: " + topic.toStringUtf8()
- + ", subscriberId: " + subscriberId.toStringUtf8());
+ + ", subscriberId: " + subscriberId.toStringUtf8());
}
// Unregister the MessageHandler for the subscribe Channel's
// Response Handler.
Channel topicSubscriberChannel = topicSubscriber2Channel.get(topicSubscriber);
HedwigClient.getResponseHandlerFromChannel(topicSubscriberChannel).getSubscribeResponseHandler()
- .setMessageHandler(null);
+ .setMessageHandler(null);
// Now make the TopicSubscriber channel not-readable. This will buffer
// up messages if any are sent from the server. Note that this is an
// asynchronous call. If this fails (not likely), the futureListener
@@ -508,7 +508,7 @@ public class HedwigSubscriber implements
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
logger.error("Unable to make subscriber Channel not readable in stopDelivery call for topic: "
- + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
+ + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
}
}
});
@@ -528,16 +528,16 @@ public class HedwigSubscriber implements
// Check from the PubSubCallback if it was successful or not.
if (!pubSubCallback.getIsCallSuccessful()) {
throw new ServiceDownException("Exception while trying to close the subscription for topic: "
- + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
+ + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
}
}
}
public void asyncCloseSubscription(final ByteString topic, final ByteString subscriberId,
- final Callback<Void> callback, final Object context) {
+ final Callback<Void> callback, final Object context) {
if (logger.isDebugEnabled())
logger.debug("Closing subscription asynchronously for topic: " + topic.toStringUtf8() + ", subscriberId: "
- + subscriberId.toStringUtf8());
+ + subscriberId.toStringUtf8());
TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
if (topicSubscriber2Channel.containsKey(topicSubscriber)) {
// Remove all cached references for the TopicSubscriber
@@ -551,10 +551,10 @@ public class HedwigSubscriber implements
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
logger.error("Failed to close the subscription channel for topic: " + topic.toStringUtf8()
- + ", subscriberId: " + subscriberId.toStringUtf8());
+ + ", subscriberId: " + subscriberId.toStringUtf8());
callback.operationFailed(context, new ServiceDownException(
- "Failed to close the subscription channel for topic: " + topic.toStringUtf8()
- + ", subscriberId: " + subscriberId.toStringUtf8()));
+ "Failed to close the subscription channel for topic: " + topic.toStringUtf8()
+ + ", subscriberId: " + subscriberId.toStringUtf8()));
} else {
callback.operationFinished(context, null);
}
@@ -562,7 +562,7 @@ public class HedwigSubscriber implements
});
} else {
logger.warn("Trying to close a subscription when we don't have a subscribe channel cached for topic: "
- + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
+ + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
callback.operationFinished(context, null);
}
}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java Mon Sep 5 17:38:57 2011
@@ -113,7 +113,7 @@ public class ResponseHandler extends Sim
PubSubResponse response = (PubSubResponse) e.getMessage();
if (logger.isDebugEnabled())
logger.debug("Response received from host: " + HedwigClient.getHostFromChannel(ctx.getChannel())
- + ", response: " + response);
+ + ", response: " + response);
// Determine if this PubSubResponse is an ack response for a PubSub
// Request or if it is a message being pushed to the client subscriber.
@@ -127,7 +127,7 @@ public class ResponseHandler extends Sim
// Response is an ack to a prior PubSubRequest so first retrieve the
// PubSub data for this txn.
PubSubData pubSubData = txn2PubSubData.containsKey(response.getTxnId()) ? txn2PubSubData.get(response
- .getTxnId()) : null;
+ .getTxnId()) : null;
// Validate that the PubSub data for this txn is stored. If not, just
// log an error message and return since we don't know how to handle
// this.
@@ -163,14 +163,14 @@ public class ResponseHandler extends Sim
// The above are the only expected PubSubResponse messages received
// from the server for the various client side requests made.
logger.error("Response received from server is for an unhandled operation type, txnId: "
- + response.getTxnId() + ", operationType: " + pubSubData.operationType);
+ + response.getTxnId() + ", operationType: " + pubSubData.operationType);
}
}
/**
* Logic to repost a PubSubRequest when the server responds with a redirect
* indicating they are not the topic master.
- *
+ *
* @param response
* PubSubResponse from the server for the redirect
* @param pubSubData
@@ -185,7 +185,7 @@ public class ResponseHandler extends Sim
throws Exception {
if (logger.isDebugEnabled())
logger.debug("Handling a redirect from host: " + HedwigClient.getHostFromChannel(channel) + ", response: "
- + response + ", pubSubData: " + pubSubData);
+ + response + ", pubSubData: " + pubSubData);
// In this case, the PubSub request was done to a server that is not
// responsible for the topic. First make sure that we haven't
// exceeded the maximum number of server redirects.
@@ -197,8 +197,8 @@ public class ResponseHandler extends Sim
if (logger.isDebugEnabled())
logger.debug("Exceeded the number of server redirects (" + curNumServerRedirects + ") so error out.");
pubSubData.callback.operationFailed(pubSubData.context, new ServiceDownException(
- new TooManyServerRedirectsException("Already reached max number of redirects: "
- + curNumServerRedirects)));
+ new TooManyServerRedirectsException("Already reached max number of redirects: "
+ + curNumServerRedirects)));
return;
}
@@ -206,7 +206,7 @@ public class ResponseHandler extends Sim
// stored in the StatusMsg of the response. First store the
// server that we sent the PubSub request to for the topic.
ByteString triedServer = ByteString.copyFromUtf8(HedwigSocketAddress.sockAddrStr(HedwigClient
- .getHostFromChannel(channel)));
+ .getHostFromChannel(channel)));
if (pubSubData.triedServers == null)
pubSubData.triedServers = new LinkedList<ByteString>();
pubSubData.shouldClaim = true;
@@ -232,10 +232,10 @@ public class ResponseHandler extends Sim
// already before in this PubSub request.
if (pubSubData.triedServers.contains(ByteString.copyFromUtf8(HedwigSocketAddress.sockAddrStr(redirectedHost)))) {
logger.error("We've already sent this PubSubRequest before to redirectedHost: " + redirectedHost
- + ", pubSubData: " + pubSubData);
+ + ", pubSubData: " + pubSubData);
pubSubData.callback.operationFailed(pubSubData.context, new ServiceDownException(
- new ServerRedirectLoopException("Already made the request before to redirected host: "
- + redirectedHost)));
+ new ServerRedirectLoopException("Already made the request before to redirected host: "
+ + redirectedHost)));
return;
}
@@ -297,7 +297,7 @@ public class ResponseHandler extends Sim
if (pub.host2Channel.containsKey(host) && pub.host2Channel.get(host).equals(ctx.getChannel())) {
if (logger.isDebugEnabled())
logger.debug("Disconnected channel for host: " + host
- + " was for Publish/Unsubscribe requests so remove all references to it.");
+ + " was for Publish/Unsubscribe requests so remove all references to it.");
pub.host2Channel.remove(host);
client.clearAllTopicsForHost(host);
}
@@ -333,9 +333,9 @@ public class ResponseHandler extends Sim
for (PubSubData pubSubData : txn2PubSubData.values()) {
if (logger.isDebugEnabled())
logger.debug("Channel disconnected so invoking the operationFailed callback for pubSubData: "
- + pubSubData);
+ + pubSubData);
pubSubData.callback.operationFailed(pubSubData.context, new UncertainStateException(
- "Server ack response never received before server connection disconnected!"));
+ "Server ack response never received before server connection disconnected!"));
}
txn2PubSubData.clear();
}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/WriteCallback.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/WriteCallback.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/WriteCallback.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/WriteCallback.java Mon Sep 5 17:38:57 2011
@@ -52,7 +52,7 @@ public class WriteCallback implements Ch
// with any callback logic here.
if (client.hasStopped())
return;
-
+
// When the write operation to the server is done, we just need to check
// if it was successful or not.
InetSocketAddress host = HedwigClient.getHostFromChannel(future.getChannel());
@@ -73,11 +73,11 @@ public class WriteCallback implements Ch
// failed, so invoke the operationFailed callback.
logger.error("Error writing to host more than once so just invoke the operationFailed callback!");
pubSubData.callback.operationFailed(pubSubData.context, new ServiceDownException(
- "Error while writing message to server: " + hostString));
+ "Error while writing message to server: " + hostString));
} else {
if (logger.isDebugEnabled())
logger.debug("Try to send the PubSubRequest again to the default server host/VIP for pubSubData: "
- + pubSubData);
+ + pubSubData);
// Keep track of this current server that we failed to write to
// but retry the request on the default server host/VIP.
if (pubSubData.writeFailedServers == null)
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/ssl/SslContextFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/ssl/SslContextFactory.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/ssl/SslContextFactory.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/ssl/SslContextFactory.java Mon Sep 5 17:38:57 2011
@@ -43,23 +43,24 @@ public abstract class SslContextFactory
protected TrustManager[] getTrustManagers() {
return new TrustManager[] { new X509TrustManager() {
- // Always trust, even if invalid.
+ // Always trust, even if invalid.
- @Override
- public X509Certificate[] getAcceptedIssuers() {
- return new X509Certificate[0];
- }
+ @Override
+ public X509Certificate[] getAcceptedIssuers() {
+ return new X509Certificate[0];
+ }
- @Override
- public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
- // Always trust.
- }
+ @Override
+ public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
+ // Always trust.
+ }
- @Override
- public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
- // Always trust.
+ @Override
+ public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
+ // Always trust.
+ }
}
- } };
+ };
}
}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/conf/AbstractConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/conf/AbstractConfiguration.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/conf/AbstractConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/conf/AbstractConfiguration.java Mon Sep 5 17:38:57 2011
@@ -34,7 +34,7 @@ public abstract class AbstractConfigurat
/**
* You can load configurations in precedence order. The first one takes
* precedence over any loaded later.
- *
+ *
* @param confURL
*/
public void loadConf(URL confURL) throws ConfigurationException {
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/Callback.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/Callback.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/Callback.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/Callback.java Mon Sep 5 17:38:57 2011
@@ -21,13 +21,13 @@ import org.apache.hedwig.exceptions.PubS
/**
* This class is used for callbacks for asynchronous operations
- *
+ *
*/
public interface Callback<T> {
/**
* This method is called when the asynchronous operation finishes
- *
+ *
* @param ctx
* @param resultOfOperation
*/
@@ -36,7 +36,7 @@ public interface Callback<T> {
/**
* This method is called when the operation failed due to some reason. The
* reason for failure is passed in.
- *
+ *
* @param ctx
* The context for the callback
* @param exception
@@ -44,4 +44,4 @@ public interface Callback<T> {
*/
public abstract void operationFailed(Object ctx, PubSubException exception);
-}
\ No newline at end of file
+}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/CallbackUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/CallbackUtils.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/CallbackUtils.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/CallbackUtils.java Mon Sep 5 17:38:57 2011
@@ -31,10 +31,10 @@ public class CallbackUtils {
/**
* A callback that waits for all of a number of events to fire. If any fail,
* then fail the final callback with a composite exception.
- *
+ *
* TODO: change this to use any Exception and make CompositeException
* generic, not a PubSubException.
- *
+ *
* @param expected
* Number of expected callbacks.
* @param cb
@@ -135,7 +135,7 @@ public class CallbackUtils {
* Logs what happened before continuing the callback chain.
*/
public static <T> Callback<T> logger(final Logger logger, final Level successLevel, final Level failureLevel, final Object successMsg,
- final Object failureMsg, final Callback<T> cont) {
+ final Object failureMsg, final Callback<T> cont) {
return new Callback<T>() {
@Override
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/test/java/org/apache/hedwig/client/AppTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/test/java/org/apache/hedwig/client/AppTest.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/test/java/org/apache/hedwig/client/AppTest.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/test/java/org/apache/hedwig/client/AppTest.java Mon Sep 5 17:38:57 2011
@@ -27,7 +27,7 @@ import junit.framework.TestSuite;
public class AppTest extends TestCase {
/**
* Create the test case
- *
+ *
* @param testName
* name of the test case
*/
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/test/java/org/apache/hedwig/util/TestHedwigSocketAddress.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/test/java/org/apache/hedwig/util/TestHedwigSocketAddress.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/test/java/org/apache/hedwig/util/TestHedwigSocketAddress.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/test/java/org/apache/hedwig/util/TestHedwigSocketAddress.java Mon Sep 5 17:38:57 2011
@@ -31,7 +31,7 @@ public class TestHedwigSocketAddress ext
private int sslPort = 9876;
private int invalidPort = -9999;
private String COLON = ":";
-
+
@Test
public void testCreateWithSSLPort() throws Exception {
HedwigSocketAddress addr = new HedwigSocketAddress(hostname, port, sslPort);
@@ -51,7 +51,7 @@ public class TestHedwigSocketAddress ext
HedwigSocketAddress addr = new HedwigSocketAddress(hostname+COLON+port+COLON+sslPort);
assertTrue(addr.getSocketAddress().equals(new InetSocketAddress(hostname, port)));
assertTrue(addr.getSSLSocketAddress().equals(new InetSocketAddress(hostname, sslPort)));
- }
+ }
@Test
public void testCreateFromStringWithNoSSLPort() throws Exception {
@@ -59,7 +59,7 @@ public class TestHedwigSocketAddress ext
assertTrue(addr.getSocketAddress().equals(new InetSocketAddress(hostname, port)));
assertTrue(addr.getSSLSocketAddress() == null);
}
-
+
@Test
public void testCreateWithInvalidRegularPort() throws Exception {
boolean success = false;
@@ -70,7 +70,7 @@ public class TestHedwigSocketAddress ext
success = true;
}
assertTrue(success);
- }
+ }
@Test
public void testCreateWithInvalidSSLPort() throws Exception {
@@ -82,7 +82,7 @@ public class TestHedwigSocketAddress ext
success = true;
}
assertTrue(success);
- }
+ }
@Test
public void testToStringConversion() throws Exception {
@@ -98,7 +98,7 @@ public class TestHedwigSocketAddress ext
HedwigSocketAddress sslAddr = new HedwigSocketAddress(hostname, port, sslPort);
assertTrue(sslAddr.isSSLEnabled());
HedwigSocketAddress addr = new HedwigSocketAddress(hostname, port);
- assertFalse(addr.isSSLEnabled());
+ assertFalse(addr.isSSLEnabled());
}
-
+
}
Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/exceptions/PubSubException.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/exceptions/PubSubException.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/exceptions/PubSubException.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/exceptions/PubSubException.java Mon Sep 5 17:38:57 2011
@@ -135,7 +135,7 @@ public abstract class PubSubException ex
super(StatusCode.UNEXPECTED_CONDITION, msg);
}
}
-
+
// The composite exception (for concurrent operations).
public static class CompositeException extends PubSubException {
private final Collection<PubSubException> exceptions;