You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2011/09/05 19:39:03 UTC
svn commit: r1165369 [8/9] - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/
bookkeeper-server/src/main/java/org/apache/bookkeeper/clie...
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManagerWithRangeScan.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManagerWithRangeScan.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManagerWithRangeScan.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManagerWithRangeScan.java Mon Sep 5 17:38:57 2011
@@ -20,7 +20,7 @@ package org.apache.hedwig.server.persist
public interface PersistenceManagerWithRangeScan extends PersistenceManager {
/**
* Executes the given range scan request
- *
+ *
* @param request
*/
public void scanMessages(RangeScanRequest request);
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/RangeScanRequest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/RangeScanRequest.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/RangeScanRequest.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/RangeScanRequest.java Mon Sep 5 17:38:57 2011
@@ -30,7 +30,7 @@ import com.google.protobuf.ByteString;
* {@link ScanCallback} used should be prepared to deal with more or less
* messages scanned. If an error occurs during scanning, the
* {@link ScanCallback} is notified of the error.
- *
+ *
*/
public class RangeScanRequest {
ByteString topic;
@@ -41,7 +41,7 @@ public class RangeScanRequest {
Object ctx;
public RangeScanRequest(ByteString topic, long startSeqId, int messageLimit, long sizeLimit, ScanCallback callback,
- Object ctx) {
+ Object ctx) {
this.topic = topic;
this.startSeqId = startSeqId;
this.messageLimit = messageLimit;
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java Mon Sep 5 17:38:57 2011
@@ -107,7 +107,7 @@ public class ReadAheadCache implements P
/**
* Constructor. Starts the cache maintainer thread
- *
+ *
* @param realPersistenceManager
*/
public ReadAheadCache(PersistenceManagerWithRangeScan realPersistenceManager, ServerConfiguration cfg) {
@@ -139,10 +139,10 @@ public class ReadAheadCache implements P
* ========================================================================
* Other methods of {@link PersistenceManager} that the cache needs to take
* some action on.
- *
+ *
* 1. Persist: We pass it through to the real persistence manager but insert
* our callback on the return path
- *
+ *
*/
public void persistMessage(PersistRequest request) {
// make a new PersistRequest object so that we can insert our own
@@ -158,7 +158,7 @@ public class ReadAheadCache implements P
* The callback that we insert on the persist request return path. The
* callback simply forms a {@link PersistResponse} object and inserts it in
* the request queue to be handled serially by the cache maintainer thread.
- *
+ *
*/
public class PersistCallback implements Callback<Long> {
@@ -188,7 +188,7 @@ public class ReadAheadCache implements P
// Original message that was persisted didn't have the local seq-id.
// Lets add that in
Message messageWithLocalSeqId = MessageIdUtils.mergeLocalSeqId(originalRequest.getMessage(),
- resultOfOperation);
+ resultOfOperation);
// Now enqueue a request to add this newly persisted message to our
// cache
@@ -204,20 +204,20 @@ public class ReadAheadCache implements P
* callbacks. Its just simpler to quit and restart afresh. Moreover, this
* should not happen as the request queue for the cache maintainer is
* unbounded.
- *
+ *
* @param obj
*/
protected void enqueueWithoutFailure(CacheRequest obj) {
if (!requestQueue.offer(obj)) {
throw new UnexpectedError("Could not enqueue object: " + obj.toString()
- + " to cache request queue. Exiting.");
+ + " to cache request queue. Exiting.");
}
}
/**
* Another method from {@link PersistenceManager}.
- *
+ *
* 2. Scan - Since the scan needs to touch the cache, we will just enqueue
* the scan request and let the cache maintainer thread handle it.
*/
@@ -228,7 +228,7 @@ public class ReadAheadCache implements P
/**
* Another method from {@link PersistenceManager}.
- *
+ *
* 3. Enqueue the request so that the cache maintainer thread can delete all
* message-ids older than the one specified
*/
@@ -238,7 +238,7 @@ public class ReadAheadCache implements P
/**
* Another method from {@link PersistenceManager}.
- *
+ *
* Since this is a cache layer on top of an underlying persistence manager,
* we can just call the consumedUntil method there. The messages older than
* the latest one passed here won't be accessed anymore so they should just
@@ -252,7 +252,7 @@ public class ReadAheadCache implements P
/**
* ========================================================================
* BEGINNING OF CODE FOR THE CACHE MAINTAINER THREAD
- *
+ *
* 1. The run method. It simply dequeues from the request queue, checks the
* type of object and acts accordingly
*/
@@ -284,7 +284,7 @@ public class ReadAheadCache implements P
* outstanding. In that case, we look a little ahead (by readAheadCount/2)
* and issue a range read of readAheadCount/2 messages. The idea is to
* ensure that the next readAheadCount messages are always available.
- *
+ *
* @return the range scan that should be issued for read ahead
*/
protected RangeScanRequest doReadAhead(ScanRequest request) {
@@ -313,7 +313,7 @@ public class ReadAheadCache implements P
/**
* This method just checks if the provided seq-id already exists in the
* cache. If not, a range read of the specified amount is issued.
- *
+ *
* @param topic
* @param seqId
* @param readAheadCount
@@ -369,7 +369,7 @@ public class ReadAheadCache implements P
/**
* Constructor
- *
+ *
* @param installedStubs
* The list of stubs that were installed for this range scan
* @param topic
@@ -407,8 +407,8 @@ public class ReadAheadCache implements P
// should remove them, so that whoever is waiting on them can retry.
// This shouldn't be happening usually
logger.warn("Unexpected message seq-id: " + message.getMsgId().getLocalComponent() + " on topic: "
- + topic.toStringUtf8() + " from readahead scan, was expecting seq-id: " + expectedKey.seqId
- + " topic: " + expectedKey.topic.toStringUtf8() + " installedStubs: " + installedStubs);
+ + topic.toStringUtf8() + " from readahead scan, was expecting seq-id: " + expectedKey.seqId
+ + " topic: " + expectedKey.topic.toStringUtf8() + " installedStubs: " + installedStubs);
enqueueDeleteOfRemainingStubs(noSuchSeqIdExceptionInstance);
}
@@ -457,14 +457,14 @@ public class ReadAheadCache implements P
* For adding the message to the cache, we do some bookeeping such as the
* total size of cache, order in which entries were added etc. If the size
* of the cache has exceeded our budget, old entries are collected.
- *
+ *
* @param cacheKey
* @param message
*/
protected void addMessageToCache(CacheKey cacheKey, Message message, long currTime) {
if (logger.isDebugEnabled()) {
logger.debug("Adding msg (topic: " + cacheKey.getTopic().toStringUtf8() + ", seq-id: "
- + message.getMsgId().getLocalComponent() + ") to readahead cache");
+ + message.getMsgId().getLocalComponent() + ") to readahead cache");
}
CacheValue cacheValue;
@@ -482,7 +482,7 @@ public class ReadAheadCache implements P
// maintain the index of seq-id
MapMethods.addToMultiMap(orderedIndexOnSeqId, cacheKey.getTopic(), cacheKey.getSeqId(),
- TreeSetLongFactory.instance);
+ TreeSetLongFactory.instance);
// finally add the message to the cache
cacheValue.setMessageAndInvokeCallbacks(message, currTime);
@@ -492,7 +492,7 @@ public class ReadAheadCache implements P
}
protected void removeMessageFromCache(CacheKey cacheKey, Exception exception, boolean maintainTimeIndex,
- boolean maintainSeqIdIndex) {
+ boolean maintainSeqIdIndex) {
CacheValue cacheValue = cache.remove(cacheKey);
if (cacheValue == null) {
@@ -537,13 +537,13 @@ public class ReadAheadCache implements P
if (logger.isDebugEnabled()) {
logger.debug("Removing topic: " + cacheKey.getTopic() + "seq-id: " + cacheKey.getSeqId()
- + " from cache because its the oldest");
+ + " from cache because its the oldest");
}
removeMessageFromCache(cacheKey, readAheadExceptionInstance, //
- // maintainTimeIndex=
- false,
- // maintainSeqIdIndex=
- true);
+ // maintainTimeIndex=
+ false,
+ // maintainSeqIdIndex=
+ true);
}
timeIndexOfAddition.remove(earliestTime);
@@ -554,7 +554,7 @@ public class ReadAheadCache implements P
/**
* ========================================================================
* The rest is just simple wrapper classes.
- *
+ *
*/
protected class ExceptionOnCacheKey implements CacheRequest {
@@ -575,10 +575,10 @@ public class ReadAheadCache implements P
*/
public void performRequest() {
removeMessageFromCache(cacheKey, exception,
- // maintainTimeIndex=
- true,
- // maintainSeqIdIndex=
- true);
+ // maintainTimeIndex=
+ true,
+ // maintainSeqIdIndex=
+ true);
}
}
@@ -638,15 +638,15 @@ public class ReadAheadCache implements P
if (logger.isDebugEnabled()) {
logger.debug("Removing seq-id: " + cacheKey.getSeqId() + " topic: "
- + cacheKey.getTopic().toStringUtf8()
- + " from cache because every subscriber has moved past");
+ + cacheKey.getTopic().toStringUtf8()
+ + " from cache because every subscriber has moved past");
}
removeMessageFromCache(cacheKey, readAheadExceptionInstance, //
- // maintainTimeIndex=
- true,
- // maintainSeqIdIndex=
- false);
+ // maintainTimeIndex=
+ true,
+ // maintainSeqIdIndex=
+ false);
iter.remove();
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanCallback.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanCallback.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanCallback.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanCallback.java Mon Sep 5 17:38:57 2011
@@ -30,7 +30,7 @@ public interface ScanCallback {
* as part of a scan. The message just read is handed to this listener which
* can then take the desired action on it. The return value from the method
* indicates whether the scan should continue or not.
- *
+ *
* @param ctx
* The context for the callback
* @param message
@@ -41,8 +41,8 @@ public interface ScanCallback {
/**
* This method is called when the scan finishes
- *
- *
+ *
+ *
* @param ctx
* @param reason
*/
@@ -52,7 +52,7 @@ public interface ScanCallback {
/**
* This method is called when the operation failed due to some reason. The
* reason for failure is passed in.
- *
+ *
* @param ctx
* The context for the callback
* @param exception
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanRequest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanRequest.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanRequest.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ScanRequest.java Mon Sep 5 17:38:57 2011
@@ -30,7 +30,7 @@ import org.apache.hedwig.protocol.PubSub
* its redundant.
* {@link ScanCallback#scanFailed(Object, org.apache.hedwig.exceptions.PubSubException)}
* method is called in case of error.
- *
+ *
*/
public class ScanRequest {
ByteString topic;
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ChannelTracker.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ChannelTracker.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ChannelTracker.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ChannelTracker.java Mon Sep 5 17:38:57 2011
@@ -72,7 +72,7 @@ public class ChannelTracker implements C
if (topicSub2Channel.containsKey(topicSubscriber)) {
TopicBusyException pse = new PubSubException.TopicBusyException(
- "subscription for this topic, subscriberId is already being served on a different channel");
+ "subscription for this topic, subscriberId is already being served on a different channel");
throw pse;
}
@@ -107,12 +107,12 @@ public class ChannelTracker implements C
if (subscribedChannel == null) {
throw new PubSubException.ClientNotSubscribedException(
- "Can't start delivery since client is not subscribed");
+ "Can't start delivery since client is not subscribed");
}
if (subscribedChannel != channel) {
throw new PubSubException.TopicBusyException(
- "Can't start delivery since client is subscribed on a different channel");
+ "Can't start delivery since client is subscribed on a different channel");
}
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java Mon Sep 5 17:38:57 2011
@@ -48,7 +48,7 @@ public class HedwigProxy {
HedwigClient client;
ServerSocketChannelFactory serverSocketChannelFactory;
- ChannelGroup allChannels;
+ ChannelGroup allChannels;
Map<OperationType, Handler> handlers;
ProxyConfiguration cfg;
@@ -69,7 +69,7 @@ public class HedwigProxy {
@Override
public void run() {
client = new HedwigClient(cfg);
-
+
serverSocketChannelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool());
initializeHandlers();
@@ -125,7 +125,7 @@ public class HedwigProxy {
// the following method only exists for unit-testing purposes, should go
// away once we make start delivery totally server-side
- public Handler getStartDeliveryHandler(){
+ public Handler getStartDeliveryHandler() {
return handlers.get(OperationType.START_DELIVERY);
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyConfiguration.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyConfiguration.java Mon Sep 5 17:38:57 2011
@@ -19,18 +19,18 @@ package org.apache.hedwig.server.proxy;
import org.apache.hedwig.client.conf.ClientConfiguration;
-public class ProxyConfiguration extends ClientConfiguration{
+public class ProxyConfiguration extends ClientConfiguration {
protected static String PROXY_PORT = "proxy_port";
protected static String MAX_MESSAGE_SIZE = "max_message_size";
-
- public int getProxyPort(){
+
+ public int getProxyPort() {
return conf.getInt(PROXY_PORT, 9099);
}
-
+
@Override
public int getMaximumMessageSize() {
return conf.getInt(MAX_MESSAGE_SIZE, 1258291); /* 1.2M */
}
-
+
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.java Mon Sep 5 17:38:57 2011
@@ -71,22 +71,22 @@ public class ProxyStartDeliveryHandler i
// }
final Channel subscribedChannel = tracker.getChannel(topic, subscriberId);
-
+
if (subscribedChannel == null) {
channel.write(PubSubResponseUtils.getResponseForException(
- new PubSubException.ClientNotSubscribedException("no subscription to start delivery on"),
- request.getTxnId()));
+ new PubSubException.ClientNotSubscribedException("no subscription to start delivery on"),
+ request.getTxnId()));
return;
}
-
+
MessageHandler handler = new MessageHandler() {
@Override
public void consume(ByteString topic, ByteString subscriberId, Message msg,
- final Callback<Void> callback, final Object context) {
+ final Callback<Void> callback, final Object context) {
PubSubResponse response = PubSubResponse.newBuilder().setProtocolVersion(
- ProtocolVersion.VERSION_ONE).setStatusCode(StatusCode.SUCCESS).setTxnId(0).setMessage(msg)
- .setTopic(topic).setSubscriberId(subscriberId).build();
+ ProtocolVersion.VERSION_ONE).setStatusCode(StatusCode.SUCCESS).setTxnId(0).setMessage(msg)
+ .setTopic(topic).setSubscriberId(subscriberId).build();
ChannelFuture future = subscribedChannel.write(response);
@@ -119,7 +119,7 @@ public class ProxyStartDeliveryHandler i
logger.fatal("Unexpected: No subscription when attempting to start delivery", e);
throw new RuntimeException(e);
}
-
+
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxySubscribeHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxySubscribeHandler.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxySubscribeHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxySubscribeHandler.java Mon Sep 5 17:38:57 2011
@@ -60,7 +60,7 @@ public class ProxySubscribeHandler imple
final TopicSubscriber topicSubscriber = new TopicSubscriber(request.getTopic(), subRequest.getSubscriberId());
subscriber.asyncSubscribe(topicSubscriber.getTopic(), subRequest.getSubscriberId(), subRequest
- .getCreateOrAttach(), new Callback<Void>() {
+ .getCreateOrAttach(), new Callback<Void>() {
@Override
public void operationFailed(Object ctx, PubSubException exception) {
channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClientFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClientFactory.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClientFactory.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClientFactory.java Mon Sep 5 17:38:57 2011
@@ -38,7 +38,7 @@ public class HedwigHubClientFactory {
/**
* Manufacture a hub client whose default server to connect to is the input
* HedwigSocketAddress hub.
- *
+ *
* @param hub
* The hub in another region to connect to.
*/
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubSubscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubSubscriber.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubSubscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubSubscriber.java Mon Sep 5 17:38:57 2011
@@ -45,25 +45,25 @@ public class HedwigHubSubscriber extends
@Override
public void subscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode)
throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
- InvalidSubscriberIdException {
+ InvalidSubscriberIdException {
subscribe(topic, subscriberId, mode, true);
}
@Override
public void asyncSubscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode, Callback<Void> callback,
- Object context) {
+ Object context) {
asyncSubscribe(topic, subscriberId, mode, callback, context, true);
}
@Override
public void unsubscribe(ByteString topic, ByteString subscriberId) throws CouldNotConnectException,
- ClientNotSubscribedException, ServiceDownException, InvalidSubscriberIdException {
+ ClientNotSubscribedException, ServiceDownException, InvalidSubscriberIdException {
unsubscribe(topic, subscriberId, true);
}
@Override
public void asyncUnsubscribe(final ByteString topic, final ByteString subscriberId, final Callback<Void> callback,
- final Object context) {
+ final Object context) {
asyncUnsubscribe(topic, subscriberId, callback, context, true);
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java Mon Sep 5 17:38:57 2011
@@ -52,7 +52,7 @@ public class RegionManager implements Su
private final TopicOpQueuer queue;
public RegionManager(final PersistenceManager pm, final ServerConfiguration cfg, final ZooKeeper zk,
- ScheduledExecutorService scheduler, HedwigHubClientFactory hubClientFactory) {
+ ScheduledExecutorService scheduler, HedwigHubClientFactory hubClientFactory) {
this.pm = pm;
mySubId = ByteString.copyFromUtf8(SubscriptionStateUtils.HUB_SUBSCRIBER_PREFIX + cfg.getMyRegion());
queue = new TopicOpQueuer(scheduler);
@@ -70,7 +70,7 @@ public class RegionManager implements Su
@Override
public void run() {
Callback<Void> postCb = synchronous ? cb : CallbackUtils.logger(LOGGER, Level.DEBUG, Level.ERROR,
- "all cross-region subscriptions succeeded", "at least one cross-region subscription failed");
+ "all cross-region subscriptions succeeded", "at least one cross-region subscription failed");
final Callback<Void> mcb = CallbackUtils.multiCallback(clients.size(), postCb, ctx);
for (final HedwigHubClient client : clients) {
final HedwigSubscriber sub = client.getSubscriber();
@@ -83,23 +83,23 @@ public class RegionManager implements Su
sub.startDelivery(topic, mySubId, new MessageHandler() {
@Override
public void consume(final ByteString topic, ByteString subscriberId, Message msg,
- final Callback<Void> callback, final Object context) {
+ final Callback<Void> callback, final Object context) {
// When messages are first published
// locally, the PublishHandler sets the
// source region in the Message.
if (msg.hasSrcRegion()) {
Message.newBuilder(msg).setMsgId(
- MessageSeqId.newBuilder(msg.getMsgId()).addRemoteComponents(
- RegionSpecificSeqId.newBuilder().setRegion(
- msg.getSrcRegion()).setSeqId(
- msg.getMsgId().getLocalComponent())));
+ MessageSeqId.newBuilder(msg.getMsgId()).addRemoteComponents(
+ RegionSpecificSeqId.newBuilder().setRegion(
+ msg.getSrcRegion()).setSeqId(
+ msg.getMsgId().getLocalComponent())));
}
pm.persistMessage(new PersistRequest(topic, msg, new Callback<Long>() {
@Override
public void operationFinished(Object ctx, Long resultOfOperation) {
if (LOGGER.isDebugEnabled())
LOGGER.debug("cross-region recv-fwd succeeded for topic "
- + topic.toStringUtf8());
+ + topic.toStringUtf8());
callback.operationFinished(context, null);
}
@@ -107,7 +107,7 @@ public class RegionManager implements Su
public void operationFailed(Object ctx, PubSubException exception) {
if (LOGGER.isDebugEnabled())
LOGGER.error("cross-region recv-fwd failed for topic "
- + topic.toStringUtf8(), exception);
+ + topic.toStringUtf8(), exception);
callback.operationFailed(context, exception);
}
}, null));
@@ -115,12 +115,12 @@ public class RegionManager implements Su
});
if (LOGGER.isDebugEnabled())
LOGGER.debug("cross-region start-delivery succeeded for topic "
- + topic.toStringUtf8());
+ + topic.toStringUtf8());
mcb.operationFinished(ctx, null);
} catch (PubSubException ex) {
if (LOGGER.isDebugEnabled())
LOGGER.error(
- "cross-region start-delivery failed for topic " + topic.toStringUtf8(), ex);
+ "cross-region start-delivery failed for topic " + topic.toStringUtf8(), ex);
mcb.operationFailed(ctx, ex);
}
}
@@ -129,7 +129,7 @@ public class RegionManager implements Su
public void operationFailed(Object ctx, PubSubException exception) {
if (LOGGER.isDebugEnabled())
LOGGER.error("cross-region subscribe failed for topic " + topic.toStringUtf8(),
- exception);
+ exception);
mcb.operationFailed(ctx, exception);
}
}, null);
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java Mon Sep 5 17:38:57 2011
@@ -66,7 +66,7 @@ public abstract class AbstractSubscripti
private final ConcurrentHashMap<ByteString, Long> topic2MinConsumedMessagesMap = new ConcurrentHashMap<ByteString, Long>();
public AbstractSubscriptionManager(ServerConfiguration cfg, TopicManager tm, PersistenceManager pm,
- ScheduledExecutorService scheduler) {
+ ScheduledExecutorService scheduler) {
this.cfg = cfg;
queuer = new TopicOpQueuer(scheduler);
tm.addTopicOwnershipChangeListener(this);
@@ -141,7 +141,7 @@ public abstract class AbstractSubscripti
@Override
public void operationFinished(final Object ctx,
- final Map<ByteString, InMemorySubscriptionState> resultOfOperation) {
+ final Map<ByteString, InMemorySubscriptionState> resultOfOperation) {
// We've just inherited a bunch of subscriber for this
// topic, some of which may be local. If they are, then we
// need to (1) notify listeners of this and (2) record the
@@ -163,7 +163,7 @@ public abstract class AbstractSubscripti
@Override
public void operationFailed(Object ctx, PubSubException exception) {
logger.error("Subscription manager failed to acquired topic " + topic.toStringUtf8(),
- exception);
+ exception);
cb.operationFailed(ctx, null);
}
@@ -201,7 +201,7 @@ public abstract class AbstractSubscripti
* Figure out who is subscribed. Do nothing if already acquired. If there's
* an error reading the subscribers' sequence IDs, then the topic is not
* acquired.
- *
+ *
* @param topic
* @param callback
* @param ctx
@@ -236,7 +236,7 @@ public abstract class AbstractSubscripti
MessageSeqId consumeSeqId;
public SubscribeOp(ByteString topic, SubscribeRequest subRequest, MessageSeqId consumeSeqId,
- Callback<MessageSeqId> callback, Object ctx) {
+ Callback<MessageSeqId> callback, Object ctx) {
queuer.super(topic, callback, ctx);
this.subRequest = subRequest;
this.consumeSeqId = consumeSeqId;
@@ -259,8 +259,8 @@ public abstract class AbstractSubscripti
if (createOrAttach.equals(CreateOrAttach.CREATE)) {
String msg = "Topic: " + topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8()
- + " requested creating a subscription but it is already subscribed with state: "
- + SubscriptionStateUtils.toString(subscriptionState.getSubscriptionState());
+ + " requested creating a subscription but it is already subscribed with state: "
+ + SubscriptionStateUtils.toString(subscriptionState.getSubscriptionState());
logger.debug(msg);
cb.operationFailed(ctx, new PubSubException.ClientAlreadySubscribedException(msg));
return;
@@ -269,8 +269,8 @@ public abstract class AbstractSubscripti
// otherwise just attach
if (logger.isDebugEnabled()) {
logger.debug("Topic: " + topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8()
- + " attaching to subscription with state: "
- + SubscriptionStateUtils.toString(subscriptionState.getSubscriptionState()));
+ + " attaching to subscription with state: "
+ + SubscriptionStateUtils.toString(subscriptionState.getSubscriptionState()));
}
cb.operationFinished(ctx, subscriptionState.getLastConsumeSeqId());
@@ -280,7 +280,7 @@ public abstract class AbstractSubscripti
// we don't have a mapping for this subscriber
if (createOrAttach.equals(CreateOrAttach.ATTACH)) {
String msg = "Topic: " + topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8()
- + " requested attaching to an existing subscription but it is not subscribed";
+ + " requested attaching to an existing subscription but it is not subscribed";
logger.debug(msg);
cb.operationFailed(ctx, new PubSubException.ClientNotSubscribedException(msg));
return;
@@ -301,7 +301,7 @@ public abstract class AbstractSubscripti
@Override
public void operationFailed(Object ctx, PubSubException exception) {
logger.error("subscription for subscriber " + subscriberId.toStringUtf8() + " to topic "
- + topic.toStringUtf8() + " failed due to failed listener callback", exception);
+ + topic.toStringUtf8() + " failed due to failed listener callback", exception);
cb.operationFailed(ctx, exception);
}
@@ -325,7 +325,7 @@ public abstract class AbstractSubscripti
@Override
public void serveSubscribeRequest(ByteString topic, SubscribeRequest subRequest, MessageSeqId consumeSeqId,
- Callback<MessageSeqId> callback, Object ctx) {
+ Callback<MessageSeqId> callback, Object ctx) {
queuer.pushAndMaybeRun(topic, new SubscribeOp(topic, subRequest, consumeSeqId, callback, ctx));
}
@@ -334,7 +334,7 @@ public abstract class AbstractSubscripti
MessageSeqId consumeSeqId;
public ConsumeOp(ByteString topic, ByteString subscriberId, MessageSeqId consumeSeqId, Callback<Void> callback,
- Object ctx) {
+ Object ctx) {
queuer.super(topic, callback, ctx);
this.subscriberId = subscriberId;
this.consumeSeqId = consumeSeqId;
@@ -359,10 +359,10 @@ public abstract class AbstractSubscripti
} else {
if (logger.isDebugEnabled()) {
logger.debug("Only advanced consume pointer in memory, will persist later, topic: "
- + topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8()
- + " persistentState: " + SubscriptionStateUtils.toString(subState.getSubscriptionState())
- + " in-memory consume-id: "
- + MessageIdUtils.msgIdToReadableString(subState.getLastConsumeSeqId()));
+ + topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8()
+ + " persistentState: " + SubscriptionStateUtils.toString(subState.getSubscriptionState())
+ + " in-memory consume-id: "
+ + MessageIdUtils.msgIdToReadableString(subState.getLastConsumeSeqId()));
}
cb.operationFinished(ctx, null);
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java Mon Sep 5 17:38:57 2011
@@ -36,21 +36,21 @@ public class InMemorySubscriptionManager
@Override
protected void createSubscriptionState(ByteString topic, ByteString subscriberId, SubscriptionState state,
- Callback<Void> callback, Object ctx) {
+ Callback<Void> callback, Object ctx) {
// nothing to do, in-memory info is already recorded by base class
callback.operationFinished(ctx, null);
}
@Override
protected void deleteSubscriptionState(ByteString topic, ByteString subscriberId, Callback<Void> callback,
- Object ctx) {
+ Object ctx) {
// nothing to do, in-memory info is already deleted by base class
callback.operationFinished(ctx, null);
}
@Override
protected void updateSubscriptionState(ByteString topic, ByteString subscriberId, SubscriptionState state,
- Callback<Void> callback, Object ctx) {
+ Callback<Void> callback, Object ctx) {
// nothing to do, in-memory info is already updated by base class
callback.operationFinished(ctx, null);
}
@@ -62,7 +62,7 @@ public class InMemorySubscriptionManager
@Override
protected void readSubscriptions(ByteString topic,
- Callback<Map<ByteString, InMemorySubscriptionState>> cb, Object ctx) {
+ Callback<Map<ByteString, InMemorySubscriptionState>> cb, Object ctx) {
// Since we don't lose in-memory information on lostTopic, we can just
// return that back
Map<ByteString, InMemorySubscriptionState> topicSubs = top2sub2seq.get(topic);
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java Mon Sep 5 17:38:57 2011
@@ -42,7 +42,7 @@ public class InMemorySubscriptionState {
}
/**
- *
+ *
* @param lastConsumeSeqId
* @param consumeInterval
* The amount of laziness we want in persisting the consume
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MessageFilter.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MessageFilter.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MessageFilter.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MessageFilter.java Mon Sep 5 17:38:57 2011
@@ -23,7 +23,7 @@ public interface MessageFilter {
/**
* Tests whether a particular message passes the filter or not
- *
+ *
* @param message
* @return
*/
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionEventListener.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionEventListener.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionEventListener.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionEventListener.java Mon Sep 5 17:38:57 2011
@@ -22,7 +22,7 @@ import org.apache.hedwig.util.Callback;
/**
* For listening to events that are issued by a SubscriptionManager.
- *
+ *
*/
public interface SubscriptionEventListener {
@@ -30,7 +30,7 @@ public interface SubscriptionEventListen
* Called by the subscription manager when it previously had zero local
* subscribers for a topic and is currently accepting its first local
* subscriber.
- *
+ *
* @param topic
* The topic of interest.
* @param synchronous
@@ -48,10 +48,10 @@ public interface SubscriptionEventListen
* Called by the SubscriptionManager when it previously had non-zero local
* subscribers for a topic and is currently dropping its last local
* subscriber. This is fully asynchronous so there is no callback.
- *
+ *
* @param topic
* The topic of interest.
*/
public void onLastLocalUnsubscribe(ByteString topic);
-
+
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionManager.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionManager.java Mon Sep 5 17:38:57 2011
@@ -28,15 +28,15 @@ import org.apache.hedwig.util.Callback;
public interface SubscriptionManager {
/**
- *
+ *
* Register a new subscription for the given subscriber for the given topic.
* This method should reliably persist the existence of the subscription in
* a way that it can't be lost. If the subscription already exists,
* depending on the create or attach flag in the subscribe request, an
* exception may be returned.
- *
+ *
* This is an asynchronous method.
- *
+ *
* @param topic
* @param subRequest
* @param consumeSeqId
@@ -48,14 +48,14 @@ public interface SubscriptionManager {
* @param ctx
*/
public void serveSubscribeRequest(ByteString topic, SubscribeRequest subRequest, MessageSeqId consumeSeqId,
- Callback<MessageSeqId> callback, Object ctx);
+ Callback<MessageSeqId> callback, Object ctx);
/**
* Set the consume position of a given subscriber on a given topic. Note
* that this method need not persist the consume position immediately but
* can be lazy and persist it later asynchronously, if that is more
* efficient.
- *
+ *
* @param topic
* @param subscriberId
* @param consumeSeqId
@@ -65,7 +65,7 @@ public interface SubscriptionManager {
/**
* Delete a particular subscription
- *
+ *
* @param topic
* @param subscriberId
*/
@@ -100,5 +100,5 @@ public interface SubscriptionManager {
* or removed.
*/
public void addListener(SubscriptionEventListener listener);
-
+
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/ZkSubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/ZkSubscriptionManager.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/ZkSubscriptionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/ZkSubscriptionManager.java Mon Sep 5 17:38:57 2011
@@ -50,7 +50,7 @@ public class ZkSubscriptionManager exten
protected final static Logger logger = Logger.getLogger(ZkSubscriptionManager.class);
public ZkSubscriptionManager(ZooKeeper zk, TopicManager topicMgr, PersistenceManager pm, ServerConfiguration cfg,
- ScheduledExecutorService scheduler) {
+ ScheduledExecutorService scheduler) {
super(cfg, topicMgr, pm, scheduler);
this.zk = zk;
}
@@ -61,12 +61,12 @@ public class ZkSubscriptionManager exten
private String topicSubscriberPath(ByteString topic, ByteString subscriber) {
return topicSubscribersPath(new StringBuilder(), topic).append("/").append(subscriber.toStringUtf8())
- .toString();
+ .toString();
}
@Override
protected void readSubscriptions(final ByteString topic,
- final Callback<Map<ByteString, InMemorySubscriptionState>> cb, final Object ctx) {
+ final Callback<Map<ByteString, InMemorySubscriptionState>> cb, final Object ctx) {
String topicSubscribersPath = topicSubscribersPath(new StringBuilder(), topic).toString();
zk.getChildren(topicSubscribersPath, false, new SafeAsyncZKCallback.ChildrenCallback() {
@@ -75,7 +75,7 @@ public class ZkSubscriptionManager exten
if (rc != Code.OK.intValue() && rc != Code.NONODE.intValue()) {
KeeperException e = ZkUtils.logErrorAndCreateZKException("Could not read subscribers for topic "
- + topic.toStringUtf8(), path, rc);
+ + topic.toStringUtf8(), path, rc);
cb.operationFailed(ctx, new PubSubException.ServiceDownException(e));
return;
}
@@ -104,8 +104,8 @@ public class ZkSubscriptionManager exten
if (rc != Code.OK.intValue()) {
KeeperException e = ZkUtils.logErrorAndCreateZKException(
- "Could not read subscription data for topic: " + topic.toStringUtf8()
- + ", subscriberId: " + subscriberId.toStringUtf8(), path, rc);
+ "Could not read subscription data for topic: " + topic.toStringUtf8()
+ + ", subscriberId: " + subscriberId.toStringUtf8(), path, rc);
reportFailure(new PubSubException.ServiceDownException(e));
return;
}
@@ -120,7 +120,7 @@ public class ZkSubscriptionManager exten
state = SubscriptionState.parseFrom(data);
} catch (InvalidProtocolBufferException ex) {
String msg = "Failed to deserialize state for topic: " + topic.toStringUtf8()
- + " subscriberId: " + subscriberId.toStringUtf8();
+ + " subscriberId: " + subscriberId.toStringUtf8();
logger.error(msg, ex);
reportFailure(new PubSubException.UnexpectedConditionException(msg));
return;
@@ -128,8 +128,8 @@ public class ZkSubscriptionManager exten
if (logger.isDebugEnabled()) {
logger.debug("Found subscription while acquiring topic: " + topic.toStringUtf8()
- + " subscriberId: " + child + "state: "
- + SubscriptionStateUtils.toString(state));
+ + " subscriberId: " + child + "state: "
+ + SubscriptionStateUtils.toString(state));
}
topicSubs.put(subscriberId, new InMemorySubscriptionState(state));
@@ -151,65 +151,65 @@ public class ZkSubscriptionManager exten
@Override
protected void createSubscriptionState(final ByteString topic, final ByteString subscriberId,
- final SubscriptionState state, final Callback<Void> callback, final Object ctx) {
+ final SubscriptionState state, final Callback<Void> callback, final Object ctx) {
ZkUtils.createFullPathOptimistic(zk, topicSubscriberPath(topic, subscriberId), state.toByteArray(),
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new SafeAsyncZKCallback.StringCallback() {
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new SafeAsyncZKCallback.StringCallback() {
- @Override
- public void safeProcessResult(int rc, String path, Object ctx, String name) {
- if (rc == Code.OK.intValue()) {
- if (logger.isDebugEnabled()) {
- logger.debug("Successfully recorded subscription for topic: " + topic.toStringUtf8()
- + " subscriberId: " + subscriberId.toStringUtf8() + " state: "
- + SubscriptionStateUtils.toString(state));
- }
- callback.operationFinished(ctx, null);
- } else {
- KeeperException ke = ZkUtils.logErrorAndCreateZKException(
- "Could not record new subscription for topic: " + topic.toStringUtf8()
- + " subscriberId: " + subscriberId.toStringUtf8(), path, rc);
- callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
- }
+ @Override
+ public void safeProcessResult(int rc, String path, Object ctx, String name) {
+ if (rc == Code.OK.intValue()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Successfully recorded subscription for topic: " + topic.toStringUtf8()
+ + " subscriberId: " + subscriberId.toStringUtf8() + " state: "
+ + SubscriptionStateUtils.toString(state));
}
- }, ctx);
+ callback.operationFinished(ctx, null);
+ } else {
+ KeeperException ke = ZkUtils.logErrorAndCreateZKException(
+ "Could not record new subscription for topic: " + topic.toStringUtf8()
+ + " subscriberId: " + subscriberId.toStringUtf8(), path, rc);
+ callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
+ }
+ }
+ }, ctx);
}
@Override
protected void updateSubscriptionState(final ByteString topic, final ByteString subscriberId,
- final SubscriptionState state, final Callback<Void> callback, final Object ctx) {
+ final SubscriptionState state, final Callback<Void> callback, final Object ctx) {
zk.setData(topicSubscriberPath(topic, subscriberId), state.toByteArray(), -1,
- new SafeAsyncZKCallback.StatCallback() {
- @Override
- public void safeProcessResult(int rc, String path, Object ctx, Stat stat) {
- if (rc != Code.OK.intValue()) {
- KeeperException e = ZkUtils.logErrorAndCreateZKException("Topic: " + topic.toStringUtf8()
- + " subscriberId: " + subscriberId.toStringUtf8()
- + " could not set subscription state: " + SubscriptionStateUtils.toString(state),
- path, rc);
- callback.operationFailed(ctx, new PubSubException.ServiceDownException(e));
- } else {
- if (logger.isDebugEnabled()) {
- logger.debug("Successfully updated subscription for topic: " + topic.toStringUtf8()
- + " subscriberId: " + subscriberId.toStringUtf8() + " state: "
- + SubscriptionStateUtils.toString(state));
- }
-
- callback.operationFinished(ctx, null);
- }
+ new SafeAsyncZKCallback.StatCallback() {
+ @Override
+ public void safeProcessResult(int rc, String path, Object ctx, Stat stat) {
+ if (rc != Code.OK.intValue()) {
+ KeeperException e = ZkUtils.logErrorAndCreateZKException("Topic: " + topic.toStringUtf8()
+ + " subscriberId: " + subscriberId.toStringUtf8()
+ + " could not set subscription state: " + SubscriptionStateUtils.toString(state),
+ path, rc);
+ callback.operationFailed(ctx, new PubSubException.ServiceDownException(e));
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Successfully updated subscription for topic: " + topic.toStringUtf8()
+ + " subscriberId: " + subscriberId.toStringUtf8() + " state: "
+ + SubscriptionStateUtils.toString(state));
}
- }, ctx);
+
+ callback.operationFinished(ctx, null);
+ }
+ }
+ }, ctx);
}
@Override
protected void deleteSubscriptionState(final ByteString topic, final ByteString subscriberId,
- final Callback<Void> callback, final Object ctx) {
+ final Callback<Void> callback, final Object ctx) {
zk.delete(topicSubscriberPath(topic, subscriberId), -1, new SafeAsyncZKCallback.VoidCallback() {
@Override
public void safeProcessResult(int rc, String path, Object ctx) {
if (rc == Code.OK.intValue()) {
if (logger.isDebugEnabled()) {
logger.debug("Successfully deleted subscription for topic: " + topic.toStringUtf8()
- + " subscriberId: " + subscriberId.toStringUtf8());
+ + " subscriberId: " + subscriberId.toStringUtf8());
}
callback.operationFinished(ctx, null);
@@ -217,7 +217,7 @@ public class ZkSubscriptionManager exten
}
KeeperException e = ZkUtils.logErrorAndCreateZKException("Topic: " + topic.toStringUtf8()
- + " subscriberId: " + subscriberId.toStringUtf8() + " failed to delete subscription", path, rc);
+ + " subscriberId: " + subscriberId.toStringUtf8() + " failed to delete subscription", path, rc);
callback.operationFailed(ctx, new PubSubException.ServiceDownException(e));
}
}, ctx);
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java Mon Sep 5 17:38:57 2011
@@ -60,8 +60,8 @@ public abstract class AbstractTopicManag
private class GetOwnerOp extends TopicOpQueuer.AsynchronousOp<HedwigSocketAddress> {
public boolean shouldClaim;
- public GetOwnerOp(final ByteString topic, boolean shouldClaim,
- final Callback<HedwigSocketAddress> cb, Object ctx) {
+ public GetOwnerOp(final ByteString topic, boolean shouldClaim,
+ final Callback<HedwigSocketAddress> cb, Object ctx) {
queuer.super(topic, cb, ctx);
this.shouldClaim = shouldClaim;
}
@@ -119,7 +119,7 @@ public abstract class AbstractTopicManag
@Override
public void operationFailed(Object ctx, PubSubException exception) {
logger.error("failure that should never happen when periodically releasing topic "
- + topic, exception);
+ + topic, exception);
}
@Override
@@ -157,7 +157,7 @@ public abstract class AbstractTopicManag
@Override
public final void getOwner(ByteString topic, boolean shouldClaim,
- Callback<HedwigSocketAddress> cb, Object ctx) {
+ Callback<HedwigSocketAddress> cb, Object ctx) {
queuer.pushAndMaybeRun(topic, new GetOwnerOp(topic, shouldClaim, cb, ctx));
}
@@ -173,10 +173,10 @@ public abstract class AbstractTopicManag
* choosing this hub as the owner, the {@code
* AbstractTopicManager#notifyListenersAndAddToOwnedTopics(ByteString,
* OperationCallback, Object)} method must be called.
- *
+ *
*/
protected abstract void realGetOwner(ByteString topic, boolean shouldClaim,
- Callback<HedwigSocketAddress> cb, Object ctx);
+ Callback<HedwigSocketAddress> cb, Object ctx);
/**
* The method should do any cleanup necessary to indicate to other hubs that
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java Mon Sep 5 17:38:57 2011
@@ -30,13 +30,13 @@ import org.apache.hedwig.util.HedwigSock
* claim responsibilities for the topics that were at the failed host. On
* claiming responsibility for a topic, a host should call its
* {@link TopicOwnershipChangeListener}.
- *
+ *
*/
public interface TopicManager {
/**
* Get the name of the host responsible for the given topic.
- *
+ *
* @param topic
* The topic whose owner to get.
* @param cb
@@ -45,8 +45,8 @@ public interface TopicManager {
* @throws ServiceDownException
* If there is an error looking up the information
*/
- public void getOwner(ByteString topic, boolean shouldClaim,
- Callback<HedwigSocketAddress> cb, Object ctx);
+ public void getOwner(ByteString topic, boolean shouldClaim,
+ Callback<HedwigSocketAddress> cb, Object ctx);
/**
* Whenever the topic manager finds out that the set of topics owned by this
@@ -59,14 +59,14 @@ public interface TopicManager {
* immediately to such notifications, and with no blocking (because multiple
* listeners might need to be informed and they are all informed by the same
* thread).
- *
+ *
* @param listener
*/
public void addTopicOwnershipChangeListener(TopicOwnershipChangeListener listener);
/**
* Give up ownership of a topic. If I don't own it, do nothing.
- *
+ *
* @throws ServiceDownException
* If there is an error in claiming responsibility for the topic
*/
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TrivialOwnAllTopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TrivialOwnAllTopicManager.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TrivialOwnAllTopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TrivialOwnAllTopicManager.java Mon Sep 5 17:38:57 2011
@@ -33,8 +33,8 @@ public class TrivialOwnAllTopicManager e
}
@Override
- protected void realGetOwner(ByteString topic, boolean shouldClaim,
- Callback<HedwigSocketAddress> cb, Object ctx) {
+ protected void realGetOwner(ByteString topic, boolean shouldClaim,
+ Callback<HedwigSocketAddress> cb, Object ctx) {
if (topics.contains(topic)) {
cb.operationFinished(ctx, addr);
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java Mon Sep 5 17:38:57 2011
@@ -47,13 +47,13 @@ import org.apache.hedwig.zookeeper.SafeA
/**
* Topics are operated on in parallel as they are independent.
- *
+ *
*/
public class ZkTopicManager extends AbstractTopicManager implements TopicManager {
static Logger logger = Logger.getLogger(ZkTopicManager.class);
Random rand = new Random();
-
+
/**
* Persistent storage for topic metadata.
*/
@@ -75,7 +75,7 @@ public class ZkTopicManager extends Abst
/**
* Create a new topic manager. Pass in an active ZooKeeper client object.
- *
+ *
* @param zk
*/
public ZkTopicManager(final ZooKeeper zk, final ServerConfiguration cfg, ScheduledExecutorService scheduler)
@@ -93,18 +93,18 @@ public class ZkTopicManager extends Abst
logger.warn("ZK client has been disconnected to the ZK server!");
isSuspended = true;
} else if (event.getState().equals(Watcher.Event.KeeperState.SyncConnected)) {
- if (isSuspended) {
- logger.info("ZK client has been reconnected to the ZK server!");
- }
- isSuspended = false;
+ if (isSuspended) {
+ logger.info("ZK client has been reconnected to the ZK server!");
+ }
+ isSuspended = false;
}
- }
- // Check for expired connection.
+ }
+ // Check for expired connection.
if (event.getState().equals(Watcher.Event.KeeperState.Expired)) {
logger.error("ZK client connection to the ZK server has expired!");
System.exit(1);
- }
- }
+ }
+ }
});
final SynchronousQueue<Either<Void, PubSubException>> queue = new SynchronousQueue<Either<Void, PubSubException>>();
@@ -132,41 +132,41 @@ public class ZkTopicManager extends Abst
void registerWithZookeeper(final Callback<Void> callback, Object ctx) {
ZkUtils.createFullPathOptimistic(zk, ephemeralNodePath, getCurrentLoadData(), Ids.OPEN_ACL_UNSAFE,
- CreateMode.EPHEMERAL, new SafeAsyncZKCallback.StringCallback() {
+ CreateMode.EPHEMERAL, new SafeAsyncZKCallback.StringCallback() {
+
+ @Override
+ public void safeProcessResult(int rc, String path, Object ctx, String name) {
+ if (rc == Code.OK.intValue()) {
+ callback.operationFinished(ctx, null);
+ return;
+ }
+ if (rc != Code.NODEEXISTS.intValue()) {
+ KeeperException ke = ZkUtils.logErrorAndCreateZKException(
+ "Could not create ephemeral node to register hub", ephemeralNodePath, rc);
+ callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
+ return;
+ }
+
+ logger.info("Found stale ephemeral node while registering hub with ZK, deleting it");
+ // Node exists, lets try to delete it and retry
+ zk.delete(ephemeralNodePath, -1, new SafeAsyncZKCallback.VoidCallback() {
@Override
- public void safeProcessResult(int rc, String path, Object ctx, String name) {
- if (rc == Code.OK.intValue()) {
- callback.operationFinished(ctx, null);
- return;
- }
- if (rc != Code.NODEEXISTS.intValue()) {
- KeeperException ke = ZkUtils.logErrorAndCreateZKException(
- "Could not create ephemeral node to register hub", ephemeralNodePath, rc);
- callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
+ public void safeProcessResult(int rc, String path, Object ctx) {
+ if (rc == Code.OK.intValue() || rc == Code.NONODE.intValue()) {
+ registerWithZookeeper(callback, ctx);
return;
}
-
- logger.info("Found stale ephemeral node while registering hub with ZK, deleting it");
-
- // Node exists, lets try to delete it and retry
- zk.delete(ephemeralNodePath, -1, new SafeAsyncZKCallback.VoidCallback() {
- @Override
- public void safeProcessResult(int rc, String path, Object ctx) {
- if (rc == Code.OK.intValue() || rc == Code.NONODE.intValue()) {
- registerWithZookeeper(callback, ctx);
- return;
- }
- KeeperException ke = ZkUtils.logErrorAndCreateZKException(
- "Could not delete stale ephemeral node to register hub", ephemeralNodePath, rc);
- callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
- return;
-
- }
- }, ctx);
+ KeeperException ke = ZkUtils.logErrorAndCreateZKException(
+ "Could not delete stale ephemeral node to register hub", ephemeralNodePath, rc);
+ callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
+ return;
}
- }, null);
+ }, ctx);
+
+ }
+ }, null);
}
String hubPath(ByteString topic) {
@@ -175,12 +175,12 @@ public class ZkTopicManager extends Abst
@Override
protected void realGetOwner(final ByteString topic, final boolean shouldClaim,
- final Callback<HedwigSocketAddress> cb, final Object ctx) {
+ final Callback<HedwigSocketAddress> cb, final Object ctx) {
// If operations are suspended due to a ZK client disconnect, just error
// out this call and return.
if (isSuspended) {
cb.operationFailed(ctx, new PubSubException.ServiceDownException(
- "ZKTopicManager service is temporarily suspended!"));
+ "ZKTopicManager service is temporarily suspended!"));
return;
}
@@ -217,7 +217,7 @@ public class ZkTopicManager extends Abst
public void safeProcessResult(int rc, String path, Object ctx, List<String> children) {
if (rc != Code.OK.intValue()) {
KeeperException e = ZkUtils.logErrorAndCreateZKException(
- "Could not get list of available hubs", path, rc);
+ "Could not get list of available hubs", path, rc);
cb.operationFailed(ctx, new PubSubException.ServiceDownException(e));
return;
}
@@ -238,8 +238,8 @@ public class ZkTopicManager extends Abst
if (rc == KeeperException.Code.OK.intValue()) {
try {
int load = Integer.parseInt(new String(data));
- if (logger.isDebugEnabled()){
- logger.debug("Found server: " + ctx + " with load: " + load);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Found server: " + ctx + " with load: " + load);
}
if (load < minLoad || (load == minLoad && rand.nextBoolean())) {
minLoad = load;
@@ -256,7 +256,7 @@ public class ZkTopicManager extends Abst
if (numResponses == children.size()) {
if (leastLoaded == null) {
cb.operationFailed(ZkGetOwnerOp.this.ctx, new PubSubException.ServiceDownException(
- "No hub available"));
+ "No hub available"));
return;
}
HedwigSocketAddress owner = new HedwigSocketAddress(leastLoaded);
@@ -273,7 +273,7 @@ public class ZkTopicManager extends Abst
for (String child : children) {
zk.getData(cfg.getZkHostsPrefix(new StringBuilder()).append("/").append(child).toString(), false,
- dataCallback, child);
+ dataCallback, child);
}
}
@@ -296,7 +296,7 @@ public class ZkTopicManager extends Abst
if (rc != Code.OK.intValue()) {
KeeperException e = ZkUtils.logErrorAndCreateZKException("Could not read ownership for topic: "
- + topic.toStringUtf8(), path, rc);
+ + topic.toStringUtf8(), path, rc);
cb.operationFailed(ctx, new PubSubException.ServiceDownException(e));
return;
}
@@ -324,7 +324,7 @@ public class ZkTopicManager extends Abst
claimOrChoose();
} else {
KeeperException e = ZkUtils.logErrorAndCreateZKException(
- "Could not delete self node for topic: " + topic.toStringUtf8(), path, rc);
+ "Could not delete self node for topic: " + topic.toStringUtf8(), path, rc);
cb.operationFailed(ctx, new PubSubException.ServiceDownException(e));
}
}
@@ -339,26 +339,26 @@ public class ZkTopicManager extends Abst
}
ZkUtils.createFullPathOptimistic(zk, hubPath, addr.toString().getBytes(), Ids.OPEN_ACL_UNSAFE,
- CreateMode.EPHEMERAL, new SafeAsyncZKCallback.StringCallback() {
+ CreateMode.EPHEMERAL, new SafeAsyncZKCallback.StringCallback() {
- @Override
- public void safeProcessResult(int rc, String path, Object ctx, String name) {
- if (rc == Code.OK.intValue()) {
- if (logger.isDebugEnabled()) {
- logger.debug("claimed topic: " + topic.toStringUtf8());
- }
- notifyListenersAndAddToOwnedTopics(topic, cb, ctx);
- updateLoadInformation();
- } else if (rc == Code.NODEEXISTS.intValue()) {
- read();
- } else {
- KeeperException e = ZkUtils.logErrorAndCreateZKException(
- "Failed to create ephemeral node to claim ownership of topic: "
- + topic.toStringUtf8(), path, rc);
- cb.operationFailed(ctx, new PubSubException.ServiceDownException(e));
- }
+ @Override
+ public void safeProcessResult(int rc, String path, Object ctx, String name) {
+ if (rc == Code.OK.intValue()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("claimed topic: " + topic.toStringUtf8());
}
- }, ctx);
+ notifyListenersAndAddToOwnedTopics(topic, cb, ctx);
+ updateLoadInformation();
+ } else if (rc == Code.NODEEXISTS.intValue()) {
+ read();
+ } else {
+ KeeperException e = ZkUtils.logErrorAndCreateZKException(
+ "Failed to create ephemeral node to claim ownership of topic: "
+ + topic.toStringUtf8(), path, rc);
+ cb.operationFailed(ctx, new PubSubException.ServiceDownException(e));
+ }
+ }
+ }, ctx);
}
}
@@ -370,10 +370,10 @@ public class ZkTopicManager extends Abst
}
void updateLoadInformation() {
- byte[] currentLoad = getCurrentLoadData();
- if (logger.isDebugEnabled()){
- logger.debug("Reporting load of " + new String(currentLoad));
- }
+ byte[] currentLoad = getCurrentLoadData();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Reporting load of " + new String(currentLoad));
+ }
zk.setData(ephemeralNodePath, currentLoad, -1, loadReportingStatCallback, null);
}
@@ -393,7 +393,7 @@ public class ZkTopicManager extends Abst
if (rc != Code.OK.intValue()) {
KeeperException e = ZkUtils.logErrorAndCreateZKException(
- "Failed to delete self-ownership node for topic: " + topic.toStringUtf8(), path, rc);
+ "Failed to delete self-ownership node for topic: " + topic.toStringUtf8(), path, rc);
cb.operationFailed(ctx, new PubSubException.ServiceDownException(e));
return;
}
@@ -401,7 +401,7 @@ public class ZkTopicManager extends Abst
HedwigSocketAddress owner = new HedwigSocketAddress(new String(data));
if (!owner.equals(addr)) {
logger.warn("Wanted to delete self-node for topic: " + topic.toStringUtf8() + " but node for "
- + owner + " found, leaving untouched");
+ + owner + " found, leaving untouched");
// Not our node, someone else's, leave it alone
cb.operationFinished(ctx, null);
return;
@@ -412,8 +412,8 @@ public class ZkTopicManager extends Abst
public void safeProcessResult(int rc, String path, Object ctx) {
if (rc != Code.OK.intValue() && rc != Code.NONODE.intValue()) {
KeeperException e = ZkUtils
- .logErrorAndCreateZKException("Failed to delete self-ownership node for topic: "
- + topic.toStringUtf8(), path, rc);
+ .logErrorAndCreateZKException("Failed to delete self-ownership node for topic: "
+ + topic.toStringUtf8(), path, rc);
cb.operationFailed(ctx, new PubSubException.ServiceDownException(e));
return;
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsynBKCallback.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsynBKCallback.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsynBKCallback.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsynBKCallback.java Mon Sep 5 17:38:57 2011
@@ -24,81 +24,81 @@ import org.apache.bookkeeper.client.Ledg
import org.apache.bookkeeper.client.LedgerHandle;
-public class SafeAsynBKCallback extends SafeAsyncCallback{
+public class SafeAsynBKCallback extends SafeAsyncCallback {
public static abstract class OpenCallback implements AsyncCallback.OpenCallback {
@Override
public void openComplete(int rc, LedgerHandle ledgerHandle, Object ctx) {
- try{
+ try {
safeOpenComplete(rc, ledgerHandle, ctx);
- }catch(Throwable t){
+ } catch(Throwable t) {
invokeUncaughtExceptionHandler(t);
}
}
-
+
public abstract void safeOpenComplete(int rc, LedgerHandle ledgerHandle, Object ctx);
}
-
+
public static abstract class CloseCallback implements AsyncCallback.CloseCallback {
@Override
- public void closeComplete(int rc, LedgerHandle ledgerHandle, Object ctx){
- try{
+ public void closeComplete(int rc, LedgerHandle ledgerHandle, Object ctx) {
+ try {
safeCloseComplete(rc, ledgerHandle, ctx);
- }catch(Throwable t){
+ } catch(Throwable t) {
invokeUncaughtExceptionHandler(t);
}
}
-
+
public abstract void safeCloseComplete(int rc, LedgerHandle ledgerHandle, Object ctx) ;
}
-
+
public static abstract class ReadCallback implements AsyncCallback.ReadCallback {
-
+
@Override
public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
- try{
+ try {
safeReadComplete(rc, lh, seq, ctx);
- }catch(Throwable t){
+ } catch(Throwable t) {
invokeUncaughtExceptionHandler(t);
}
-
+
}
-
+
public abstract void safeReadComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx);
}
-
+
public static abstract class CreateCallback implements AsyncCallback.CreateCallback {
-
+
@Override
public void createComplete(int rc, LedgerHandle lh, Object ctx) {
- try{
+ try {
safeCreateComplete(rc, lh, ctx);
- }catch(Throwable t){
+ } catch(Throwable t) {
invokeUncaughtExceptionHandler(t);
}
-
+
}
-
+
public abstract void safeCreateComplete(int rc, LedgerHandle lh, Object ctx);
-
-
+
+
}
-
+
public static abstract class AddCallback implements AsyncCallback.AddCallback {
-
+
@Override
public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
- try{
+ try {
safeAddComplete(rc, lh, entryId, ctx);
- }catch(Throwable t){
+ } catch(Throwable t) {
invokeUncaughtExceptionHandler(t);
}
}
-
+
public abstract void safeAddComplete(int rc, LedgerHandle lh, long entryId, Object ctx);
-
+
}
-
+
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsyncZKCallback.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsyncZKCallback.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsyncZKCallback.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsyncZKCallback.java Mon Sep 5 17:38:57 2011
@@ -23,7 +23,7 @@ import org.apache.zookeeper.AsyncCallbac
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
-public class SafeAsyncZKCallback extends SafeAsyncCallback{
+public class SafeAsyncZKCallback extends SafeAsyncCallback {
public static abstract class StatCallback implements AsyncCallback.StatCallback {
public void processResult(int rc, String path, Object ctx, Stat stat) {
try {