You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/03/14 12:29:04 UTC
svn commit: r1300510 [3/3] - in /zookeeper/bookkeeper/trunk: ./
hedwig-client/src/main/cpp/ hedwig-client/src/main/cpp/inc/hedwig/
hedwig-client/src/main/cpp/lib/ hedwig-client/src/main/cpp/scripts/
hedwig-client/src/main/cpp/test/ hedwig-client/src/ma...
Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto (original)
+++ zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto Wed Mar 14 11:29:03 2012
@@ -96,6 +96,12 @@ message SubscribeRequest{
// wait for cross-regional subscriptions to be established before returning
optional bool synchronous = 4 [default = false];
+ optional uint32 messageBound = 5;
+}
+
+message SubscriptionOptions {
+ optional SubscribeRequest.CreateOrAttach createOrAttach = 2 [default = CREATE_OR_ATTACH];
+ optional uint32 messageBound = 3 [default = 0];
}
message ConsumeRequest{
@@ -162,6 +168,7 @@ enum StatusCode{
message SubscriptionState {
required MessageSeqId msgId = 1;
+ optional uint32 messageBound = 2;
}
message LedgerRange{
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java Wed Mar 14 11:29:03 2012
@@ -20,6 +20,8 @@ package org.apache.hedwig.server.persist
import java.io.IOException;
import java.util.Enumeration;
import java.util.Iterator;
+import java.util.Set;
+import java.util.HashSet;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
@@ -120,6 +122,7 @@ public class BookkeeperPersistenceManage
* include the current ledger
*/
TreeMap<Long, InMemoryLedgerRange> ledgerRanges = new TreeMap<Long, InMemoryLedgerRange>();
+ int ledgerRangesZnodeVersion = -1;
/**
* This is the handle of the current ledger that is being used to write
@@ -131,6 +134,9 @@ public class BookkeeperPersistenceManage
* Flag to release topic when encountering unrecoverable exceptions
*/
AtomicBoolean doRelease = new AtomicBoolean(false);
+
+ final static int UNLIMITED = 0;
+ int messageBound = UNLIMITED;
}
Map<ByteString, TopicInfo> topicInfos = new ConcurrentHashMap<ByteString, TopicInfo>();
@@ -308,29 +314,123 @@ public class BookkeeperPersistenceManage
// Nothing to do here. this is just a hint that we cannot use.
}
+ class UpdateLedgerOp extends TopicOpQueuer.AsynchronousOp<Void> {
+ private long ledgerDeleted;
+
+ public UpdateLedgerOp(ByteString topic, final Callback<Void> cb, final Object ctx, final long ledgerDeleted) {
+ queuer.super(topic, cb, ctx);
+ this.ledgerDeleted = ledgerDeleted;
+ }
+
+ @Override
+ public void run() {
+ final TopicInfo topicInfo = topicInfos.get(topic);
+ if (topicInfo == null) {
+ logger.error("Server is not responsible for topic!");
+ return;
+ }
+ boolean needsUpdate = false;
+ LedgerRanges.Builder builder = LedgerRanges.newBuilder();
+ final Set<Long> keysToRemove = new HashSet<Long>();
+ for (Map.Entry<Long, InMemoryLedgerRange> e : topicInfo.ledgerRanges.entrySet()) {
+ if (e.getValue().range.getLedgerId() == ledgerDeleted) {
+ needsUpdate = true;
+ keysToRemove.add(e.getKey());
+ } else {
+ builder.addRanges(e.getValue().range);
+ }
+ }
+ builder.addRanges(topicInfo.currentLedgerRange.range);
+
+ if (needsUpdate) {
+ final LedgerRanges newRanges = builder.build();
+ updateLedgerRangesNode(topic, newRanges, topicInfo.ledgerRangesZnodeVersion,
+ new Callback<Integer>() {
+ public void operationFinished(Object ctx, Integer newVersion) {
+ // Finally, all done
+ for (Long k : keysToRemove) {
+ topicInfo.ledgerRanges.remove(k);
+ }
+ topicInfo.ledgerRangesZnodeVersion = newVersion;
+ cb.operationFinished(ctx, null);
+ }
+ public void operationFailed(Object ctx, PubSubException exception) {
+ cb.operationFailed(ctx, exception);
+ }
+ }, ctx);
+ } else {
+ cb.operationFinished(ctx, null);
+ }
+ }
+ }
+
+ class ConsumeUntilOp extends TopicOpQueuer.SynchronousOp {
+ private final long seqId;
+
+ public ConsumeUntilOp(ByteString topic, long seqId) {
+ queuer.super(topic);
+ this.seqId = seqId;
+ }
+
+ @Override
+ public void runInternal() {
+ TopicInfo topicInfo = topicInfos.get(topic);
+ if (topicInfo == null) {
+ logger.error("Server is not responsible for topic!");
+ return;
+ }
+
+ for (Long endSeqIdIncluded : topicInfo.ledgerRanges.keySet()) {
+ if (endSeqIdIncluded <= seqId) {
+ // This ledger's message entries have all been consumed already
+ // so it is safe to delete it from BookKeeper.
+ long ledgerId = topicInfo.ledgerRanges.get(endSeqIdIncluded).range.getLedgerId();
+ try {
+ bk.deleteLedger(ledgerId);
+ Callback<Void> cb = new Callback<Void>() {
+ public void operationFinished(Object ctx, Void result) {
+ // do nothing, op is async to stop other ops
+ // occurring on the topic during the update
+ }
+ public void operationFailed(Object ctx, PubSubException exception) {
+ logger.error("Failed to update ledger znode", exception);
+ }
+ };
+ queuer.pushAndMaybeRun(topic, new UpdateLedgerOp(topic, cb, null, ledgerId));
+ } catch (Exception e) {
+ // For now, just log an exception error message. In the
+ // future, we can have more complicated retry logic to
+ // delete a consumed ledger. The next time the ledger
+ // garbage collection job runs, we'll once again try to
+ // delete this ledger.
+ logger.error("Exception while deleting consumed ledgerId: " + ledgerId, e);
+ }
+ } else
+ break;
+ }
+ }
+ }
+
public void consumedUntil(ByteString topic, Long seqId) {
+ queuer.pushAndMaybeRun(topic, new ConsumeUntilOp(topic, Math.max(seqId, getMinSeqIdForTopic(topic))));
+ }
+
+ public void consumeToBound(ByteString topic) {
TopicInfo topicInfo = topicInfos.get(topic);
- if (topicInfo == null) {
- logger.error("Server is not responsible for topic!");
+
+ if (topicInfo == null || topicInfo.messageBound == topicInfo.UNLIMITED) {
return;
}
- for (Long endSeqIdIncluded : topicInfo.ledgerRanges.keySet()) {
- if (endSeqIdIncluded <= seqId) {
- // This ledger's message entries have all been consumed already
- // so it is safe to delete it from BookKeeper.
- long ledgerId = topicInfo.ledgerRanges.get(endSeqIdIncluded).range.getLedgerId();
- try {
- bk.deleteLedger(ledgerId);
- } catch (Exception e) {
- // For now, just log an exception error message. In the
- // future, we can have more complicated retry logic to
- // delete a consumed ledger. The next time the ledger
- // garbage collection job runs, we'll once again try to
- // delete this ledger.
- logger.error("Exception while deleting consumed ledgerId: " + ledgerId, e);
- }
- } else
- break;
+ queuer.pushAndMaybeRun(topic, new ConsumeUntilOp(topic, getMinSeqIdForTopic(topic)));
+ }
+
+ public long getMinSeqIdForTopic(ByteString topic) {
+ TopicInfo topicInfo = topicInfos.get(topic);
+
+ if (topicInfo == null || topicInfo.messageBound == topicInfo.UNLIMITED) {
+ return Long.MIN_VALUE;
+ } else {
+ return (topicInfo.lastSeqIdPushed.getLocalComponent() - topicInfo.messageBound) + 1;
}
}
@@ -345,7 +445,7 @@ public class BookkeeperPersistenceManage
}
public long getSeqIdAfterSkipping(ByteString topic, long seqId, int skipAmount) {
- return seqId + skipAmount;
+ return Math.max(seqId + skipAmount, getMinSeqIdForTopic(topic));
}
public class PersistOp extends TopicOpQueuer.SynchronousOp {
@@ -615,7 +715,7 @@ public class BookkeeperPersistenceManage
LedgerRange lr = LedgerRange.newBuilder().setLedgerId(ledgerId)
.setEndSeqIdIncluded(lastMessage.getMsgId()).build();
topicInfo.ledgerRanges.put(lr.getEndSeqIdIncluded().getLocalComponent(),
- new InMemoryLedgerRange(lr, prevLedgerEnd + 1, lh));
+ new InMemoryLedgerRange(lr, prevLedgerEnd + 1, lh));
logger.info("Recovered unclosed ledger: " + ledgerId + " for topic: "
+ topic.toStringUtf8() + " with " + numEntriesInLastLedger + " entries");
@@ -673,33 +773,40 @@ public class BookkeeperPersistenceManage
}
builder.addRanges(lastRange);
- writeTopicLedgersNode(topic, builder.build().toByteArray(), expectedVersionOfLedgersNode,
- topicInfo);
+ updateLedgerRangesNode(topic, builder.build(), expectedVersionOfLedgersNode,
+ new Callback<Integer>() {
+ public void operationFinished(Object ctx, Integer newVersion) {
+ // Finally, all done
+ topicInfo.ledgerRangesZnodeVersion = newVersion;
+ topicInfos.put(topic, topicInfo);
+ cb.operationFinished(ctx, null);
+ }
+ public void operationFailed(Object ctx, PubSubException exception) {
+ cb.operationFailed(ctx, exception);
+ }
+ }, ctx);
return;
}
}, ctx);
}
+ }
- void writeTopicLedgersNode(final ByteString topic, byte[] data, int expectedVersion, final TopicInfo topicInfo) {
- final String zNodePath = ledgersPath(topic);
+ public void updateLedgerRangesNode(final ByteString topic, LedgerRanges ranges,
+ int version, final Callback<Integer> callback, Object ctx) {
+ final String zNodePath = ledgersPath(topic);
- zk.setData(zNodePath, data, expectedVersion, new SafeAsyncZKCallback.StatCallback() {
+ zk.setData(zNodePath, ranges.toByteArray(), version, new SafeAsyncZKCallback.StatCallback() {
@Override
public void safeProcessResult(int rc, String path, Object ctx, Stat stat) {
if (rc != KeeperException.Code.OK.intValue()) {
KeeperException ke = ZkUtils.logErrorAndCreateZKException(
- "Could not write ledgers node for topic: " + topic.toStringUtf8(), path, rc);
- cb.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
+ "Could not write ledgers node for topic: " + topic.toStringUtf8(), path, rc);
+ callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
return;
}
-
- // Finally, all done
- topicInfos.put(topic, topicInfo);
- cb.operationFinished(ctx, null);
+ callback.operationFinished(ctx, stat.getVersion());
}
}, ctx);
-
- }
}
/**
@@ -761,4 +868,28 @@ public class BookkeeperPersistenceManage
queuer.pushAndMaybeRun(topic, new ReleaseOp(topic));
}
+ class SetMessageBoundOp extends TopicOpQueuer.SynchronousOp {
+ final int bound;
+
+ public SetMessageBoundOp(ByteString topic, int bound) {
+ queuer.super(topic);
+ this.bound = bound;
+ }
+
+ @Override
+ public void runInternal() {
+ TopicInfo topicInfo = topicInfos.get(topic);
+ if (topicInfo != null) {
+ topicInfo.messageBound = bound;
+ }
+ }
+ }
+
+ public void setMessageBound(ByteString topic, Integer bound) {
+ queuer.pushAndMaybeRun(topic, new SetMessageBoundOp(topic, bound));
+ }
+
+ public void clearMessageBound(ByteString topic) {
+ queuer.pushAndMaybeRun(topic, new SetMessageBoundOp(topic, TopicInfo.UNLIMITED));
+ }
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java Wed Mar 14 11:29:03 2012
@@ -398,6 +398,18 @@ public class LocalDBPersistenceManager i
}
}
+ public void setMessageBound(ByteString topic, Integer bound) {
+ // noop; Maybe implement later
+ }
+
+ public void clearMessageBound(ByteString topic) {
+ // noop; Maybe implement later
+ }
+
+ public void consumeToBound(ByteString topic) {
+ // noop; Maybe implement later
+ }
+
@Override
protected void finalize() throws Throwable {
if (driver.equals("org.apache.derby.jdbc.EmbeddedDriver")) {
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManager.java?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManager.java Wed Mar 14 11:29:03 2012
@@ -88,4 +88,7 @@ public interface PersistenceManager {
*/
public void consumedUntil(ByteString topic, Long seqId);
+ public void setMessageBound(ByteString topic, Integer bound);
+ public void clearMessageBound(ByteString topic);
+ public void consumeToBound(ByteString topic);
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java Wed Mar 14 11:29:03 2012
@@ -250,6 +250,18 @@ public class ReadAheadCache implements P
realPersistenceManager.consumedUntil(topic, seqId);
}
+ public void setMessageBound(ByteString topic, Integer bound) {
+ realPersistenceManager.setMessageBound(topic, bound);
+ }
+
+ public void clearMessageBound(ByteString topic) {
+ realPersistenceManager.clearMessageBound(topic);
+ }
+
+ public void consumeToBound(ByteString topic) {
+ realPersistenceManager.consumeToBound(topic);
+ }
+
/**
* ========================================================================
* BEGINNING OF CODE FOR THE CACHE MAINTAINER THREAD
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubSubscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubSubscriber.java?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubSubscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubSubscriber.java Wed Mar 14 11:29:03 2012
@@ -26,6 +26,7 @@ import org.apache.hedwig.exceptions.PubS
import org.apache.hedwig.exceptions.PubSubException.CouldNotConnectException;
import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
import org.apache.hedwig.util.Callback;
/**
@@ -46,13 +47,28 @@ public class HedwigHubSubscriber extends
public void subscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode)
throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
InvalidSubscriberIdException {
- subscribe(topic, subscriberId, mode, true);
+ SubscriptionOptions options = SubscriptionOptions.newBuilder().setCreateOrAttach(mode).build();
+ subscribe(topic, subscriberId, options);
}
@Override
public void asyncSubscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode, Callback<Void> callback,
Object context) {
- asyncSubscribe(topic, subscriberId, mode, callback, context, true);
+ SubscriptionOptions options = SubscriptionOptions.newBuilder().setCreateOrAttach(mode).build();
+ asyncSubscribe(topic, subscriberId, options, callback, context);
+ }
+
+ @Override
+ public void subscribe(ByteString topic, ByteString subscriberId, SubscriptionOptions options)
+ throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
+ InvalidSubscriberIdException {
+ subscribe(topic, subscriberId, options, true);
+ }
+
+ @Override
+ public void asyncSubscribe(ByteString topic, ByteString subscriberId,
+ SubscriptionOptions options, Callback<Void> callback, Object context) {
+ asyncSubscribe(topic, subscriberId, options, callback, context, true);
}
@Override
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java Wed Mar 14 11:29:03 2012
@@ -111,6 +111,7 @@ public abstract class AbstractSubscripti
for (ByteString topic : top2sub2seq.keySet()) {
final Map<ByteString, InMemorySubscriptionState> topicSubscriptions = top2sub2seq.get(topic);
long minConsumedMessage = Long.MAX_VALUE;
+ boolean hasBound = true;
// Loop through all subscribers to the current topic to find the
// minimum consumed message id. The consume pointers are
// persisted lazily so we'll use the stale in-memory value
@@ -119,20 +120,20 @@ public abstract class AbstractSubscripti
for (InMemorySubscriptionState curSubscription : topicSubscriptions.values()) {
if (curSubscription.getSubscriptionState().getMsgId().getLocalComponent() < minConsumedMessage)
minConsumedMessage = curSubscription.getSubscriptionState().getMsgId().getLocalComponent();
+ hasBound = hasBound && curSubscription.getSubscriptionState().hasMessageBound();
}
boolean callPersistenceManager = true;
// Don't call the PersistenceManager if nobody is subscribed to
// the topic yet, or the consume pointer has not changed since
// the last time, or if this is the initial subscription.
if (topicSubscriptions.isEmpty()
- || (topic2MinConsumedMessagesMap.containsKey(topic) && topic2MinConsumedMessagesMap.get(topic) == minConsumedMessage)
- || minConsumedMessage == 0) {
- callPersistenceManager = false;
- }
- // Pass the new consume pointers to the PersistenceManager.
- if (callPersistenceManager) {
+ || (topic2MinConsumedMessagesMap.containsKey(topic)
+ && topic2MinConsumedMessagesMap.get(topic) == minConsumedMessage)
+ || minConsumedMessage == 0) {
topic2MinConsumedMessagesMap.put(topic, minConsumedMessage);
pm.consumedUntil(topic, minConsumedMessage);
+ } else if (hasBound) {
+ pm.consumeToBound(topic);
}
}
}
@@ -198,6 +199,8 @@ public abstract class AbstractSubscripti
} else {
cb2.operationFinished(ctx, null);
}
+
+ updateMessageBound(topic);
}
}, ctx);
@@ -360,7 +363,12 @@ public abstract class AbstractSubscripti
}
// now the hard case, this is a brand new subscription, must record
- final SubscriptionState newState = SubscriptionState.newBuilder().setMsgId(consumeSeqId).build();
+ SubscriptionState.Builder stateBuilder = SubscriptionState.newBuilder().setMsgId(consumeSeqId);
+ if (subRequest.hasMessageBound()) {
+ stateBuilder = stateBuilder.setMessageBound(subRequest.getMessageBound());
+ }
+ final SubscriptionState newState = stateBuilder.build();
+
createSubscriptionState(topic, subscriberId, newState, new Callback<Void>() {
@Override
public void operationFailed(Object ctx, PubSubException exception) {
@@ -406,6 +414,9 @@ public abstract class AbstractSubscripti
@Override
public void operationFinished(Object ctx, Void resultOfOperation) {
topicSubscriptions.put(subscriberId, new InMemorySubscriptionState(newState));
+
+ updateMessageBound(topic);
+
cb.operationFinished(ctx, consumeSeqId);
}
@@ -421,6 +432,27 @@ public abstract class AbstractSubscripti
}
}
+ public void updateMessageBound(ByteString topic) {
+ final Map<ByteString, InMemorySubscriptionState> topicSubscriptions = top2sub2seq.get(topic);
+ if (topicSubscriptions == null) {
+ return;
+ }
+ int maxBound = Integer.MIN_VALUE;
+ for (Map.Entry<ByteString, InMemorySubscriptionState> e : topicSubscriptions.entrySet()) {
+ if (!e.getValue().getSubscriptionState().hasMessageBound()) {
+ maxBound = Integer.MIN_VALUE;
+ break;
+ } else {
+ maxBound = Math.max(maxBound, e.getValue().getSubscriptionState().getMessageBound());
+ }
+ }
+ if (maxBound == Integer.MIN_VALUE) {
+ pm.clearMessageBound(topic);
+ } else {
+ pm.setMessageBound(topic, maxBound);
+ }
+ }
+
@Override
public void serveSubscribeRequest(ByteString topic, SubscribeRequest subRequest, MessageSeqId consumeSeqId,
Callback<MessageSeqId> callback, Object ctx) {
@@ -508,6 +540,8 @@ public abstract class AbstractSubscripti
if (!SubscriptionStateUtils.isHubSubscriber(subscriberId)
&& topic2LocalCounts.get(topic).decrementAndGet() == 0)
notifyUnsubcribe(topic);
+
+ updateMessageBound(topic);
cb.operationFinished(ctx, null);
}
}, ctx);
Added: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/MessageBoundedPersistenceTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/MessageBoundedPersistenceTest.java?rev=1300510&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/MessageBoundedPersistenceTest.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/MessageBoundedPersistenceTest.java Wed Mar 14 11:29:03 2012
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hedwig.client.HedwigClient;
+import org.apache.hedwig.client.api.MessageHandler;
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
+import org.apache.hedwig.protocol.PubSubProtocol.LedgerRange;
+import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
+
+import org.apache.hedwig.client.api.Client;
+import org.apache.hedwig.client.api.Subscriber;
+import org.apache.hedwig.client.api.Publisher;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.util.Callback;
+
+import org.apache.hedwig.server.HedwigHubTestBase;
+import org.apache.hedwig.server.common.ServerConfiguration;
+
+public class MessageBoundedPersistenceTest extends HedwigHubTestBase {
+ protected static Logger logger = LoggerFactory.getLogger(MessageBoundedPersistenceTest.class);
+
+ protected class SmallReadAheadServerConfiguration
+ extends HedwigHubTestBase.HubServerConfiguration {
+ SmallReadAheadServerConfiguration(int serverPort, int sslServerPort) {
+ super(serverPort, sslServerPort);
+ }
+ public long getMaximumCacheSize() {
+ return 1;
+ }
+
+ public int getReadAheadCount() {
+ return 1;
+ }
+
+ public int getMessagesConsumedThreadRunInterval() {
+ return 1000; // run every second
+ }
+ }
+
+ protected ServerConfiguration getServerConfiguration(int serverPort, int sslServerPort) {
+ return new SmallReadAheadServerConfiguration(serverPort, sslServerPort);
+ }
+
+ private class MessageBoundClientConfiguration extends ClientConfiguration {
+ final int messageBound;
+
+ public MessageBoundClientConfiguration(int bound) {
+ this.messageBound = bound;
+ }
+
+ public MessageBoundClientConfiguration() {
+ this(5);
+ }
+
+ public int getSubscriptionMessageBound() {
+ return messageBound;
+ }
+ }
+
+ private void sendXExpectLastY(Publisher pub, Subscriber sub,
+ ByteString topic, ByteString subid,
+ final int X, final int Y) throws Exception {
+ for (int i = 0; i < X; i++) {
+ pub.publish(topic, Message.newBuilder().setBody(
+ ByteString.copyFromUtf8(String.valueOf(i))).build());
+ }
+ sub.subscribe(topic, subid, CreateOrAttach.ATTACH);
+
+ final AtomicInteger expected = new AtomicInteger(X - Y);
+ final CountDownLatch latch = new CountDownLatch(1);
+ sub.startDelivery(topic, subid, new MessageHandler () {
+ synchronized public void deliver(ByteString topic, ByteString subscriberId,
+ Message msg, Callback<Void> callback,
+ Object context) {
+ try {
+ int value = Integer.valueOf(msg.getBody().toStringUtf8());
+
+ if (value == expected.get()) {
+ expected.incrementAndGet();
+ } else {
+ // error condition
+ logger.error("Did not receive expected value, expected {}, got {}",
+ expected.get(), value);
+ expected.set(0);
+ latch.countDown();
+ }
+ if (expected.get() == X) {
+ latch.countDown();
+ }
+ callback.operationFinished(context, null);
+ } catch (Exception e) {
+ logger.error("Received bad message", e);
+ latch.countDown();// will error on match
+ }
+ }
+ });
+ assertTrue("Timed out waiting for messages Y is " + Y
+ + " expected is currently " + expected.get(), latch.await(10, TimeUnit.SECONDS));
+ assertEquals("Should be expected message with " + X, X, expected.get());
+
+ sub.stopDelivery(topic, subid);
+ sub.closeSubscription(topic, subid);
+ }
+
+ @Test
+ public void testBasicBounding() throws Exception {
+ Client client = new HedwigClient(new MessageBoundClientConfiguration(5));
+ Publisher pub = client.getPublisher();
+ Subscriber sub = client.getSubscriber();
+
+ ByteString topic = ByteString.copyFromUtf8("basicBoundingTopic");
+ ByteString subid = ByteString.copyFromUtf8("basicBoundingSubId");
+ sub.subscribe(topic, subid, CreateOrAttach.CREATE);
+ sub.closeSubscription(topic, subid);
+
+ sendXExpectLastY(pub, sub, topic, subid, 1000, 5);
+
+ client.close();
+ }
+
+ @Test
+ public void testMultipleSubscribers() throws Exception {
+ ByteString topic = ByteString.copyFromUtf8("multiSubTopic");
+
+ Client client = new HedwigClient(new ClientConfiguration());
+ Publisher pub = client.getPublisher();
+ Subscriber sub = client.getSubscriber();
+
+ SubscriptionOptions options5 = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE).setMessageBound(5).build();
+ SubscriptionOptions options20 = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE).setMessageBound(20).build();
+ SubscriptionOptions optionsUnbounded = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE).build();
+
+ ByteString subid5 = ByteString.copyFromUtf8("bound5SubId");
+ ByteString subid20 = ByteString.copyFromUtf8("bound20SubId");
+ ByteString subidUnbounded = ByteString.copyFromUtf8("noboundSubId");
+
+ sub.subscribe(topic, subid5, options5);
+ sub.closeSubscription(topic, subid5);
+ sendXExpectLastY(pub, sub, topic, subid5, 1000, 5);
+
+ sub.subscribe(topic, subid20, options20);
+ sub.closeSubscription(topic, subid20);
+ sendXExpectLastY(pub, sub, topic, subid20, 1000, 20);
+
+ sub.subscribe(topic, subidUnbounded, optionsUnbounded);
+ sub.closeSubscription(topic, subidUnbounded);
+
+ sendXExpectLastY(pub, sub, topic, subidUnbounded, 10000, 10000);
+ sub.unsubscribe(topic, subidUnbounded);
+
+ sendXExpectLastY(pub, sub, topic, subid20, 1000, 20);
+ sub.unsubscribe(topic, subid20);
+
+ sendXExpectLastY(pub, sub, topic, subid5, 1000, 5);
+ sub.unsubscribe(topic, subid5);
+
+ client.close();
+ }
+
+ @Test
+ public void testLedgerGC() throws Exception {
+ Client client = new HedwigClient(new MessageBoundClientConfiguration());
+ Publisher pub = client.getPublisher();
+ Subscriber sub = client.getSubscriber();
+
+ String ledgersPath = "/hedwig/standalone/topics/testGCTopic/ledgers";
+ ByteString topic = ByteString.copyFromUtf8("testGCTopic");
+ ByteString subid = ByteString.copyFromUtf8("testGCSubId");
+ sub.subscribe(topic, subid, CreateOrAttach.CREATE_OR_ATTACH);
+ sub.closeSubscription(topic, subid);
+
+ for (int i = 1; i <= 100; i++) {
+ pub.publish(topic, Message.newBuilder().setBody(
+ ByteString.copyFromUtf8(String.valueOf(i))).build());
+ }
+ LedgerRanges r = LedgerRanges.parseFrom(bktb.getZooKeeperClient().getData(ledgersPath, false, null));
+ assertEquals("Should only have 1 ledger yet", 1, r.getRangesList().size());
+ long firstLedger = r.getRangesList().get(0).getLedgerId();
+
+ stopHubServers();
+ startHubServers();
+
+ pub.publish(topic, Message.newBuilder().setBody(
+ ByteString.copyFromUtf8(String.valueOf(0xdeadbeef))).build());
+
+ r = LedgerRanges.parseFrom(bktb.getZooKeeperClient().getData(ledgersPath, false, null));
+ assertEquals("Should have 2 ledgers after restart", 2, r.getRangesList().size());
+
+ for (int i = 100; i <= 200; i++) {
+ pub.publish(topic, Message.newBuilder().setBody(
+ ByteString.copyFromUtf8(String.valueOf(i))).build());
+ }
+ Thread.sleep(5000); // give GC a chance to happen
+
+ r = LedgerRanges.parseFrom(bktb.getZooKeeperClient().getData(ledgersPath, false, null));
+ long secondLedger = r.getRangesList().get(0).getLedgerId();
+
+ assertEquals("Should only have 1 ledger after GC", 1, r.getRangesList().size());
+
+ // ensure original ledger doesn't exist
+ String firstLedgerPath = String.format("/ledgers/L%010d", firstLedger);
+ String secondLedgerPath = String.format("/ledgers/L%010d", secondLedger);
+ assertNull("Ledger should not exist", bktb.getZooKeeperClient().exists(firstLedgerPath, false));
+ assertNotNull("Ledger should exist", bktb.getZooKeeperClient().exists(secondLedgerPath, false));
+
+ client.close();
+ }
+}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java?rev=1300510&r1=1300509&r2=1300510&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java Wed Mar 14 11:29:03 2012
@@ -42,6 +42,18 @@ public class StubPersistenceManager impl
// noop
}
+ public void setMessageBound(ByteString topic, Integer bound) {
+ // noop
+ }
+
+ public void clearMessageBound(ByteString topic) {
+ // noop
+ }
+
+ public void consumeToBound(ByteString topic) {
+ // noop
+ }
+
protected static class ArrayListMessageFactory implements Factory<List<Message>> {
static ArrayListMessageFactory instance = new ArrayListMessageFactory();