You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/10/26 15:13:10 UTC
svn commit: r1402501 - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/
hedwig-protocol/src/main/java/org/apache/hedwig/protocol/
hedwig-protocol/src/main/protobuf/ hedwig-server/src/main/java/org/apache/...
Author: ivank
Date: Fri Oct 26 13:13:09 2012
New Revision: 1402501
URL: http://svn.apache.org/viewvc?rev=1402501&view=rev
Log:
BOOKKEEPER-439: No more messages delivered after deleted consumed ledgers. (sijie via ivank)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java
zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1402501&r1=1402500&r2=1402501&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Oct 26 13:13:09 2012
@@ -128,6 +128,8 @@ Trunk (unreleased changes)
BOOKKEEPER-191: Hub server should change ledger to write, so consumed messages have chance to be garbage collected. (sijie via ivank)
+ BOOKKEEPER-439: No more messages delivered after deleted consumed ledgers. (sijie via ivank)
+
IMPROVEMENTS:
bookkeeper-server:
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java?rev=1402501&r1=1402500&r2=1402501&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java Fri Oct 26 13:13:09 2012
@@ -549,7 +549,7 @@ public class BookKeeper {
counter.block(0);
if (counter.getrc() != BKException.Code.OK) {
LOG.error("Error deleting ledger " + lId + " : " + counter.getrc());
- throw BKException.create(Code.ZKException);
+ throw BKException.create(counter.getrc());
}
}
Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java?rev=1402501&r1=1402500&r2=1402501&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java Fri Oct 26 13:13:09 2012
@@ -13544,6 +13544,10 @@ public final class PubSubProtocol {
boolean hasEndSeqIdIncluded();
org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId getEndSeqIdIncluded();
org.apache.hedwig.protocol.PubSubProtocol.MessageSeqIdOrBuilder getEndSeqIdIncludedOrBuilder();
+
+ // optional uint64 startSeqIdIncluded = 3;
+ boolean hasStartSeqIdIncluded();
+ long getStartSeqIdIncluded();
}
public static final class LedgerRange extends
com.google.protobuf.GeneratedMessage
@@ -13597,9 +13601,20 @@ public final class PubSubProtocol {
return endSeqIdIncluded_;
}
+ // optional uint64 startSeqIdIncluded = 3;
+ public static final int STARTSEQIDINCLUDED_FIELD_NUMBER = 3;
+ private long startSeqIdIncluded_;
+ public boolean hasStartSeqIdIncluded() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ public long getStartSeqIdIncluded() {
+ return startSeqIdIncluded_;
+ }
+
private void initFields() {
ledgerId_ = 0L;
endSeqIdIncluded_ = org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId.getDefaultInstance();
+ startSeqIdIncluded_ = 0L;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -13629,6 +13644,9 @@ public final class PubSubProtocol {
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeMessage(2, endSeqIdIncluded_);
}
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ output.writeUInt64(3, startSeqIdIncluded_);
+ }
getUnknownFields().writeTo(output);
}
@@ -13646,6 +13664,10 @@ public final class PubSubProtocol {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(2, endSeqIdIncluded_);
}
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeUInt64Size(3, startSeqIdIncluded_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -13779,6 +13801,8 @@ public final class PubSubProtocol {
endSeqIdIncludedBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000002);
+ startSeqIdIncluded_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000004);
return this;
}
@@ -13829,6 +13853,10 @@ public final class PubSubProtocol {
} else {
result.endSeqIdIncluded_ = endSeqIdIncludedBuilder_.build();
}
+ if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+ to_bitField0_ |= 0x00000004;
+ }
+ result.startSeqIdIncluded_ = startSeqIdIncluded_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -13851,6 +13879,9 @@ public final class PubSubProtocol {
if (other.hasEndSeqIdIncluded()) {
mergeEndSeqIdIncluded(other.getEndSeqIdIncluded());
}
+ if (other.hasStartSeqIdIncluded()) {
+ setStartSeqIdIncluded(other.getStartSeqIdIncluded());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -13906,6 +13937,11 @@ public final class PubSubProtocol {
setEndSeqIdIncluded(subBuilder.buildPartial());
break;
}
+ case 24: {
+ bitField0_ |= 0x00000004;
+ startSeqIdIncluded_ = input.readUInt64();
+ break;
+ }
}
}
}
@@ -14023,6 +14059,27 @@ public final class PubSubProtocol {
return endSeqIdIncludedBuilder_;
}
+ // optional uint64 startSeqIdIncluded = 3;
+ private long startSeqIdIncluded_ ;
+ public boolean hasStartSeqIdIncluded() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ public long getStartSeqIdIncluded() {
+ return startSeqIdIncluded_;
+ }
+ public Builder setStartSeqIdIncluded(long value) {
+ bitField0_ |= 0x00000004;
+ startSeqIdIncluded_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearStartSeqIdIncluded() {
+ bitField0_ = (bitField0_ & ~0x00000004);
+ startSeqIdIncluded_ = 0L;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:Hedwig.LedgerRange)
}
@@ -16052,34 +16109,35 @@ public final class PubSubProtocol {
"eSeqId\022\024\n\014messageBound\030\002 \001(\r\"r\n\020Subscrip" +
"tionData\022(\n\005state\030\001 \001(\0132\031.Hedwig.Subscri" +
"ptionState\0224\n\013preferences\030\002 \001(\0132\037.Hedwig" +
- ".SubscriptionPreferences\"O\n\013LedgerRange\022",
+ ".SubscriptionPreferences\"k\n\013LedgerRange\022",
"\020\n\010ledgerId\030\001 \002(\004\022.\n\020endSeqIdIncluded\030\002 " +
- "\001(\0132\024.Hedwig.MessageSeqId\"3\n\014LedgerRange" +
- "s\022#\n\006ranges\030\001 \003(\0132\023.Hedwig.LedgerRange\":" +
- "\n\013ManagerMeta\022\023\n\013managerImpl\030\002 \002(\t\022\026\n\016ma" +
- "nagerVersion\030\003 \002(\r\".\n\013HubInfoData\022\020\n\010hos" +
- "tname\030\002 \002(\t\022\r\n\005czxid\030\003 \002(\004\" \n\013HubLoadDat" +
- "a\022\021\n\tnumTopics\030\002 \002(\004*\"\n\017ProtocolVersion\022" +
- "\017\n\013VERSION_ONE\020\001*\207\001\n\rOperationType\022\013\n\007PU" +
- "BLISH\020\000\022\r\n\tSUBSCRIBE\020\001\022\013\n\007CONSUME\020\002\022\017\n\013U" +
- "NSUBSCRIBE\020\003\022\022\n\016START_DELIVERY\020\004\022\021\n\rSTOP",
- "_DELIVERY\020\005\022\025\n\021CLOSESUBSCRIPTION\020\006*D\n\021Su" +
- "bscriptionEvent\022\017\n\013TOPIC_MOVED\020\001\022\036\n\032SUBS" +
- "CRIPTION_FORCED_CLOSED\020\002*\205\004\n\nStatusCode\022" +
- "\013\n\007SUCCESS\020\000\022\026\n\021MALFORMED_REQUEST\020\221\003\022\022\n\r" +
- "NO_SUCH_TOPIC\020\222\003\022\036\n\031CLIENT_ALREADY_SUBSC" +
- "RIBED\020\223\003\022\032\n\025CLIENT_NOT_SUBSCRIBED\020\224\003\022\026\n\021" +
- "COULD_NOT_CONNECT\020\225\003\022\017\n\nTOPIC_BUSY\020\226\003\022\036\n" +
- "\031NOT_RESPONSIBLE_FOR_TOPIC\020\365\003\022\021\n\014SERVICE" +
- "_DOWN\020\366\003\022\024\n\017UNCERTAIN_STATE\020\367\003\022\033\n\026INVALI" +
- "D_MESSAGE_FILTER\020\370\003\022\020\n\013BAD_VERSION\020\210\004\022\036\n",
- "\031NO_TOPIC_PERSISTENCE_INFO\020\211\004\022\"\n\035TOPIC_P" +
- "ERSISTENCE_INFO_EXISTS\020\212\004\022\032\n\025NO_SUBSCRIP" +
- "TION_STATE\020\213\004\022\036\n\031SUBSCRIPTION_STATE_EXIS" +
- "TS\020\214\004\022\030\n\023NO_TOPIC_OWNER_INFO\020\215\004\022\034\n\027TOPIC" +
- "_OWNER_INFO_EXISTS\020\216\004\022\031\n\024UNEXPECTED_COND" +
- "ITION\020\330\004\022\016\n\tCOMPOSITE\020\274\005B\036\n\032org.apache.h" +
- "edwig.protocolH\001"
+ "\001(\0132\024.Hedwig.MessageSeqId\022\032\n\022startSeqIdI" +
+ "ncluded\030\003 \001(\004\"3\n\014LedgerRanges\022#\n\006ranges\030" +
+ "\001 \003(\0132\023.Hedwig.LedgerRange\":\n\013ManagerMet" +
+ "a\022\023\n\013managerImpl\030\002 \002(\t\022\026\n\016managerVersion" +
+ "\030\003 \002(\r\".\n\013HubInfoData\022\020\n\010hostname\030\002 \002(\t\022" +
+ "\r\n\005czxid\030\003 \002(\004\" \n\013HubLoadData\022\021\n\tnumTopi" +
+ "cs\030\002 \002(\004*\"\n\017ProtocolVersion\022\017\n\013VERSION_O" +
+ "NE\020\001*\207\001\n\rOperationType\022\013\n\007PUBLISH\020\000\022\r\n\tS" +
+ "UBSCRIBE\020\001\022\013\n\007CONSUME\020\002\022\017\n\013UNSUBSCRIBE\020\003",
+ "\022\022\n\016START_DELIVERY\020\004\022\021\n\rSTOP_DELIVERY\020\005\022" +
+ "\025\n\021CLOSESUBSCRIPTION\020\006*D\n\021SubscriptionEv" +
+ "ent\022\017\n\013TOPIC_MOVED\020\001\022\036\n\032SUBSCRIPTION_FOR" +
+ "CED_CLOSED\020\002*\205\004\n\nStatusCode\022\013\n\007SUCCESS\020\000" +
+ "\022\026\n\021MALFORMED_REQUEST\020\221\003\022\022\n\rNO_SUCH_TOPI" +
+ "C\020\222\003\022\036\n\031CLIENT_ALREADY_SUBSCRIBED\020\223\003\022\032\n\025" +
+ "CLIENT_NOT_SUBSCRIBED\020\224\003\022\026\n\021COULD_NOT_CO" +
+ "NNECT\020\225\003\022\017\n\nTOPIC_BUSY\020\226\003\022\036\n\031NOT_RESPONS" +
+ "IBLE_FOR_TOPIC\020\365\003\022\021\n\014SERVICE_DOWN\020\366\003\022\024\n\017" +
+ "UNCERTAIN_STATE\020\367\003\022\033\n\026INVALID_MESSAGE_FI",
+ "LTER\020\370\003\022\020\n\013BAD_VERSION\020\210\004\022\036\n\031NO_TOPIC_PE" +
+ "RSISTENCE_INFO\020\211\004\022\"\n\035TOPIC_PERSISTENCE_I" +
+ "NFO_EXISTS\020\212\004\022\032\n\025NO_SUBSCRIPTION_STATE\020\213" +
+ "\004\022\036\n\031SUBSCRIPTION_STATE_EXISTS\020\214\004\022\030\n\023NO_" +
+ "TOPIC_OWNER_INFO\020\215\004\022\034\n\027TOPIC_OWNER_INFO_" +
+ "EXISTS\020\216\004\022\031\n\024UNEXPECTED_CONDITION\020\330\004\022\016\n\t" +
+ "COMPOSITE\020\274\005B\036\n\032org.apache.hedwig.protoc" +
+ "olH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -16267,7 +16325,7 @@ public final class PubSubProtocol {
internal_static_Hedwig_LedgerRange_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Hedwig_LedgerRange_descriptor,
- new java.lang.String[] { "LedgerId", "EndSeqIdIncluded", },
+ new java.lang.String[] { "LedgerId", "EndSeqIdIncluded", "StartSeqIdIncluded", },
org.apache.hedwig.protocol.PubSubProtocol.LedgerRange.class,
org.apache.hedwig.protocol.PubSubProtocol.LedgerRange.Builder.class);
internal_static_Hedwig_LedgerRanges_descriptor =
Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto?rev=1402501&r1=1402500&r2=1402501&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto (original)
+++ zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto Fri Oct 26 13:13:09 2012
@@ -286,6 +286,7 @@ message SubscriptionData {
message LedgerRange{
required uint64 ledgerId = 1;
optional MessageSeqId endSeqIdIncluded = 2;
+ optional uint64 startSeqIdIncluded = 3;
}
message LedgerRanges{
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java?rev=1402501&r1=1402500&r2=1402501&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java Fri Oct 26 13:13:09 2012
@@ -352,6 +352,13 @@ public class HedwigAdmin {
return syncObj.value;
}
+ private static LedgerRange buildLedgerRange(long ledgerId, long startOfLedger, MessageSeqId endOfLedger) {
+ LedgerRange.Builder builder =
+ LedgerRange.newBuilder().setLedgerId(ledgerId).setStartSeqIdIncluded(startOfLedger)
+ .setEndSeqIdIncluded(endOfLedger);
+ return builder.build();
+ }
+
/**
* Return the ledger range forming the topic
*
@@ -387,49 +394,54 @@ public class HedwigAdmin {
if (null == ranges) {
return null;
}
+ List<LedgerRange> results = new ArrayList<LedgerRange>();
List<LedgerRange> lrs = ranges.getRangesList();
- if (lrs.isEmpty()) {
- return lrs;
- }
- // try to check last ledger (it may still open)
- LedgerRange lastRange = lrs.get(lrs.size() - 1);
- if (lastRange.hasEndSeqIdIncluded()) {
- return lrs;
- }
- // read last confirmed of the opened ledger
- try {
- List<LedgerRange> newLrs = new ArrayList<LedgerRange>();
- newLrs.addAll(lrs);
- lrs = newLrs;
- MessageSeqId lastSeqId;
- if (lrs.size() == 1) {
- lastSeqId = MessageSeqId.newBuilder().setLocalComponent(1).build();
- } else {
- lastSeqId = lrs.get(lrs.size() - 2).getEndSeqIdIncluded();
- }
- LedgerRange newLastRange = refreshLastLedgerRange(lastSeqId, lastRange);
- lrs.set(lrs.size() - 1, newLastRange);
- } catch (Exception e) {
- e.printStackTrace();
+ long startSeqId = 1L;
+ if (!lrs.isEmpty()) {
+ LedgerRange range = lrs.get(0);
+ if (!range.hasStartSeqIdIncluded() && range.hasEndSeqIdIncluded()) {
+ long ledgerId = range.getLedgerId();
+ try {
+ LedgerHandle lh = bk.openLedgerNoRecovery(ledgerId, DigestType.CRC32, passwd);
+ long numEntries = lh.readLastConfirmed() + 1;
+ long endOfLedger = range.getEndSeqIdIncluded().getLocalComponent();
+ startSeqId = endOfLedger - numEntries + 1;
+ } catch (BKException.BKNoSuchLedgerExistsException be) {
+ // ignore it
+ }
+ }
}
- return lrs;
- }
+ Iterator<LedgerRange> lrIter = lrs.iterator();
+ while (lrIter.hasNext()) {
+ LedgerRange range = lrIter.next();
+ if (range.hasEndSeqIdIncluded()) {
+ long endOfLedger = range.getEndSeqIdIncluded().getLocalComponent();
+ if (range.hasStartSeqIdIncluded()) {
+ startSeqId = range.getStartSeqIdIncluded();
+ } else {
+ range = buildLedgerRange(range.getLedgerId(), startSeqId, range.getEndSeqIdIncluded());
+ }
+ results.add(range);
+ if (startSeqId < endOfLedger + 1) {
+ startSeqId = endOfLedger + 1;
+ }
+ continue;
+ }
+ if (lrIter.hasNext()) {
+ throw new IllegalStateException("Ledger " + range.getLedgerId() + " for topic " + topic.toString()
+ + " is not the last one but still does not have an end seq-id");
+ }
- /**
- * Refresh last ledger range to get lastConfirmed entry, which make it available to read
- *
- * @param lastSeqId
- * Last sequence id of previous ledger
- * @param oldRange
- * Ledger range to set lastConfirmed entry
- */
- LedgerRange refreshLastLedgerRange(MessageSeqId lastSeqId, LedgerRange oldRange)
- throws BKException, KeeperException, InterruptedException {
- LedgerHandle lh = bk.openLedgerNoRecovery(oldRange.getLedgerId(), DigestType.CRC32, passwd);
- long lastConfirmed = lh.readLastConfirmed();
- MessageSeqId newSeqId = MessageSeqId.newBuilder().mergeFrom(lastSeqId)
- .setLocalComponent(lastSeqId.getLocalComponent() + lastConfirmed).build();
- return LedgerRange.newBuilder().mergeFrom(oldRange).setEndSeqIdIncluded(newSeqId).build();
+ if (range.hasStartSeqIdIncluded()) {
+ startSeqId = range.getStartSeqIdIncluded();
+ }
+
+ LedgerHandle lh = bk.openLedgerNoRecovery(range.getLedgerId(), DigestType.CRC32, passwd);
+ long endOfLedger = startSeqId + lh.readLastConfirmed();
+ MessageSeqId endSeqId = MessageSeqId.newBuilder().setLocalComponent(endOfLedger).build();
+ results.add(buildLedgerRange(range.getLedgerId(), startSeqId, endSeqId));
+ }
+ return results;
}
/**
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java?rev=1402501&r1=1402500&r2=1402501&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java Fri Oct 26 13:13:09 2012
@@ -46,6 +46,7 @@ import org.apache.hedwig.client.api.Subs
import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.HedwigClient;
import org.apache.hedwig.protocol.PubSubProtocol.LedgerRange;
+import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
import org.apache.hedwig.protocol.PubSubProtocol.Message;
import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
@@ -638,27 +639,20 @@ public class HedwigConsole {
return true;
}
- private void printTopicLedgers(List<LedgerRange> lrs) {
+ private void printTopicLedgers(List<LedgerRange> ranges) {
System.out.println(">>> Persistence Info <<<");
- if (null == lrs) {
+ if (null == ranges) {
System.out.println("N/A");
return;
}
- if (lrs.isEmpty()) {
+ if (ranges.isEmpty()) {
System.out.println("No Ledger used.");
return;
}
- Iterator<LedgerRange> lrIterator = lrs.iterator();
- long startOfLedger = 1;
- while (lrIterator.hasNext()) {
- LedgerRange range = lrIterator.next();
- long endOfLedger = Long.MAX_VALUE;
- if (range.hasEndSeqIdIncluded()) {
- endOfLedger = range.getEndSeqIdIncluded().getLocalComponent();
- }
- System.out.println("Ledger " + range.getLedgerId() + " [ " + startOfLedger + " ~ " + (endOfLedger == Long.MAX_VALUE ? "" : endOfLedger) + " ]");
-
- startOfLedger = endOfLedger + 1;
+ for (LedgerRange range : ranges) {
+ System.out.println("Ledger " + range.getLedgerId() + " [ "
+ + range.getStartSeqIdIncluded() + " ~ "
+ + range.getEndSeqIdIncluded().getLocalComponent() + " ]");
}
System.out.println();
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java?rev=1402501&r1=1402500&r2=1402501&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java Fri Oct 26 13:13:09 2012
@@ -21,12 +21,11 @@ package org.apache.hedwig.admin.console;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
@@ -71,17 +70,7 @@ public class ReadTopic {
static final int NUM_MESSAGES_TO_PRINT = 15;
- SortedMap<Long, InMemoryLedgerRange> ledgers = new TreeMap<Long, InMemoryLedgerRange>();
-
- static class InMemoryLedgerRange {
- LedgerRange range;
- long startSeqIdIncluded;
-
- public InMemoryLedgerRange(LedgerRange range, long startSeqId) {
- this.range = range;
- this.startSeqIdIncluded = startSeqId;
- }
- }
+ List<LedgerRange> ledgers = new ArrayList<LedgerRange>();
/**
* Constructor
@@ -121,23 +110,7 @@ public class ReadTopic {
if (null == ranges || ranges.isEmpty()) {
return RC_NOLEDGERS;
}
- Iterator<LedgerRange> lrIterator = ranges.iterator();
- long startOfLedger = 1;
- while (lrIterator.hasNext()) {
- LedgerRange range = lrIterator.next();
- if (range.hasEndSeqIdIncluded()) {
- long endOfLedger = range.getEndSeqIdIncluded().getLocalComponent();
- ledgers.put(endOfLedger, new InMemoryLedgerRange(range, startOfLedger));
- startOfLedger = endOfLedger + 1;
- continue;
- }
- if (lrIterator.hasNext()) {
- throw new IOException("Ledger-id: " + range.getLedgerId() + " for topic: " + topic
- + " is not the last one but still does not have an end seq-id");
- }
- // admin has read last confirmed entry of last ledger
- // so we don't need to handle here
- }
+ ledgers.addAll(ranges);
return RC_OK;
}
@@ -201,13 +174,13 @@ public class ReadTopic {
} else {
return rc;
}
-
- for (Map.Entry<Long, InMemoryLedgerRange> entry : ledgers.entrySet()) {
- long endSeqId = entry.getKey();
+
+ for (LedgerRange range : ledgers) {
+ long endSeqId = range.getEndSeqIdIncluded().getLocalComponent();
if (endSeqId < startSeqId) {
continue;
}
- boolean toContinue = readLedger(entry.getValue(), endSeqId);
+ boolean toContinue = readLedger(range);
startSeqId = endSeqId + 1;
if (!toContinue) {
break;
@@ -227,15 +200,16 @@ public class ReadTopic {
* @throws IOException
* @throws InterruptedException
*/
- protected boolean readLedger(InMemoryLedgerRange ledger, long endSeqId) throws BKException, IOException, InterruptedException {
- long tEndSeqId = endSeqId;
-
+ protected boolean readLedger(LedgerRange ledger)
+ throws BKException, IOException, InterruptedException {
+ long tEndSeqId = ledger.getEndSeqIdIncluded().getLocalComponent();
+
if (tEndSeqId < this.startSeqId) {
return true;
}
// Open Ledger Handle
- long ledgerId = ledger.range.getLedgerId();
- System.out.println("\n>>>>> Ledger " + ledgerId + " [ " + ledger.startSeqIdIncluded + " ~ " + (endSeqId == Long.MAX_VALUE ? "" : endSeqId) + "] <<<<<\n");
+ long ledgerId = ledger.getLedgerId();
+ System.out.println("\n>>>>> " + ledger + " <<<<<\n");
LedgerHandle lh = null;
try {
lh = admin.getBkHandle().openLedgerNoRecovery(ledgerId, admin.getBkDigestType(), admin.getBkPasswd());
@@ -245,7 +219,7 @@ public class ReadTopic {
if (null == lh) {
return true;
}
- long expectedEntryId = startSeqId - ledger.startSeqIdIncluded;
+ long expectedEntryId = startSeqId - ledger.getStartSeqIdIncluded();
long correctedEndSeqId = tEndSeqId;
try {
@@ -253,7 +227,9 @@ public class ReadTopic {
correctedEndSeqId = Math.min(startSeqId + NUM_MESSAGES_TO_PRINT - 1, tEndSeqId);
try {
- Enumeration<LedgerEntry> seq = lh.readEntries(startSeqId - ledger.startSeqIdIncluded, correctedEndSeqId - ledger.startSeqIdIncluded);
+ Enumeration<LedgerEntry> seq =
+ lh.readEntries(startSeqId - ledger.getStartSeqIdIncluded(),
+ correctedEndSeqId - ledger.getStartSeqIdIncluded());
LedgerEntry entry = null;
while (seq.hasMoreElements()) {
entry = seq.nextElement();
@@ -266,7 +242,7 @@ public class ReadTopic {
continue;
}
if (expectedEntryId != entry.getEntryId()
- || (message.getMsgId().getLocalComponent() - ledger.startSeqIdIncluded) != expectedEntryId) {
+ || (message.getMsgId().getLocalComponent() - ledger.getStartSeqIdIncluded()) != expectedEntryId) {
throw new IOException("ERROR: Message ids are out of order : expected entry id " + expectedEntryId
+ ", current entry id " + entry.getEntryId() + ", msg seq id " + message.getMsgId().getLocalComponent());
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java?rev=1402501&r1=1402500&r2=1402501&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java Fri Oct 26 13:13:09 2012
@@ -22,6 +22,7 @@ import java.util.Enumeration;
import java.util.Iterator;
import java.util.HashSet;
import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
@@ -30,6 +31,7 @@ import java.util.concurrent.ScheduledExe
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
+import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
@@ -59,6 +61,7 @@ import org.apache.hedwig.server.topics.T
import org.apache.hedwig.server.topics.TopicOwnershipChangeListener;
import org.apache.hedwig.util.Callback;
import org.apache.hedwig.zookeeper.SafeAsynBKCallback;
+import static org.apache.hedwig.util.VarArgs.va;
/**
* This persistence manager uses zookeeper and bookkeeper to store messages.
@@ -80,26 +83,28 @@ public class BookkeeperPersistenceManage
private ServerConfiguration cfg;
private TopicManager tm;
+ private static final long START_SEQ_ID = 1L;
// max number of entries allowed in a ledger
private static final long UNLIMITED_ENTRIES = 0L;
private final long maxEntriesPerLedger;
static class InMemoryLedgerRange {
LedgerRange range;
- long startSeqIdIncluded; // included, for the very first ledger, this
- // value is 1
LedgerHandle handle;
- public InMemoryLedgerRange(LedgerRange range, long startSeqId, LedgerHandle handle) {
+ public InMemoryLedgerRange(LedgerRange range, LedgerHandle handle) {
this.range = range;
- this.startSeqIdIncluded = startSeqId;
this.handle = handle;
}
- public InMemoryLedgerRange(LedgerRange range, long startSeqId) {
- this(range, startSeqId, null);
+ public InMemoryLedgerRange(LedgerRange range) {
+ this(range, null);
}
+ public long getStartSeqIdIncluded() {
+ assert range.hasStartSeqIdIncluded();
+ return range.getStartSeqIdIncluded();
+ }
}
static class TopicInfo {
@@ -184,6 +189,14 @@ public class BookkeeperPersistenceManage
tm.addTopicOwnershipChangeListener(this);
}
+ private static LedgerRange buildLedgerRange(long ledgerId, long startOfLedger,
+ MessageSeqId endOfLedger) {
+ LedgerRange.Builder builder =
+ LedgerRange.newBuilder().setLedgerId(ledgerId).setStartSeqIdIncluded(startOfLedger)
+ .setEndSeqIdIncluded(endOfLedger);
+ return builder.build();
+ }
+
class RangeScanOp extends TopicOpQueuer.SynchronousOp {
RangeScanRequest request;
int numMessagesRead = 0;
@@ -245,14 +258,14 @@ public class BookkeeperPersistenceManage
if (logger.isDebugEnabled()) {
logger.debug("Issuing a bk read for ledger: " + imlr.handle.getId() + " from entry-id: "
- + (startSeqId - imlr.startSeqIdIncluded) + " to entry-id: "
- + (correctedEndSeqId - imlr.startSeqIdIncluded));
+ + (startSeqId - imlr.getStartSeqIdIncluded()) + " to entry-id: "
+ + (correctedEndSeqId - imlr.getStartSeqIdIncluded()));
}
- imlr.handle.asyncReadEntries(startSeqId - imlr.startSeqIdIncluded, correctedEndSeqId
- - imlr.startSeqIdIncluded, new SafeAsynBKCallback.ReadCallback() {
+ imlr.handle.asyncReadEntries(startSeqId - imlr.getStartSeqIdIncluded(), correctedEndSeqId
+ - imlr.getStartSeqIdIncluded(), new SafeAsynBKCallback.ReadCallback() {
- long expectedEntryId = startSeqId - imlr.startSeqIdIncluded;
+ long expectedEntryId = startSeqId - imlr.getStartSeqIdIncluded();
@Override
public void safeReadComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
@@ -283,7 +296,7 @@ public class BookkeeperPersistenceManage
assert expectedEntryId == entry.getEntryId() : "expectedEntryId (" + expectedEntryId
+ ") != entry.getEntryId() (" + entry.getEntryId() + ")";
- assert (message.getMsgId().getLocalComponent() - imlr.startSeqIdIncluded) == expectedEntryId;
+ assert (message.getMsgId().getLocalComponent() - imlr.getStartSeqIdIncluded()) == expectedEntryId;
expectedEntryId++;
request.callback.messageScanned(ctx, message);
@@ -302,7 +315,7 @@ public class BookkeeperPersistenceManage
}
// continue scanning messages
- scanMessages(request, imlr.startSeqIdIncluded + entry.getEntryId() + 1, numMessagesRead, totalSizeRead);
+ scanMessages(request, imlr.getStartSeqIdIncluded() + entry.getEntryId() + 1, numMessagesRead, totalSizeRead);
}
}, request.ctx);
}
@@ -314,7 +327,7 @@ public class BookkeeperPersistenceManage
if (entry == null) {
// None of the old ledgers have this seq-id, we must use the
// current ledger
- long endSeqId = topicInfo.currentLedgerRange.startSeqIdIncluded
+ long endSeqId = topicInfo.currentLedgerRange.getStartSeqIdIncluded()
+ topicInfo.lastEntryIdAckedInCurrentLedger;
if (endSeqId < startSeqId) {
@@ -345,11 +358,12 @@ public class BookkeeperPersistenceManage
}
class UpdateLedgerOp extends TopicOpQueuer.AsynchronousOp<Void> {
- private long ledgerDeleted;
+ private Set<Long> ledgersDeleted;
- public UpdateLedgerOp(ByteString topic, final Callback<Void> cb, final Object ctx, final long ledgerDeleted) {
+ public UpdateLedgerOp(ByteString topic, final Callback<Void> cb, final Object ctx,
+ Set<Long> ledgersDeleted) {
queuer.super(topic, cb, ctx);
- this.ledgerDeleted = ledgerDeleted;
+ this.ledgersDeleted = ledgersDeleted;
}
@Override
@@ -357,22 +371,31 @@ public class BookkeeperPersistenceManage
final TopicInfo topicInfo = topicInfos.get(topic);
if (topicInfo == null) {
logger.error("Server is not responsible for topic!");
+ cb.operationFailed(ctx, new PubSubException.ServerNotResponsibleForTopicException(""));
return;
}
- boolean needsUpdate = false;
LedgerRanges.Builder builder = LedgerRanges.newBuilder();
final Set<Long> keysToRemove = new HashSet<Long>();
+ boolean foundUnconsumedLedger = false;
for (Map.Entry<Long, InMemoryLedgerRange> e : topicInfo.ledgerRanges.entrySet()) {
- if (e.getValue().range.getLedgerId() == ledgerDeleted) {
- needsUpdate = true;
+ LedgerRange lr = e.getValue().range;
+ long ledgerId = lr.getLedgerId();
+ if (!foundUnconsumedLedger && ledgersDeleted.contains(ledgerId)) {
keysToRemove.add(e.getKey());
+ if (!lr.hasEndSeqIdIncluded()) {
+ String msg = "Should not remove unclosed ledger " + ledgerId + " for topic " + topic.toStringUtf8();
+ logger.error(msg);
+ cb.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
+ return;
+ }
} else {
- builder.addRanges(e.getValue().range);
+ foundUnconsumedLedger = true;
+ builder.addRanges(lr);
}
}
builder.addRanges(topicInfo.currentLedgerRange.range);
- if (needsUpdate) {
+ if (!keysToRemove.isEmpty()) {
final LedgerRanges newRanges = builder.build();
tpManager.writeTopicPersistenceInfo(
topic, newRanges, topicInfo.ledgerRangesVersion, new Callback<Version>() {
@@ -410,35 +433,80 @@ public class BookkeeperPersistenceManage
return;
}
+ final LinkedList<Long> ledgersToDelete = new LinkedList<Long>();
for (Long endSeqIdIncluded : topicInfo.ledgerRanges.keySet()) {
if (endSeqIdIncluded <= seqId) {
// This ledger's message entries have all been consumed already
// so it is safe to delete it from BookKeeper.
long ledgerId = topicInfo.ledgerRanges.get(endSeqIdIncluded).range.getLedgerId();
- try {
- bk.deleteLedger(ledgerId);
- Callback<Void> cb = new Callback<Void>() {
- public void operationFinished(Object ctx, Void result) {
- // do nothing, op is async to stop other ops
- // occurring on the topic during the update
- }
- public void operationFailed(Object ctx, PubSubException exception) {
- logger.error("Failed to update ledger znode", exception);
- }
- };
- queuer.pushAndMaybeRun(topic, new UpdateLedgerOp(topic, cb, null, ledgerId));
- } catch (Exception e) {
- // For now, just log an exception error message. In the
- // future, we can have more complicated retry logic to
- // delete a consumed ledger. The next time the ledger
- // garbage collection job runs, we'll once again try to
- // delete this ledger.
- logger.error("Exception while deleting consumed ledgerId: " + ledgerId, e);
- }
- } else
+ ledgersToDelete.add(ledgerId);
+ } else {
break;
+ }
}
+
+ // no ledgers need to be deleted
+ if (ledgersToDelete.isEmpty()) {
+ return;
+ }
+
+ Set<Long> ledgersDeleted = new HashSet<Long>();
+ deleteLedgersAndUpdateLedgersRange(topic, ledgersToDelete, ledgersDeleted);
+ }
+ }
+
+ private void deleteLedgersAndUpdateLedgersRange(final ByteString topic,
+ final LinkedList<Long> ledgersToDelete,
+ final Set<Long> ledgersDeleted) {
+ if (ledgersToDelete.isEmpty()) {
+ Callback<Void> cb = new Callback<Void>() {
+ public void operationFinished(Object ctx, Void result) {
+ // do nothing, op is async to stop other ops
+ // occurring on the topic during the update
+ }
+ public void operationFailed(Object ctx, PubSubException exception) {
+ logger.error("Failed to update ledger znode for topic {} deleting ledgers {} : {}",
+ va(topic.toStringUtf8(), ledgersDeleted, exception.getMessage()));
+ }
+ };
+ queuer.pushAndMaybeRun(topic, new UpdateLedgerOp(topic, cb, null, ledgersDeleted));
+ return;
+ }
+
+ final Long ledger = ledgersToDelete.poll();
+ if (null == ledger) {
+ deleteLedgersAndUpdateLedgersRange(topic, ledgersToDelete, ledgersDeleted);
+ return;
}
+
+ bk.asyncDeleteLedger(ledger, new DeleteCallback() {
+ @Override
+ public void deleteComplete(int rc, Object ctx) {
+ if (BKException.Code.NoSuchLedgerExistsException == rc ||
+ BKException.Code.OK == rc) {
+ ledgersDeleted.add(ledger);
+ deleteLedgersAndUpdateLedgersRange(topic, ledgersToDelete, ledgersDeleted);
+ return;
+ } else {
+ logger.warn("Exception while deleting consumed ledger {}, stop deleting other ledgers {} "
+ + "and update ledger ranges with deleted ledgers {} : {}",
+ va(ledger, ledgersToDelete, ledgersDeleted, BKException.create(rc)));
+ // Stop deleting further ledgers once a ledger deletion has failed
+ Callback<Void> cb = new Callback<Void>() {
+ public void operationFinished(Object ctx, Void result) {
+ // do nothing, op is async to stop other ops
+ // occurring on the topic during the update
+ }
+ public void operationFailed(Object ctx, PubSubException exception) {
+ logger.error("Failed to update ledger znode for topic {} deleting ledgers {} : {}",
+ va(topic, ledgersDeleted, exception.getMessage()));
+ }
+ };
+ queuer.pushAndMaybeRun(topic, new UpdateLedgerOp(topic, cb, null, ledgersDeleted));
+ return;
+ }
+ }
+ }, null);
}
public void consumedUntil(ByteString topic, Long seqId) {
@@ -575,7 +643,7 @@ public class BookkeeperPersistenceManage
// check whether reach the threshold of a ledger, if it does,
// open a ledger to write
- long entriesInThisLedger = localSeqId - topicInfo.currentLedgerRange.startSeqIdIncluded + 1;
+ long entriesInThisLedger = localSeqId - topicInfo.currentLedgerRange.getStartSeqIdIncluded() + 1;
if (UNLIMITED_ENTRIES != maxEntriesPerLedger &&
entriesInThisLedger >= maxEntriesPerLedger) {
if (topicInfo.doChangeLedger.compareAndSet(false, true)) {
@@ -616,9 +684,9 @@ public class BookkeeperPersistenceManage
return;
}
- if (entryId + topicInfo.currentLedgerRange.startSeqIdIncluded != localSeqId) {
+ if (entryId + topicInfo.currentLedgerRange.getStartSeqIdIncluded() != localSeqId) {
String msg = "Expected BK to assign entry-id: "
- + (localSeqId - topicInfo.currentLedgerRange.startSeqIdIncluded)
+ + (localSeqId - topicInfo.currentLedgerRange.getStartSeqIdIncluded())
+ " but it instead assigned entry-id: " + entryId + " topic: "
+ topic.toStringUtf8() + "ledger: " + lh.getId();
logger.error(msg);
@@ -630,7 +698,7 @@ public class BookkeeperPersistenceManage
// if this acked entry is the last entry of current ledger
// we can add a ChangeLedgerOp to execute to change ledger
if (topicInfo.doChangeLedger.get() &&
- entryId + topicInfo.currentLedgerRange.startSeqIdIncluded == topicInfo.lastSeqIdBeforeLedgerChange) {
+ entryId + topicInfo.currentLedgerRange.getStartSeqIdIncluded() == topicInfo.lastSeqIdBeforeLedgerChange) {
// change ledger
changeLedger(topic, new Callback<Void>() {
@Override
@@ -708,19 +776,106 @@ public class BookkeeperPersistenceManage
}
void processTopicLedgerRanges(final LedgerRanges ranges, final Version version) {
- Iterator<LedgerRange> lrIterator = ranges.getRangesList().iterator();
- TopicInfo topicInfo = new TopicInfo();
+ final List<LedgerRange> rangesList = ranges.getRangesList();
+ if (!rangesList.isEmpty()) {
+ LedgerRange range = rangesList.get(0);
+ if (range.hasStartSeqIdIncluded()) {
+ // we already have start seq id
+ processTopicLedgerRanges(rangesList, version, range.getStartSeqIdIncluded());
+ return;
+ }
+ getStartSeqIdToProcessTopicLedgerRanges(rangesList, version);
+ return;
+ }
+ // process topic ledger ranges directly
+ processTopicLedgerRanges(rangesList, version, START_SEQ_ID);
+ }
- long startOfLedger = 1;
+ /**
+ * Process old version ledger ranges to fetch start seq id.
+ */
+ void getStartSeqIdToProcessTopicLedgerRanges(
+ final List<LedgerRange> rangesList, final Version version) {
+
+ final LedgerRange range = rangesList.get(0);
+
+ if (!range.hasEndSeqIdIncluded()) {
+ // process topic ledger ranges directly
+ processTopicLedgerRanges(rangesList, version, START_SEQ_ID);
+ return;
+ }
+
+ final long ledgerId = range.getLedgerId();
+ // open the first ledger to compute right start seq id
+ bk.asyncOpenLedger(ledgerId, DigestType.CRC32, passwd,
+ new SafeAsynBKCallback.OpenCallback() {
+
+ @Override
+ public void safeOpenComplete(int rc, LedgerHandle ledgerHandle, Object ctx) {
+
+ if (rc == BKException.Code.NoSuchLedgerExistsException) {
+ // process next ledger
+ processTopicLedgerRanges(rangesList, version, START_SEQ_ID);
+ return;
+ } else if (rc != BKException.Code.OK) {
+ BKException bke = BKException.create(rc);
+ logger.error("Could not open ledger {} to get start seq id while acquiring topic {} : {}",
+ va(ledgerId, topic.toStringUtf8(), bke));
+ cb.operationFailed(ctx, new PubSubException.ServiceDownException(bke));
+ return;
+ }
+
+ final long numEntriesInLastLedger = ledgerHandle.getLastAddConfirmed() + 1;
+
+ // the ledger was already closed, so calling close here is just a no-op.
+ try {
+ ledgerHandle.close();
+ } catch (InterruptedException ie) {
+ // the exception would never be thrown for a read only ledger handle.
+ } catch (BKException bke) {
+ // the exception would never be thrown for a read only ledger handle.
+ }
+ if (numEntriesInLastLedger <= 0) {
+ String msg = "No entries found in a have-end-seq-id ledger " + ledgerId
+ + " when acquiring topic " + topic.toStringUtf8() + ".";
+ logger.error(msg);
+ cb.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
+ return;
+ }
+ long endOfLedger = range.getEndSeqIdIncluded().getLocalComponent();
+ long startOfLedger = endOfLedger - numEntriesInLastLedger + 1;
+
+ processTopicLedgerRanges(rangesList, version, startOfLedger);
+ }
+
+ }, ctx);
+ }
+
+ void processTopicLedgerRanges(final List<LedgerRange> rangesList, final Version version,
+ long startOfLedger) {
+ logger.info("Process {} ledgers for topic {} starting from seq id {}.",
+ va(rangesList.size(), topic.toStringUtf8(), startOfLedger));
+
+ Iterator<LedgerRange> lrIterator = rangesList.iterator();
+
+ TopicInfo topicInfo = new TopicInfo();
while (lrIterator.hasNext()) {
LedgerRange range = lrIterator.next();
if (range.hasEndSeqIdIncluded()) {
// this means it was a valid and completely closed ledger
long endOfLedger = range.getEndSeqIdIncluded().getLocalComponent();
- topicInfo.ledgerRanges.put(endOfLedger, new InMemoryLedgerRange(range, startOfLedger));
- startOfLedger = endOfLedger + 1;
+ if (range.hasStartSeqIdIncluded()) {
+ startOfLedger = range.getStartSeqIdIncluded();
+ } else {
+ range = buildLedgerRange(range.getLedgerId(), startOfLedger,
+ range.getEndSeqIdIncluded());
+ }
+ topicInfo.ledgerRanges.put(endOfLedger, new InMemoryLedgerRange(range));
+ if (startOfLedger < endOfLedger + 1) {
+ startOfLedger = endOfLedger + 1;
+ }
continue;
}
@@ -733,14 +888,19 @@ public class BookkeeperPersistenceManage
return;
}
+ if (range.hasStartSeqIdIncluded()) {
+ startOfLedger = range.getStartSeqIdIncluded();
+ }
+
// The last ledger does not have a valid seq-id, lets try to
// find it out
- recoverLastTopicLedgerAndOpenNewOne(range.getLedgerId(), version, topicInfo);
+ recoverLastTopicLedgerAndOpenNewOne(range.getLedgerId(), startOfLedger,
+ version, topicInfo);
return;
}
// All ledgers were found properly closed, just start a new one
- openNewTopicLedger(topic, version, topicInfo, false, cb, ctx);
+ openNewTopicLedger(topic, version, topicInfo, startOfLedger, false, cb, ctx);
}
/**
@@ -749,9 +909,15 @@ public class BookkeeperPersistenceManage
*
* @param ledgerId
* Ledger to be recovered
+ * @param expectedStartSeqId
+ * Start seq id of the ledger to recover
+ * @param expectedVersionOfLedgerNode
+ * Expected version to update ledgers range
+ * @param topicInfo
+ * Topic info
*/
- private void recoverLastTopicLedgerAndOpenNewOne(final long ledgerId, final Version expectedVersionOfLedgerNode,
- final TopicInfo topicInfo) {
+ private void recoverLastTopicLedgerAndOpenNewOne(final long ledgerId, final long expectedStartSeqId,
+ final Version expectedVersionOfLedgerNode, final TopicInfo topicInfo) {
bk.asyncOpenLedger(ledgerId, DigestType.CRC32, passwd, new SafeAsynBKCallback.OpenCallback() {
@Override
@@ -772,7 +938,8 @@ public class BookkeeperPersistenceManage
// couldn't write to, so just ignore it
logger.info("Pruning empty ledger: " + ledgerId + " for topic: " + topic.toStringUtf8());
closeLedger(ledgerHandle);
- openNewTopicLedger(topic, expectedVersionOfLedgerNode, topicInfo, false, cb, ctx);
+ openNewTopicLedger(topic, expectedVersionOfLedgerNode, topicInfo,
+ expectedStartSeqId, false, cb, ctx);
return;
}
@@ -803,17 +970,25 @@ public class BookkeeperPersistenceManage
return;
}
- long prevLedgerEnd = topicInfo.ledgerRanges.isEmpty() ? 0 : topicInfo.ledgerRanges
- .lastKey();
- LedgerRange lr = LedgerRange.newBuilder().setLedgerId(ledgerId)
- .setEndSeqIdIncluded(lastMessage.getMsgId()).build();
- topicInfo.ledgerRanges.put(lr.getEndSeqIdIncluded().getLocalComponent(),
- new InMemoryLedgerRange(lr, prevLedgerEnd + 1, lh));
+ long endOfLedger = lastMessage.getMsgId().getLocalComponent();
+ long startOfLedger = endOfLedger - numEntriesInLastLedger + 1;
+
+ if (startOfLedger != expectedStartSeqId) {
+ // a gap may have been introduced by an older version when it garbage-collected consumed ledgers
+ String msg = "Expected start seq id of recovered ledger " + ledgerId
+ + " to be " + expectedStartSeqId + " but it was "
+ + startOfLedger + ".";
+ logger.warn(msg);
+ }
+
+ LedgerRange lr = buildLedgerRange(ledgerId, startOfLedger, lastMessage.getMsgId());
+ topicInfo.ledgerRanges.put(endOfLedger,
+ new InMemoryLedgerRange(lr, lh));
- logger.info("Recovered unclosed ledger: " + ledgerId + " for topic: "
- + topic.toStringUtf8() + " with " + numEntriesInLastLedger + " entries");
+ logger.info("Recovered unclosed ledger: {} for topic: {} with {} entries starting from seq id {}",
+ va(ledgerId, topic.toStringUtf8(), numEntriesInLastLedger, startOfLedger));
- openNewTopicLedger(topic, expectedVersionOfLedgerNode, topicInfo, false, cb, ctx);
+ openNewTopicLedger(topic, expectedVersionOfLedgerNode, topicInfo, endOfLedger + 1, false, cb, ctx);
}
}, ctx);
@@ -832,6 +1007,8 @@ public class BookkeeperPersistenceManage
* Expected Version to Update Ledgers Node.
* @param topicInfo
* Topic Information
+ * @param startSeqId
+ * Start of sequence id for new ledger
* @param changeLedger
* Whether is it called when changing ledger
* @param cb
@@ -841,7 +1018,7 @@ public class BookkeeperPersistenceManage
*/
void openNewTopicLedger(final ByteString topic,
final Version expectedVersionOfLedgersNode, final TopicInfo topicInfo,
- final boolean changeLedger,
+ final long startSeqId, final boolean changeLedger,
final Callback<Void> cb, final Object ctx) {
bk.asyncCreateLedger(cfg.getBkEnsembleSize(), cfg.getBkQuorumSize(), DigestType.CRC32, passwd,
new SafeAsynBKCallback.CreateCallback() {
@@ -864,13 +1041,13 @@ public class BookkeeperPersistenceManage
// compute last seq id
if (!changeLedger) {
topicInfo.lastSeqIdPushed = topicInfo.ledgerRanges.isEmpty() ? MessageSeqId.newBuilder()
- .setLocalComponent(0).build() : topicInfo.ledgerRanges.lastEntry().getValue().range
+ .setLocalComponent(startSeqId - 1).build() : topicInfo.ledgerRanges.lastEntry().getValue().range
.getEndSeqIdIncluded();
}
- LedgerRange lastRange = LedgerRange.newBuilder().setLedgerId(lh.getId()).build();
- topicInfo.currentLedgerRange = new InMemoryLedgerRange(lastRange, topicInfo.lastSeqIdPushed
- .getLocalComponent() + 1, lh);
+ LedgerRange lastRange = LedgerRange.newBuilder().setLedgerId(lh.getId())
+ .setStartSeqIdIncluded(startSeqId).build();
+ topicInfo.currentLedgerRange = new InMemoryLedgerRange(lastRange, lh);
// Persist the fact that we started this new
// ledger to ZK
@@ -950,18 +1127,19 @@ public class BookkeeperPersistenceManage
cb.operationFailed(ctx, new PubSubException.ServiceDownException(bke));
return;
}
+ long endSeqId = topicInfo.lastSeqIdPushed.getLocalComponent();
// update last range
- LedgerRange lastRange = LedgerRange.newBuilder().setLedgerId(ledgerId)
- .setEndSeqIdIncluded(topicInfo.lastSeqIdPushed).build();
+ LedgerRange lastRange =
+ buildLedgerRange(ledgerId, topicInfo.currentLedgerRange.getStartSeqIdIncluded(),
+ topicInfo.lastSeqIdPushed);
topicInfo.currentLedgerRange.range = lastRange;
// put current ledger to ledger ranges
- topicInfo.ledgerRanges.put(topicInfo.lastSeqIdPushed.getLocalComponent(),
- topicInfo.currentLedgerRange);
+ topicInfo.ledgerRanges.put(endSeqId, topicInfo.currentLedgerRange);
logger.info("Closed written ledger " + ledgerId + " for topic "
+ topic.toStringUtf8() + " to change ledger.");
openNewTopicLedger(topic, topicInfo.ledgerRangesVersion,
- topicInfo, true, cb, ctx);
+ topicInfo, endSeqId + 1, true, cb, ctx);
}
}, ctx);
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java?rev=1402501&r1=1402500&r2=1402501&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java Fri Oct 26 13:13:09 2012
@@ -17,9 +17,13 @@
*/
package org.apache.hedwig.server.persistence;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
+import java.util.Iterator;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
@@ -28,23 +32,36 @@ import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
+
import org.apache.hedwig.HelperMethods;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol;
+import org.apache.hedwig.protocol.PubSubProtocol.LedgerRange;
+import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
import org.apache.hedwig.protocol.PubSubProtocol.Message;
import org.apache.hedwig.server.common.ServerConfiguration;
import org.apache.hedwig.server.meta.MetadataManagerFactory;
+import org.apache.hedwig.server.meta.SubscriptionDataManager;
+import org.apache.hedwig.server.meta.TopicOwnershipManager;
+import org.apache.hedwig.server.meta.TopicPersistenceManager;
import org.apache.hedwig.server.topics.TopicManager;
import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager;
import org.apache.hedwig.util.Callback;
+import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.protobuf.ByteString;
+@RunWith(Parameterized.class)
public class TestBookKeeperPersistenceManager extends TestCase {
static Logger logger = LoggerFactory.getLogger(TestPersistenceManagerBlackBox.class);
@@ -58,11 +75,114 @@ public class TestBookKeeperPersistenceMa
TopicManager tm;
BookkeeperPersistenceManager manager;
PubSubException failureException = null;
- MetadataManagerFactory metadataManagerFactory;
+ TestMetadataManagerFactory metadataManagerFactory;
+ TopicPersistenceManager tpManager;
+
+ boolean removeStartSeqId;
+
+ static class TestMetadataManagerFactory extends MetadataManagerFactory {
+
+ final MetadataManagerFactory factory;
+ int serviceDownCount = 0;
+
+ TestMetadataManagerFactory(ServerConfiguration conf, ZooKeeper zk) throws Exception {
+ factory = MetadataManagerFactory.newMetadataManagerFactory(conf, zk);
+ }
+
+ public void setServiceDownCount(int count) {
+ this.serviceDownCount = count;
+ }
+
+ @Override
+ public int getCurrentVersion() {
+ return factory.getCurrentVersion();
+ }
+
+ @Override
+ protected MetadataManagerFactory initialize(
+ ServerConfiguration cfg, ZooKeeper zk, int version) throws IOException {
+ // do nothing
+ return factory;
+ }
+
+ @Override
+ public void shutdown() throws IOException {
+ factory.shutdown();
+ }
+
+ @Override
+ public Iterator<ByteString> getTopics() throws IOException {
+ return factory.getTopics();
+ }
+
+ @Override
+ public TopicPersistenceManager newTopicPersistenceManager() {
+ final TopicPersistenceManager manager = factory.newTopicPersistenceManager();
+ return new TopicPersistenceManager() {
+
+ @Override
+ public void close() throws IOException {
+ manager.close();
+ }
+
+ @Override
+ public void readTopicPersistenceInfo(ByteString topic,
+ Callback<Versioned<LedgerRanges>> callback, Object ctx) {
+ if (serviceDownCount > 0) {
+ --serviceDownCount;
+ callback.operationFailed(ctx, new PubSubException.ServiceDownException("Metadata Store is down"));
+ return;
+ }
+ manager.readTopicPersistenceInfo(topic, callback, ctx);
+ }
+ @Override
+ public void writeTopicPersistenceInfo(ByteString topic, LedgerRanges ranges, Version version,
+ Callback<Version> callback, Object ctx) {
+ if (serviceDownCount > 0) {
+ --serviceDownCount;
+ callback.operationFailed(ctx, new PubSubException.ServiceDownException("Metadata Store is down"));
+ return;
+ }
+ manager.writeTopicPersistenceInfo(topic, ranges, version, callback, ctx);
+ }
+ @Override
+ public void deleteTopicPersistenceInfo(ByteString topic, Version version,
+ Callback<Void> callback, Object ctx) {
+ if (serviceDownCount > 0) {
+ --serviceDownCount;
+ callback.operationFailed(ctx, new PubSubException.ServiceDownException("Metadata Store is down"));
+ return;
+ }
+ manager.deleteTopicPersistenceInfo(topic, version, callback, ctx);
+ }
+ };
+ }
+
+ @Override
+ public SubscriptionDataManager newSubscriptionDataManager() {
+ return factory.newSubscriptionDataManager();
+ }
+
+ @Override
+ public TopicOwnershipManager newTopicOwnershipManager() {
+ return factory.newTopicOwnershipManager();
+ }
+ }
+
+ public TestBookKeeperPersistenceManager(boolean removeStartSeqId) {
+ this.removeStartSeqId = removeStartSeqId;
+ }
+
+ @Parameters
+ public static Collection<Object[]> configs() {
+ return Arrays.asList(new Object[][] {
+ { true }, { false }
+ });
+ }
@Override
@Before
- protected void setUp() throws Exception {
+ public void setUp() throws Exception {
super.setUp();
// delay read response for 2s.
@@ -76,8 +196,8 @@ public class TestBookKeeperPersistenceMa
.setThrottleValue(3);
conf.addConf(bkClientConf);
- metadataManagerFactory =
- MetadataManagerFactory.newMetadataManagerFactory(conf, bktb.getZooKeeperClient());
+ metadataManagerFactory = new TestMetadataManagerFactory(conf, bktb.getZooKeeperClient());
+ tpManager = metadataManagerFactory.newTopicPersistenceManager();
scheduler = Executors.newScheduledThreadPool(1);
tm = new TrivialOwnAllTopicManager(conf, scheduler);
@@ -87,9 +207,10 @@ public class TestBookKeeperPersistenceMa
@Override
@After
- protected void tearDown() throws Exception {
+ public void tearDown() throws Exception {
tm.stop();
manager.stop();
+ tpManager.close();
metadataManagerFactory.shutdown();
scheduler.shutdown();
bktb.tearDown();
@@ -108,6 +229,7 @@ public class TestBookKeeperPersistenceMa
@Override
public void messageScanned(Object ctx, Message recvMessage) {
+ logger.info("Scanned message : {}", recvMessage.getMsgId().getLocalComponent());
if (null != nextScan && !runNextScan) {
runNextScan = true;
manager.scanMessages(nextScan);
@@ -133,7 +255,7 @@ public class TestBookKeeperPersistenceMa
public void scanFinished(Object ctx, ReasonForFinish reason) {
LinkedBlockingQueue<Boolean> statusQueue = (LinkedBlockingQueue<Boolean>) ctx;
try {
- statusQueue.put(true);
+ statusQueue.put(pubMsgs.isEmpty());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
@@ -150,6 +272,164 @@ public class TestBookKeeperPersistenceMa
}
@Test
+ public void testScanMessagesOnClosedLedgerAfterDeleteLedger() throws Exception {
+ scanMessagesAfterDeleteLedgerTest(2);
+ }
+
+ @Test
+ public void testScanMessagesOnUnclosedLedgerAfterDeleteLedger() throws Exception {
+ scanMessagesAfterDeleteLedgerTest(1);
+ }
+
+ private void scanMessagesAfterDeleteLedgerTest(int numLedgers) throws Exception {
+ ByteString topic = ByteString.copyFromUtf8("TestScanMessagesAfterDeleteLedger");
+
+ List<Message> msgs = new ArrayList<Message>();
+
+ acquireTopic(topic);
+ msgs.addAll(publishMessages(topic, 2));
+
+ for (int i=0; i<numLedgers; i++) {
+ releaseTopic(topic);
+ // acquire topic again to force a new ledger
+ acquireTopic(topic);
+ msgs.addAll(publishMessages(topic, 2));
+ }
+
+ consumedUntil(topic, 2L);
+ // Wait until the ledger ranges are updated.
+ Thread.sleep(2000L);
+ releaseTopic(topic);
+
+ // acquire topic again
+ acquireTopic(topic);
+ // scan messages starting from 3
+ LinkedBlockingQueue<Boolean> statusQueue =
+ new LinkedBlockingQueue<Boolean>();
+ manager.scanMessages(new RangeScanRequest(topic, 3, 2, Long.MAX_VALUE,
+ new RangeScanVerifier(subMessages(msgs, 2, 3), null), statusQueue));
+ Boolean b = statusQueue.poll(10 * readDelay, TimeUnit.MILLISECONDS);
+ assertTrue("Should succeed to scan messages after deleted consumed ledger.", b);
+ }
+
+ @Test
+ public void testScanMessagesOnEmptyLedgerAfterDeleteLedger() throws Exception {
+ ByteString topic = ByteString.copyFromUtf8("TestScanMessagesOnEmptyLedgerAfterDeleteLedger");
+
+ List<Message> msgs = new ArrayList<Message>();
+
+ acquireTopic(topic);
+ msgs.addAll(publishMessages(topic, 2));
+ releaseTopic(topic);
+
+ // acquire topic again to force a new ledger
+ acquireTopic(topic);
+ logger.info("Consumed messages.");
+ consumedUntil(topic, 2L);
+ // Wait until the ledger ranges are updated.
+ Thread.sleep(2000L);
+ logger.info("Released topic with an empty ledger.");
+ // release topic to force an empty ledger
+ releaseTopic(topic);
+
+ // publish 2 more messages; these messages are expected to have ids 3 and 4
+ acquireTopic(topic);
+ logger.info("Published more messages.");
+ msgs.addAll(publishMessages(topic, 2));
+ releaseTopic(topic);
+
+ // acquire topic again
+ acquireTopic(topic);
+ // scan messages starting from 3
+ LinkedBlockingQueue<Boolean> statusQueue =
+ new LinkedBlockingQueue<Boolean>();
+ long startSeqId = removeStartSeqId ? 1 : 3;
+ manager.scanMessages(new RangeScanRequest(topic, startSeqId, 2, Long.MAX_VALUE,
+ new RangeScanVerifier(subMessages(msgs, 2, 3), null), statusQueue));
+ Boolean b = statusQueue.poll(10 * readDelay, TimeUnit.MILLISECONDS);
+ assertTrue("Should succeed to scan messages after deleted consumed ledger.", b);
+ }
+
+ @Test
+ public void testFailedToDeleteLedger1() throws Exception {
+ failedToDeleteLedgersTest(1);
+ }
+
+ @Test
+ public void testFailedToDeleteLedger2() throws Exception {
+ // succeed to delete second ledger
+ failedToDeleteLedgersTest(2);
+ }
+
+ private void failedToDeleteLedgersTest(int numLedgers) throws Exception {
+ final ByteString topic = ByteString.copyFromUtf8("TestFailedToDeleteLedger");
+ final int serviceDownCount = 1;
+
+ List<Message> msgs = new ArrayList<Message>();
+
+ for (int i=0; i<numLedgers; i++) {
+ acquireTopic(topic);
+ msgs.addAll(publishMessages(topic, 2));
+ releaseTopic(topic);
+ }
+
+ // acquire topic again to force a new ledger
+ acquireTopic(topic);
+ logger.info("Consumed messages.");
+ metadataManagerFactory.setServiceDownCount(serviceDownCount);
+ // this consume should fail to update the ledger ranges: the metadata store fails its next operation
+ consumedUntil(topic, 2L * numLedgers);
+ // Wait until the ledger ranges are updated.
+ Thread.sleep(2000L);
+ logger.info("Released topic with an empty ledger.");
+ // release topic to force an empty ledger
+ releaseTopic(topic);
+
+ // publish 2 more messages; these are expected to have ids (2 * numLedgers + 1) and (2 * numLedgers + 2)
+ acquireTopic(topic);
+ logger.info("Published more messages.");
+ msgs.addAll(publishMessages(topic, 2));
+ releaseTopic(topic);
+
+ // acquire topic again
+ acquireTopic(topic);
+ LinkedBlockingQueue<Boolean> statusQueue =
+ new LinkedBlockingQueue<Boolean>();
+ manager.scanMessages(new RangeScanRequest(topic, numLedgers * 2 + 1, 2, Long.MAX_VALUE,
+ new RangeScanVerifier(subMessages(msgs, numLedgers * 2, numLedgers * 2 + 1), null), statusQueue));
+ Boolean b = statusQueue.poll(10 * readDelay, TimeUnit.MILLISECONDS);
+ assertTrue("Should succeed to scan messages after deleted consumed ledger.", b);
+
+ // consumed
+ consumedUntil(topic, (numLedgers + 1) * 2L);
+ // Wait until the ledger ranges are updated.
+ Thread.sleep(2000L);
+
+ Semaphore latch = new Semaphore(1);
+ latch.acquire();
+ tpManager.readTopicPersistenceInfo(topic, new Callback<Versioned<LedgerRanges>>() {
+ @Override
+ public void operationFinished(Object ctx, Versioned<LedgerRanges> ranges) {
+ if (null == ranges || ranges.getValue().getRangesList().size() > 1) {
+ failureException = new PubSubException.NoTopicPersistenceInfoException("Invalid persistence info found for topic " + topic.toStringUtf8());
+ ((Semaphore)ctx).release();
+ return;
+ }
+ failureException = null;
+ ((Semaphore)ctx).release();
+ }
+ @Override
+ public void operationFailed(Object ctx, PubSubException exception) {
+ failureException = exception;
+ ((Semaphore)ctx).release();
+ }
+ }, latch);
+ latch.acquire();
+ latch.release();
+ assertNull("Should not fail with exception.", failureException);
+ }
+
+ @Test
public void testScanMessagesOnTwoLedgers() throws Exception {
ByteString topic = ByteString.copyFromUtf8("TestScanMessagesOnTwoLedgers");
@@ -245,8 +525,62 @@ public class TestBookKeeperPersistenceMa
}
}
- protected void releaseTopic(ByteString topic) throws Exception {
+ protected void releaseTopic(final ByteString topic) throws Exception {
manager.lostTopic(topic);
+ // backward-compatibility testing: rewrite the ledger ranges without start seq id
+ if (removeStartSeqId) {
+ Semaphore latch = new Semaphore(1);
+ latch.acquire();
+ tpManager.readTopicPersistenceInfo(topic, new Callback<Versioned<LedgerRanges>>() {
+ @Override
+ public void operationFinished(Object ctx, Versioned<LedgerRanges> ranges) {
+ if (null == ranges) {
+ failureException = new PubSubException.NoTopicPersistenceInfoException("No persistence info found for topic " + topic.toStringUtf8());
+ ((Semaphore)ctx).release();
+ return;
+ }
+
+ // build a new ledger ranges w/o start seq id.
+ LedgerRanges.Builder builder = LedgerRanges.newBuilder();
+ final List<LedgerRange> rangesList = ranges.getValue().getRangesList();
+ for (LedgerRange range : rangesList) {
+ LedgerRange.Builder newRangeBuilder = LedgerRange.newBuilder();
+ newRangeBuilder.setLedgerId(range.getLedgerId());
+ if (range.hasEndSeqIdIncluded()) {
+ newRangeBuilder.setEndSeqIdIncluded(range.getEndSeqIdIncluded());
+ }
+ builder.addRanges(newRangeBuilder.build());
+ }
+ tpManager.writeTopicPersistenceInfo(topic, builder.build(), ranges.getVersion(),
+ new Callback<Version>() {
+ @Override
+ public void operationFinished(Object ctx, Version newVersion) {
+ failureException = null;
+ ((Semaphore)ctx).release();
+ }
+ @Override
+ public void operationFailed(Object ctx, PubSubException exception) {
+ failureException = exception;
+ ((Semaphore)ctx).release();
+ }
+ }, ctx);
+ }
+ @Override
+ public void operationFailed(Object ctx, PubSubException exception) {
+ failureException = exception;
+ ((Semaphore)ctx).release();
+ }
+ }, latch);
+ latch.acquire();
+ latch.release();
+ if (null != failureException) {
+ throw failureException;
+ }
+ }
+ }
+
+ protected void consumedUntil(ByteString topic, long seqId) throws Exception {
+ manager.consumedUntil(topic, seqId);
}
}