You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/10/26 15:13:10 UTC

svn commit: r1402501 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ hedwig-protocol/src/main/java/org/apache/hedwig/protocol/ hedwig-protocol/src/main/protobuf/ hedwig-server/src/main/java/org/apache/...

Author: ivank
Date: Fri Oct 26 13:13:09 2012
New Revision: 1402501

URL: http://svn.apache.org/viewvc?rev=1402501&view=rev
Log:
BOOKKEEPER-439: No more messages delivered after deleted consumed ledgers. (sijie via ivank)

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
    zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java
    zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1402501&r1=1402500&r2=1402501&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Oct 26 13:13:09 2012
@@ -128,6 +128,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-191: Hub server should change ledger to write, so consumed messages have chance to be garbage collected. (sijie via ivank)
 
+        BOOKKEEPER-439: No more messages delivered after deleted consumed ledgers. (sijie via ivank)
+
     IMPROVEMENTS:
 
       bookkeeper-server:

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java?rev=1402501&r1=1402500&r2=1402501&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java Fri Oct 26 13:13:09 2012
@@ -549,7 +549,7 @@ public class BookKeeper {
         counter.block(0);
         if (counter.getrc() != BKException.Code.OK) {
             LOG.error("Error deleting ledger " + lId + " : " + counter.getrc());
-            throw BKException.create(Code.ZKException);
+            throw BKException.create(counter.getrc());
         }
     }
 

Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java?rev=1402501&r1=1402500&r2=1402501&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java Fri Oct 26 13:13:09 2012
@@ -13544,6 +13544,10 @@ public final class PubSubProtocol {
     boolean hasEndSeqIdIncluded();
     org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId getEndSeqIdIncluded();
     org.apache.hedwig.protocol.PubSubProtocol.MessageSeqIdOrBuilder getEndSeqIdIncludedOrBuilder();
+    
+    // optional uint64 startSeqIdIncluded = 3;
+    boolean hasStartSeqIdIncluded();
+    long getStartSeqIdIncluded();
   }
   public static final class LedgerRange extends
       com.google.protobuf.GeneratedMessage
@@ -13597,9 +13601,20 @@ public final class PubSubProtocol {
       return endSeqIdIncluded_;
     }
     
+    // optional uint64 startSeqIdIncluded = 3;
+    public static final int STARTSEQIDINCLUDED_FIELD_NUMBER = 3;
+    private long startSeqIdIncluded_;
+    public boolean hasStartSeqIdIncluded() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    public long getStartSeqIdIncluded() {
+      return startSeqIdIncluded_;
+    }
+    
     private void initFields() {
       ledgerId_ = 0L;
       endSeqIdIncluded_ = org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId.getDefaultInstance();
+      startSeqIdIncluded_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -13629,6 +13644,9 @@ public final class PubSubProtocol {
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
         output.writeMessage(2, endSeqIdIncluded_);
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeUInt64(3, startSeqIdIncluded_);
+      }
       getUnknownFields().writeTo(output);
     }
     
@@ -13646,6 +13664,10 @@ public final class PubSubProtocol {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(2, endSeqIdIncluded_);
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeUInt64Size(3, startSeqIdIncluded_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -13779,6 +13801,8 @@ public final class PubSubProtocol {
           endSeqIdIncludedBuilder_.clear();
         }
         bitField0_ = (bitField0_ & ~0x00000002);
+        startSeqIdIncluded_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000004);
         return this;
       }
       
@@ -13829,6 +13853,10 @@ public final class PubSubProtocol {
         } else {
           result.endSeqIdIncluded_ = endSeqIdIncludedBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.startSeqIdIncluded_ = startSeqIdIncluded_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -13851,6 +13879,9 @@ public final class PubSubProtocol {
         if (other.hasEndSeqIdIncluded()) {
           mergeEndSeqIdIncluded(other.getEndSeqIdIncluded());
         }
+        if (other.hasStartSeqIdIncluded()) {
+          setStartSeqIdIncluded(other.getStartSeqIdIncluded());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -13906,6 +13937,11 @@ public final class PubSubProtocol {
               setEndSeqIdIncluded(subBuilder.buildPartial());
               break;
             }
+            case 24: {
+              bitField0_ |= 0x00000004;
+              startSeqIdIncluded_ = input.readUInt64();
+              break;
+            }
           }
         }
       }
@@ -14023,6 +14059,27 @@ public final class PubSubProtocol {
         return endSeqIdIncludedBuilder_;
       }
       
+      // optional uint64 startSeqIdIncluded = 3;
+      private long startSeqIdIncluded_ ;
+      public boolean hasStartSeqIdIncluded() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      public long getStartSeqIdIncluded() {
+        return startSeqIdIncluded_;
+      }
+      public Builder setStartSeqIdIncluded(long value) {
+        bitField0_ |= 0x00000004;
+        startSeqIdIncluded_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearStartSeqIdIncluded() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        startSeqIdIncluded_ = 0L;
+        onChanged();
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:Hedwig.LedgerRange)
     }
     
@@ -16052,34 +16109,35 @@ public final class PubSubProtocol {
       "eSeqId\022\024\n\014messageBound\030\002 \001(\r\"r\n\020Subscrip" +
       "tionData\022(\n\005state\030\001 \001(\0132\031.Hedwig.Subscri" +
       "ptionState\0224\n\013preferences\030\002 \001(\0132\037.Hedwig" +
-      ".SubscriptionPreferences\"O\n\013LedgerRange\022",
+      ".SubscriptionPreferences\"k\n\013LedgerRange\022",
       "\020\n\010ledgerId\030\001 \002(\004\022.\n\020endSeqIdIncluded\030\002 " +
-      "\001(\0132\024.Hedwig.MessageSeqId\"3\n\014LedgerRange" +
-      "s\022#\n\006ranges\030\001 \003(\0132\023.Hedwig.LedgerRange\":" +
-      "\n\013ManagerMeta\022\023\n\013managerImpl\030\002 \002(\t\022\026\n\016ma" +
-      "nagerVersion\030\003 \002(\r\".\n\013HubInfoData\022\020\n\010hos" +
-      "tname\030\002 \002(\t\022\r\n\005czxid\030\003 \002(\004\" \n\013HubLoadDat" +
-      "a\022\021\n\tnumTopics\030\002 \002(\004*\"\n\017ProtocolVersion\022" +
-      "\017\n\013VERSION_ONE\020\001*\207\001\n\rOperationType\022\013\n\007PU" +
-      "BLISH\020\000\022\r\n\tSUBSCRIBE\020\001\022\013\n\007CONSUME\020\002\022\017\n\013U" +
-      "NSUBSCRIBE\020\003\022\022\n\016START_DELIVERY\020\004\022\021\n\rSTOP",
-      "_DELIVERY\020\005\022\025\n\021CLOSESUBSCRIPTION\020\006*D\n\021Su" +
-      "bscriptionEvent\022\017\n\013TOPIC_MOVED\020\001\022\036\n\032SUBS" +
-      "CRIPTION_FORCED_CLOSED\020\002*\205\004\n\nStatusCode\022" +
-      "\013\n\007SUCCESS\020\000\022\026\n\021MALFORMED_REQUEST\020\221\003\022\022\n\r" +
-      "NO_SUCH_TOPIC\020\222\003\022\036\n\031CLIENT_ALREADY_SUBSC" +
-      "RIBED\020\223\003\022\032\n\025CLIENT_NOT_SUBSCRIBED\020\224\003\022\026\n\021" +
-      "COULD_NOT_CONNECT\020\225\003\022\017\n\nTOPIC_BUSY\020\226\003\022\036\n" +
-      "\031NOT_RESPONSIBLE_FOR_TOPIC\020\365\003\022\021\n\014SERVICE" +
-      "_DOWN\020\366\003\022\024\n\017UNCERTAIN_STATE\020\367\003\022\033\n\026INVALI" +
-      "D_MESSAGE_FILTER\020\370\003\022\020\n\013BAD_VERSION\020\210\004\022\036\n",
-      "\031NO_TOPIC_PERSISTENCE_INFO\020\211\004\022\"\n\035TOPIC_P" +
-      "ERSISTENCE_INFO_EXISTS\020\212\004\022\032\n\025NO_SUBSCRIP" +
-      "TION_STATE\020\213\004\022\036\n\031SUBSCRIPTION_STATE_EXIS" +
-      "TS\020\214\004\022\030\n\023NO_TOPIC_OWNER_INFO\020\215\004\022\034\n\027TOPIC" +
-      "_OWNER_INFO_EXISTS\020\216\004\022\031\n\024UNEXPECTED_COND" +
-      "ITION\020\330\004\022\016\n\tCOMPOSITE\020\274\005B\036\n\032org.apache.h" +
-      "edwig.protocolH\001"
+      "\001(\0132\024.Hedwig.MessageSeqId\022\032\n\022startSeqIdI" +
+      "ncluded\030\003 \001(\004\"3\n\014LedgerRanges\022#\n\006ranges\030" +
+      "\001 \003(\0132\023.Hedwig.LedgerRange\":\n\013ManagerMet" +
+      "a\022\023\n\013managerImpl\030\002 \002(\t\022\026\n\016managerVersion" +
+      "\030\003 \002(\r\".\n\013HubInfoData\022\020\n\010hostname\030\002 \002(\t\022" +
+      "\r\n\005czxid\030\003 \002(\004\" \n\013HubLoadData\022\021\n\tnumTopi" +
+      "cs\030\002 \002(\004*\"\n\017ProtocolVersion\022\017\n\013VERSION_O" +
+      "NE\020\001*\207\001\n\rOperationType\022\013\n\007PUBLISH\020\000\022\r\n\tS" +
+      "UBSCRIBE\020\001\022\013\n\007CONSUME\020\002\022\017\n\013UNSUBSCRIBE\020\003",
+      "\022\022\n\016START_DELIVERY\020\004\022\021\n\rSTOP_DELIVERY\020\005\022" +
+      "\025\n\021CLOSESUBSCRIPTION\020\006*D\n\021SubscriptionEv" +
+      "ent\022\017\n\013TOPIC_MOVED\020\001\022\036\n\032SUBSCRIPTION_FOR" +
+      "CED_CLOSED\020\002*\205\004\n\nStatusCode\022\013\n\007SUCCESS\020\000" +
+      "\022\026\n\021MALFORMED_REQUEST\020\221\003\022\022\n\rNO_SUCH_TOPI" +
+      "C\020\222\003\022\036\n\031CLIENT_ALREADY_SUBSCRIBED\020\223\003\022\032\n\025" +
+      "CLIENT_NOT_SUBSCRIBED\020\224\003\022\026\n\021COULD_NOT_CO" +
+      "NNECT\020\225\003\022\017\n\nTOPIC_BUSY\020\226\003\022\036\n\031NOT_RESPONS" +
+      "IBLE_FOR_TOPIC\020\365\003\022\021\n\014SERVICE_DOWN\020\366\003\022\024\n\017" +
+      "UNCERTAIN_STATE\020\367\003\022\033\n\026INVALID_MESSAGE_FI",
+      "LTER\020\370\003\022\020\n\013BAD_VERSION\020\210\004\022\036\n\031NO_TOPIC_PE" +
+      "RSISTENCE_INFO\020\211\004\022\"\n\035TOPIC_PERSISTENCE_I" +
+      "NFO_EXISTS\020\212\004\022\032\n\025NO_SUBSCRIPTION_STATE\020\213" +
+      "\004\022\036\n\031SUBSCRIPTION_STATE_EXISTS\020\214\004\022\030\n\023NO_" +
+      "TOPIC_OWNER_INFO\020\215\004\022\034\n\027TOPIC_OWNER_INFO_" +
+      "EXISTS\020\216\004\022\031\n\024UNEXPECTED_CONDITION\020\330\004\022\016\n\t" +
+      "COMPOSITE\020\274\005B\036\n\032org.apache.hedwig.protoc" +
+      "olH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -16267,7 +16325,7 @@ public final class PubSubProtocol {
           internal_static_Hedwig_LedgerRange_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_Hedwig_LedgerRange_descriptor,
-              new java.lang.String[] { "LedgerId", "EndSeqIdIncluded", },
+              new java.lang.String[] { "LedgerId", "EndSeqIdIncluded", "StartSeqIdIncluded", },
               org.apache.hedwig.protocol.PubSubProtocol.LedgerRange.class,
               org.apache.hedwig.protocol.PubSubProtocol.LedgerRange.Builder.class);
           internal_static_Hedwig_LedgerRanges_descriptor =

Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto?rev=1402501&r1=1402500&r2=1402501&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto (original)
+++ zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto Fri Oct 26 13:13:09 2012
@@ -286,6 +286,7 @@ message SubscriptionData {
 message LedgerRange{
     required uint64 ledgerId = 1;
     optional MessageSeqId endSeqIdIncluded = 2;
+    optional uint64 startSeqIdIncluded = 3;
 }
 
 message LedgerRanges{

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java?rev=1402501&r1=1402500&r2=1402501&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java Fri Oct 26 13:13:09 2012
@@ -352,6 +352,13 @@ public class HedwigAdmin {
         return syncObj.value;
     }
 
+    private static LedgerRange buildLedgerRange(long ledgerId, long startOfLedger, MessageSeqId endOfLedger) {
+        LedgerRange.Builder builder =
+            LedgerRange.newBuilder().setLedgerId(ledgerId).setStartSeqIdIncluded(startOfLedger)
+                       .setEndSeqIdIncluded(endOfLedger);
+        return builder.build();
+    }
+
     /**
      * Return the ledger range forming the topic
      *
@@ -387,49 +394,54 @@ public class HedwigAdmin {
         if (null == ranges) {
             return null;
         }
+        List<LedgerRange> results = new ArrayList<LedgerRange>();
         List<LedgerRange> lrs = ranges.getRangesList();
-        if (lrs.isEmpty()) {
-            return lrs;
-        }
-        // try to check last ledger (it may still open)
-        LedgerRange lastRange = lrs.get(lrs.size() - 1);
-        if (lastRange.hasEndSeqIdIncluded()) {
-            return lrs;
-        }
-        // read last confirmed of the opened ledger
-        try {
-            List<LedgerRange> newLrs = new ArrayList<LedgerRange>();
-            newLrs.addAll(lrs);
-            lrs = newLrs;
-            MessageSeqId lastSeqId;
-            if (lrs.size() == 1) {
-                lastSeqId = MessageSeqId.newBuilder().setLocalComponent(1).build();
-            } else {
-                lastSeqId = lrs.get(lrs.size() - 2).getEndSeqIdIncluded();
-            }
-            LedgerRange newLastRange = refreshLastLedgerRange(lastSeqId, lastRange);
-            lrs.set(lrs.size() - 1, newLastRange);
-        } catch (Exception e) {
-            e.printStackTrace();
+        long startSeqId = 1L;
+        if (!lrs.isEmpty()) {
+            LedgerRange range = lrs.get(0);
+            if (!range.hasStartSeqIdIncluded() && range.hasEndSeqIdIncluded()) {
+                long ledgerId = range.getLedgerId();
+                try {
+                    LedgerHandle lh = bk.openLedgerNoRecovery(ledgerId, DigestType.CRC32, passwd);
+                    long numEntries = lh.readLastConfirmed() + 1;
+                    long endOfLedger = range.getEndSeqIdIncluded().getLocalComponent();
+                    startSeqId = endOfLedger - numEntries + 1;
+                } catch (BKException.BKNoSuchLedgerExistsException be) {
+                    // ignore it
+                }
+            }
         }
-        return lrs;
-    }
+        Iterator<LedgerRange> lrIter = lrs.iterator();
+        while (lrIter.hasNext()) {
+            LedgerRange range = lrIter.next();
+            if (range.hasEndSeqIdIncluded()) {
+                long endOfLedger = range.getEndSeqIdIncluded().getLocalComponent();
+                if (range.hasStartSeqIdIncluded()) {
+                    startSeqId = range.getStartSeqIdIncluded();
+                } else {
+                    range = buildLedgerRange(range.getLedgerId(), startSeqId, range.getEndSeqIdIncluded());
+                }
+                results.add(range);
+                if (startSeqId < endOfLedger + 1) {
+                    startSeqId = endOfLedger + 1;
+                }
+                continue;
+            }
+            if (lrIter.hasNext()) {
+                throw new IllegalStateException("Ledger " + range.getLedgerId() + " for topic " + topic.toString()
+                                                + " is not the last one but still does not have an end seq-id");
+            }
 
-    /**
-     * Refresh last ledger range to get lastConfirmed entry, which make it available to read
-     *
-     * @param lastSeqId
-     *            Last sequence id of previous ledger
-     * @param oldRange
-     *            Ledger range to set lastConfirmed entry
-     */
-    LedgerRange refreshLastLedgerRange(MessageSeqId lastSeqId, LedgerRange oldRange)
-        throws BKException, KeeperException, InterruptedException {
-        LedgerHandle lh = bk.openLedgerNoRecovery(oldRange.getLedgerId(), DigestType.CRC32, passwd);
-        long lastConfirmed = lh.readLastConfirmed();
-        MessageSeqId newSeqId = MessageSeqId.newBuilder().mergeFrom(lastSeqId)
-                                .setLocalComponent(lastSeqId.getLocalComponent() + lastConfirmed).build();
-        return LedgerRange.newBuilder().mergeFrom(oldRange).setEndSeqIdIncluded(newSeqId).build();
+            if (range.hasStartSeqIdIncluded()) {
+                startSeqId = range.getStartSeqIdIncluded();
+            }
+
+            LedgerHandle lh = bk.openLedgerNoRecovery(range.getLedgerId(), DigestType.CRC32, passwd);
+            long endOfLedger = startSeqId + lh.readLastConfirmed();
+            MessageSeqId endSeqId = MessageSeqId.newBuilder().setLocalComponent(endOfLedger).build();
+            results.add(buildLedgerRange(range.getLedgerId(), startSeqId, endSeqId));
+        }
+        return results;
     }
 
     /**

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java?rev=1402501&r1=1402500&r2=1402501&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java Fri Oct 26 13:13:09 2012
@@ -46,6 +46,7 @@ import org.apache.hedwig.client.api.Subs
 import org.apache.hedwig.client.conf.ClientConfiguration;
 import org.apache.hedwig.client.HedwigClient;
 import org.apache.hedwig.protocol.PubSubProtocol.LedgerRange;
+import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
 import org.apache.hedwig.protocol.PubSubProtocol.Message;
 import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
@@ -638,27 +639,20 @@ public class HedwigConsole {
             return true;
         }
 
-        private void printTopicLedgers(List<LedgerRange> lrs) {
+        private void printTopicLedgers(List<LedgerRange> ranges) {
             System.out.println(">>> Persistence Info <<<");
-            if (null == lrs) {
+            if (null == ranges) {
                 System.out.println("N/A");
                 return;
             }
-            if (lrs.isEmpty()) {
+            if (ranges.isEmpty()) {
                 System.out.println("No Ledger used.");
                 return;
             }
-            Iterator<LedgerRange> lrIterator = lrs.iterator();
-            long startOfLedger = 1;
-            while (lrIterator.hasNext()) {
-                LedgerRange range = lrIterator.next();
-                long endOfLedger = Long.MAX_VALUE;
-                if (range.hasEndSeqIdIncluded()) {
-                    endOfLedger = range.getEndSeqIdIncluded().getLocalComponent();
-                }
-                System.out.println("Ledger " + range.getLedgerId() + " [ " + startOfLedger + " ~ " + (endOfLedger == Long.MAX_VALUE ? "" : endOfLedger) + " ]");
-
-                startOfLedger = endOfLedger + 1;
+            for (LedgerRange range : ranges) {
+                System.out.println("Ledger " + range.getLedgerId() + " [ "
+                                   + range.getStartSeqIdIncluded() + " ~ "
+                                   + range.getEndSeqIdIncluded().getLocalComponent() + " ]");
             }
             System.out.println();
         }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java?rev=1402501&r1=1402500&r2=1402501&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java Fri Oct 26 13:13:09 2012
@@ -21,12 +21,11 @@ package org.apache.hedwig.admin.console;
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
 
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -71,17 +70,7 @@ public class ReadTopic {
     
     static final int NUM_MESSAGES_TO_PRINT = 15;
 
-    SortedMap<Long, InMemoryLedgerRange> ledgers = new TreeMap<Long, InMemoryLedgerRange>();
-    
-    static class InMemoryLedgerRange {
-        LedgerRange range;
-        long startSeqIdIncluded;
-        
-        public InMemoryLedgerRange(LedgerRange range, long startSeqId) {
-            this.range = range;
-            this.startSeqIdIncluded = startSeqId;
-        }
-    }
+    List<LedgerRange> ledgers = new ArrayList<LedgerRange>();
     
     /**
      * Constructor
@@ -121,23 +110,7 @@ public class ReadTopic {
         if (null == ranges || ranges.isEmpty()) {
             return RC_NOLEDGERS;
         }
-        Iterator<LedgerRange> lrIterator = ranges.iterator();
-        long startOfLedger = 1;
-        while (lrIterator.hasNext()) {
-            LedgerRange range = lrIterator.next();
-            if (range.hasEndSeqIdIncluded()) {
-                long endOfLedger = range.getEndSeqIdIncluded().getLocalComponent();
-                ledgers.put(endOfLedger, new InMemoryLedgerRange(range, startOfLedger));
-                startOfLedger = endOfLedger + 1;
-                continue;
-            }
-            if (lrIterator.hasNext()) {
-                throw new IOException("Ledger-id: " + range.getLedgerId() + " for topic: " + topic
-                        + " is not the last one but still does not have an end seq-id");
-            }
-            // admin has read last confirmed entry of last ledger
-            // so we don't need to handle here
-        }
+        ledgers.addAll(ranges);
         return RC_OK;
     }
     
@@ -201,13 +174,13 @@ public class ReadTopic {
         } else {
             return rc;
         }
-        
-        for (Map.Entry<Long, InMemoryLedgerRange> entry : ledgers.entrySet()) {
-            long endSeqId = entry.getKey();
+
+        for (LedgerRange range : ledgers) {
+            long endSeqId = range.getEndSeqIdIncluded().getLocalComponent();
             if (endSeqId < startSeqId) {
                 continue;
             }
-            boolean toContinue = readLedger(entry.getValue(), endSeqId);
+            boolean toContinue = readLedger(range);
             startSeqId = endSeqId + 1;
             if (!toContinue) {
                 break;
@@ -227,15 +200,16 @@ public class ReadTopic {
      * @throws IOException
      * @throws InterruptedException
      */
-    protected boolean readLedger(InMemoryLedgerRange ledger, long endSeqId) throws BKException, IOException, InterruptedException {
-        long tEndSeqId = endSeqId;
-        
+    protected boolean readLedger(LedgerRange ledger)
+    throws BKException, IOException, InterruptedException {
+        long tEndSeqId = ledger.getEndSeqIdIncluded().getLocalComponent();
+
         if (tEndSeqId < this.startSeqId) {
             return true;
         }
         // Open Ledger Handle
-        long ledgerId = ledger.range.getLedgerId();
-        System.out.println("\n>>>>> Ledger " + ledgerId + " [ " + ledger.startSeqIdIncluded + " ~ " + (endSeqId == Long.MAX_VALUE ? "" : endSeqId) + "] <<<<<\n");
+        long ledgerId = ledger.getLedgerId();
+        System.out.println("\n>>>>> " + ledger + " <<<<<\n");
         LedgerHandle lh = null;
         try {
             lh = admin.getBkHandle().openLedgerNoRecovery(ledgerId, admin.getBkDigestType(), admin.getBkPasswd());
@@ -245,7 +219,7 @@ public class ReadTopic {
         if (null == lh) {
             return true;
         }
-        long expectedEntryId = startSeqId - ledger.startSeqIdIncluded;
+        long expectedEntryId = startSeqId - ledger.getStartSeqIdIncluded();
         
         long correctedEndSeqId = tEndSeqId;
         try {
@@ -253,7 +227,9 @@ public class ReadTopic {
                 correctedEndSeqId = Math.min(startSeqId + NUM_MESSAGES_TO_PRINT - 1, tEndSeqId);
                 
                 try {
-                    Enumeration<LedgerEntry> seq = lh.readEntries(startSeqId - ledger.startSeqIdIncluded, correctedEndSeqId - ledger.startSeqIdIncluded);
+                    Enumeration<LedgerEntry> seq =
+                        lh.readEntries(startSeqId - ledger.getStartSeqIdIncluded(),
+                                       correctedEndSeqId - ledger.getStartSeqIdIncluded());
                     LedgerEntry entry = null;
                     while (seq.hasMoreElements()) {
                         entry = seq.nextElement();
@@ -266,7 +242,7 @@ public class ReadTopic {
                             continue;
                         }
                         if (expectedEntryId != entry.getEntryId()
-                            || (message.getMsgId().getLocalComponent() - ledger.startSeqIdIncluded) != expectedEntryId) {
+                            || (message.getMsgId().getLocalComponent() - ledger.getStartSeqIdIncluded()) != expectedEntryId) {
                             throw new IOException("ERROR: Message ids are out of order : expected entry id " + expectedEntryId
                                                 + ", current entry id " + entry.getEntryId() + ", msg seq id " + message.getMsgId().getLocalComponent());
                         }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java?rev=1402501&r1=1402500&r2=1402501&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java Fri Oct 26 13:13:09 2012
@@ -22,6 +22,7 @@ import java.util.Enumeration;
 import java.util.Iterator;
 import java.util.HashSet;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
@@ -30,6 +31,7 @@ import java.util.concurrent.ScheduledExe
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
+import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerEntry;
@@ -59,6 +61,7 @@ import org.apache.hedwig.server.topics.T
 import org.apache.hedwig.server.topics.TopicOwnershipChangeListener;
 import org.apache.hedwig.util.Callback;
 import org.apache.hedwig.zookeeper.SafeAsynBKCallback;
+import static org.apache.hedwig.util.VarArgs.va;
 
 /**
  * This persistence manager uses zookeeper and bookkeeper to store messages.
@@ -80,26 +83,28 @@ public class BookkeeperPersistenceManage
     private ServerConfiguration cfg;
     private TopicManager tm;
 
+    private static final long START_SEQ_ID = 1L;
     // max number of entries allowed in a ledger
     private static final long UNLIMITED_ENTRIES = 0L;
     private final long maxEntriesPerLedger;
 
     static class InMemoryLedgerRange {
         LedgerRange range;
-        long startSeqIdIncluded; // included, for the very first ledger, this
-        // value is 1
         LedgerHandle handle;
 
-        public InMemoryLedgerRange(LedgerRange range, long startSeqId, LedgerHandle handle) {
+        public InMemoryLedgerRange(LedgerRange range, LedgerHandle handle) {
             this.range = range;
-            this.startSeqIdIncluded = startSeqId;
             this.handle = handle;
         }
 
-        public InMemoryLedgerRange(LedgerRange range, long startSeqId) {
-            this(range, startSeqId, null);
+        public InMemoryLedgerRange(LedgerRange range) {
+            this(range, null);
         }
 
+        public long getStartSeqIdIncluded() {
+            assert range.hasStartSeqIdIncluded();
+            return range.getStartSeqIdIncluded();
+        }
     }
 
     static class TopicInfo {
@@ -184,6 +189,14 @@ public class BookkeeperPersistenceManage
         tm.addTopicOwnershipChangeListener(this);
     }
 
+    private static LedgerRange buildLedgerRange(long ledgerId, long startOfLedger,
+                                                MessageSeqId endOfLedger) {
+        LedgerRange.Builder builder =
+            LedgerRange.newBuilder().setLedgerId(ledgerId).setStartSeqIdIncluded(startOfLedger)
+                       .setEndSeqIdIncluded(endOfLedger);
+        return builder.build();
+    }
+
     class RangeScanOp extends TopicOpQueuer.SynchronousOp {
         RangeScanRequest request;
         int numMessagesRead = 0;
@@ -245,14 +258,14 @@ public class BookkeeperPersistenceManage
 
             if (logger.isDebugEnabled()) {
                 logger.debug("Issuing a bk read for ledger: " + imlr.handle.getId() + " from entry-id: "
-                             + (startSeqId - imlr.startSeqIdIncluded) + " to entry-id: "
-                             + (correctedEndSeqId - imlr.startSeqIdIncluded));
+                             + (startSeqId - imlr.getStartSeqIdIncluded()) + " to entry-id: "
+                             + (correctedEndSeqId - imlr.getStartSeqIdIncluded()));
             }
 
-            imlr.handle.asyncReadEntries(startSeqId - imlr.startSeqIdIncluded, correctedEndSeqId
-            - imlr.startSeqIdIncluded, new SafeAsynBKCallback.ReadCallback() {
+            imlr.handle.asyncReadEntries(startSeqId - imlr.getStartSeqIdIncluded(), correctedEndSeqId
+            - imlr.getStartSeqIdIncluded(), new SafeAsynBKCallback.ReadCallback() {
 
-                long expectedEntryId = startSeqId - imlr.startSeqIdIncluded;
+                long expectedEntryId = startSeqId - imlr.getStartSeqIdIncluded();
 
                 @Override
                 public void safeReadComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
@@ -283,7 +296,7 @@ public class BookkeeperPersistenceManage
 
                         assert expectedEntryId == entry.getEntryId() : "expectedEntryId (" + expectedEntryId
                         + ") != entry.getEntryId() (" + entry.getEntryId() + ")";
-                        assert (message.getMsgId().getLocalComponent() - imlr.startSeqIdIncluded) == expectedEntryId;
+                        assert (message.getMsgId().getLocalComponent() - imlr.getStartSeqIdIncluded()) == expectedEntryId;
 
                         expectedEntryId++;
                         request.callback.messageScanned(ctx, message);
@@ -302,7 +315,7 @@ public class BookkeeperPersistenceManage
                     }
 
                     // continue scanning messages
-                    scanMessages(request, imlr.startSeqIdIncluded + entry.getEntryId() + 1, numMessagesRead, totalSizeRead);
+                    scanMessages(request, imlr.getStartSeqIdIncluded() + entry.getEntryId() + 1, numMessagesRead, totalSizeRead);
                 }
             }, request.ctx);
         }
@@ -314,7 +327,7 @@ public class BookkeeperPersistenceManage
             if (entry == null) {
                 // None of the old ledgers have this seq-id, we must use the
                 // current ledger
-                long endSeqId = topicInfo.currentLedgerRange.startSeqIdIncluded
+                long endSeqId = topicInfo.currentLedgerRange.getStartSeqIdIncluded()
                                 + topicInfo.lastEntryIdAckedInCurrentLedger;
 
                 if (endSeqId < startSeqId) {
@@ -345,11 +358,12 @@ public class BookkeeperPersistenceManage
     }
 
     class UpdateLedgerOp extends TopicOpQueuer.AsynchronousOp<Void> {
-        private long ledgerDeleted;
+        private Set<Long> ledgersDeleted;
 
-        public UpdateLedgerOp(ByteString topic, final Callback<Void> cb, final Object ctx, final long ledgerDeleted) {
+        public UpdateLedgerOp(ByteString topic, final Callback<Void> cb, final Object ctx,
+                              Set<Long> ledgersDeleted) {
             queuer.super(topic, cb, ctx);
-            this.ledgerDeleted = ledgerDeleted;
+            this.ledgersDeleted = ledgersDeleted;
         }
 
         @Override
@@ -357,22 +371,31 @@ public class BookkeeperPersistenceManage
             final TopicInfo topicInfo = topicInfos.get(topic);
             if (topicInfo == null) {
                 logger.error("Server is not responsible for topic!");
+                cb.operationFailed(ctx, new PubSubException.ServerNotResponsibleForTopicException(""));
                 return;
             }
-            boolean needsUpdate = false;
             LedgerRanges.Builder builder = LedgerRanges.newBuilder();
             final Set<Long> keysToRemove = new HashSet<Long>();
+            boolean foundUnconsumedLedger = false;
             for (Map.Entry<Long, InMemoryLedgerRange> e : topicInfo.ledgerRanges.entrySet()) {
-                if (e.getValue().range.getLedgerId() == ledgerDeleted) {
-                    needsUpdate = true;
+                LedgerRange lr = e.getValue().range;
+                long ledgerId = lr.getLedgerId();
+                if (!foundUnconsumedLedger && ledgersDeleted.contains(ledgerId)) {
                     keysToRemove.add(e.getKey());
+                    if (!lr.hasEndSeqIdIncluded()) {
+                        String msg = "Should not remove unclosed ledger " + ledgerId + " for topic " + topic.toStringUtf8();
+                        logger.error(msg);
+                        cb.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
+                        return;
+                    }
                 } else {
-                    builder.addRanges(e.getValue().range);
+                    foundUnconsumedLedger = true;
+                    builder.addRanges(lr);
                 }
             }
             builder.addRanges(topicInfo.currentLedgerRange.range);
 
-            if (needsUpdate) {
+            if (!keysToRemove.isEmpty()) {
                 final LedgerRanges newRanges = builder.build();
                 tpManager.writeTopicPersistenceInfo(
                 topic, newRanges, topicInfo.ledgerRangesVersion, new Callback<Version>() {
@@ -410,35 +433,80 @@ public class BookkeeperPersistenceManage
                 return;
             }
 
+            final LinkedList<Long> ledgersToDelete = new LinkedList<Long>();
             for (Long endSeqIdIncluded : topicInfo.ledgerRanges.keySet()) {
                 if (endSeqIdIncluded <= seqId) {
                     // This ledger's message entries have all been consumed already
                     // so it is safe to delete it from BookKeeper.
                     long ledgerId = topicInfo.ledgerRanges.get(endSeqIdIncluded).range.getLedgerId();
-                    try {
-                        bk.deleteLedger(ledgerId);
-                        Callback<Void> cb = new Callback<Void>() {
-                            public void operationFinished(Object ctx, Void result) {
-                                // do nothing, op is async to stop other ops
-                                // occurring on the topic during the update
-                            }
-                            public void operationFailed(Object ctx, PubSubException exception) {
-                                logger.error("Failed to update ledger znode", exception);
-                            }
-                        };
-                        queuer.pushAndMaybeRun(topic, new UpdateLedgerOp(topic, cb, null, ledgerId));
-                    } catch (Exception e) {
-                        // For now, just log an exception error message. In the
-                        // future, we can have more complicated retry logic to
-                        // delete a consumed ledger. The next time the ledger
-                        // garbage collection job runs, we'll once again try to
-                        // delete this ledger.
-                        logger.error("Exception while deleting consumed ledgerId: " + ledgerId, e);
-                    }
-                } else
+                    ledgersToDelete.add(ledgerId);
+                } else {
                     break;
+                }
             }
+
+            // no ledgers need to delete
+            if (ledgersToDelete.isEmpty()) {
+                return;
+            }
+
+            Set<Long> ledgersDeleted = new HashSet<Long>();
+            deleteLedgersAndUpdateLedgersRange(topic, ledgersToDelete, ledgersDeleted);
+        }
+    }
+
+    private void deleteLedgersAndUpdateLedgersRange(final ByteString topic,
+                                                    final LinkedList<Long> ledgersToDelete,
+                                                    final Set<Long> ledgersDeleted) {
+        if (ledgersToDelete.isEmpty()) {
+            Callback<Void> cb = new Callback<Void>() {
+                public void operationFinished(Object ctx, Void result) {
+                    // do nothing, op is async to stop other ops
+                    // occurring on the topic during the update
+                }
+                public void operationFailed(Object ctx, PubSubException exception) {
+                    logger.error("Failed to update ledger znode for topic {} deleting ledgers {} : {}",
+                                 va(topic.toStringUtf8(), ledgersDeleted, exception.getMessage()));
+                }
+            };
+            queuer.pushAndMaybeRun(topic, new UpdateLedgerOp(topic, cb, null, ledgersDeleted));
+            return;
+        }
+
+        final Long ledger = ledgersToDelete.poll();
+        if (null == ledger) {
+            deleteLedgersAndUpdateLedgersRange(topic, ledgersToDelete, ledgersDeleted);
+            return;
         }
+
+        bk.asyncDeleteLedger(ledger, new DeleteCallback() {
+            @Override
+            public void deleteComplete(int rc, Object ctx) {
+                if (BKException.Code.NoSuchLedgerExistsException == rc ||
+                    BKException.Code.OK == rc) {
+                    ledgersDeleted.add(ledger);
+                    deleteLedgersAndUpdateLedgersRange(topic, ledgersToDelete, ledgersDeleted);
+                    return;
+                } else {
+                    logger.warn("Exception while deleting consumed ledger {}, stop deleting other ledgers {} "
+                                + "and update ledger ranges with deleted ledgers {} : {}",
+                                va(ledger, ledgersToDelete, ledgersDeleted, BKException.create(rc)));
+                    // We should not continue when failed to delete ledger
+                    Callback<Void> cb = new Callback<Void>() {
+                        public void operationFinished(Object ctx, Void result) {
+                            // do nothing, op is async to stop other ops
+                            // occurring on the topic during the update
+                        }
+                        public void operationFailed(Object ctx, PubSubException exception) {
+                            logger.error("Failed to update ledger znode for topic {} deleting ledgers {} : {}",
+                                         va(topic, ledgersDeleted, exception.getMessage()));
+                        }
+                    };
+                    queuer.pushAndMaybeRun(topic, new UpdateLedgerOp(topic, cb, null, ledgersDeleted));
+                    return;
+                }
+            }
+        }, null);
     }
 
     public void consumedUntil(ByteString topic, Long seqId) {
@@ -575,7 +643,7 @@ public class BookkeeperPersistenceManage
 
         // check whether reach the threshold of a ledger, if it does,
         // open a ledger to write
-        long entriesInThisLedger = localSeqId - topicInfo.currentLedgerRange.startSeqIdIncluded + 1;
+        long entriesInThisLedger = localSeqId - topicInfo.currentLedgerRange.getStartSeqIdIncluded() + 1;
         if (UNLIMITED_ENTRIES != maxEntriesPerLedger &&
             entriesInThisLedger >= maxEntriesPerLedger) {
             if (topicInfo.doChangeLedger.compareAndSet(false, true)) {
@@ -616,9 +684,9 @@ public class BookkeeperPersistenceManage
                     return;
                 }
 
-                if (entryId + topicInfo.currentLedgerRange.startSeqIdIncluded != localSeqId) {
+                if (entryId + topicInfo.currentLedgerRange.getStartSeqIdIncluded() != localSeqId) {
                     String msg = "Expected BK to assign entry-id: "
-                                 + (localSeqId - topicInfo.currentLedgerRange.startSeqIdIncluded)
+                                 + (localSeqId - topicInfo.currentLedgerRange.getStartSeqIdIncluded())
                                  + " but it instead assigned entry-id: " + entryId + " topic: "
                                  + topic.toStringUtf8() + "ledger: " + lh.getId();
                     logger.error(msg);
@@ -630,7 +698,7 @@ public class BookkeeperPersistenceManage
                 // if this acked entry is the last entry of current ledger
                 // we can add a ChangeLedgerOp to execute to change ledger
                 if (topicInfo.doChangeLedger.get() &&
-                    entryId + topicInfo.currentLedgerRange.startSeqIdIncluded == topicInfo.lastSeqIdBeforeLedgerChange) {
+                    entryId + topicInfo.currentLedgerRange.getStartSeqIdIncluded() == topicInfo.lastSeqIdBeforeLedgerChange) {
                     // change ledger
                     changeLedger(topic, new Callback<Void>() {
                         @Override
@@ -708,19 +776,106 @@ public class BookkeeperPersistenceManage
         }
 
         void processTopicLedgerRanges(final LedgerRanges ranges, final Version version) {
-            Iterator<LedgerRange> lrIterator = ranges.getRangesList().iterator();
-            TopicInfo topicInfo = new TopicInfo();
+            final List<LedgerRange> rangesList = ranges.getRangesList();
+            if (!rangesList.isEmpty()) {
+                LedgerRange range = rangesList.get(0);
+                if (range.hasStartSeqIdIncluded()) {
+                    // we already have start seq id
+                    processTopicLedgerRanges(rangesList, version, range.getStartSeqIdIncluded());
+                    return;
+                }
+                getStartSeqIdToProcessTopicLedgerRanges(rangesList, version);
+                return;
+            }
+            // process topic ledger ranges directly
+            processTopicLedgerRanges(rangesList, version, START_SEQ_ID);
+        }
 
-            long startOfLedger = 1;
+        /**
+         * Process old version ledger ranges to fetch start seq id.
+         */
+        void getStartSeqIdToProcessTopicLedgerRanges(
+            final List<LedgerRange> rangesList, final Version version) {
+
+            final LedgerRange range = rangesList.get(0);
+
+            if (!range.hasEndSeqIdIncluded()) {
+                // process topic ledger ranges directly
+                processTopicLedgerRanges(rangesList, version, START_SEQ_ID);
+                return;
+            }
+
+            final long ledgerId = range.getLedgerId();
+            // open the first ledger to compute right start seq id
+            bk.asyncOpenLedger(ledgerId, DigestType.CRC32, passwd,
+            new SafeAsynBKCallback.OpenCallback() {
+
+                @Override
+                public void safeOpenComplete(int rc, LedgerHandle ledgerHandle, Object ctx) {
+
+                    if (rc == BKException.Code.NoSuchLedgerExistsException) {
+                        // process next ledger 
+                        processTopicLedgerRanges(rangesList, version, START_SEQ_ID);
+                        return;
+                    } else if (rc != BKException.Code.OK) {
+                        BKException bke = BKException.create(rc);
+                        logger.error("Could not open ledger {} to get start seq id while acquiring topic {} : {}",
+                                     va(ledgerId, topic.toStringUtf8(), bke));
+                        cb.operationFailed(ctx, new PubSubException.ServiceDownException(bke));
+                        return;
+                    }
+
+                    final long numEntriesInLastLedger = ledgerHandle.getLastAddConfirmed() + 1;
+
+                    // the ledger is closed before, calling close is just a nop operation.
+                    try {
+                        ledgerHandle.close();
+                    } catch (InterruptedException ie) {
+                        // the exception would never be thrown for a read only ledger handle.
+                    } catch (BKException bke) {
+                        // the exception would never be thrown for a read only ledger handle.
+                    }
 
+                    if (numEntriesInLastLedger <= 0) {
+                        String msg = "No entries found in a have-end-seq-id ledger " + ledgerId
+                                     + " when acquiring topic " + topic.toStringUtf8() + ".";
+                        logger.error(msg);
+                        cb.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
+                        return;
+                    }
+                    long endOfLedger = range.getEndSeqIdIncluded().getLocalComponent();
+                    long startOfLedger = endOfLedger - numEntriesInLastLedger + 1;
+
+                    processTopicLedgerRanges(rangesList, version, startOfLedger);
+                }
+
+            }, ctx);
+        }
+
+        void processTopicLedgerRanges(final List<LedgerRange> rangesList, final Version version,
+                                      long startOfLedger) {
+            logger.info("Process {} ledgers for topic {} starting from seq id {}.",
+                        va(rangesList.size(), topic.toStringUtf8(), startOfLedger));
+
+            Iterator<LedgerRange> lrIterator = rangesList.iterator();
+
+            TopicInfo topicInfo = new TopicInfo();
             while (lrIterator.hasNext()) {
                 LedgerRange range = lrIterator.next();
 
                 if (range.hasEndSeqIdIncluded()) {
                     // this means it was a valid and completely closed ledger
                     long endOfLedger = range.getEndSeqIdIncluded().getLocalComponent();
-                    topicInfo.ledgerRanges.put(endOfLedger, new InMemoryLedgerRange(range, startOfLedger));
-                    startOfLedger = endOfLedger + 1;
+                    if (range.hasStartSeqIdIncluded()) {
+                        startOfLedger = range.getStartSeqIdIncluded();
+                    } else {
+                        range = buildLedgerRange(range.getLedgerId(), startOfLedger,
+                                                 range.getEndSeqIdIncluded());
+                    }
+                    topicInfo.ledgerRanges.put(endOfLedger, new InMemoryLedgerRange(range));
+                    if (startOfLedger < endOfLedger + 1) {
+                        startOfLedger = endOfLedger + 1;
+                    }
                     continue;
                 }
 
@@ -733,14 +888,19 @@ public class BookkeeperPersistenceManage
                     return;
                 }
 
+                if (range.hasStartSeqIdIncluded()) {
+                    startOfLedger = range.getStartSeqIdIncluded();
+                }
+
                 // The last ledger does not have a valid seq-id, lets try to
                 // find it out
-                recoverLastTopicLedgerAndOpenNewOne(range.getLedgerId(), version, topicInfo);
+                recoverLastTopicLedgerAndOpenNewOne(range.getLedgerId(), startOfLedger,
+                                                    version, topicInfo);
                 return;
             }
 
             // All ledgers were found properly closed, just start a new one
-            openNewTopicLedger(topic, version, topicInfo, false, cb, ctx);
+            openNewTopicLedger(topic, version, topicInfo, startOfLedger, false, cb, ctx);
         }
 
         /**
@@ -749,9 +909,15 @@ public class BookkeeperPersistenceManage
          *
          * @param ledgerId
          *            Ledger to be recovered
+         * @param expectedStartSeqId 
+         *            Start seq id of the ledger to recover
+         * @param expectedVersionOfLedgerNode
+         *            Expected version to update ledgers range
+         * @param topicInfo
+         *            Topic info
          */
-        private void recoverLastTopicLedgerAndOpenNewOne(final long ledgerId, final Version expectedVersionOfLedgerNode,
-                final TopicInfo topicInfo) {
+        private void recoverLastTopicLedgerAndOpenNewOne(final long ledgerId, final long expectedStartSeqId,
+                final Version expectedVersionOfLedgerNode, final TopicInfo topicInfo) {
 
             bk.asyncOpenLedger(ledgerId, DigestType.CRC32, passwd, new SafeAsynBKCallback.OpenCallback() {
                 @Override
@@ -772,7 +938,8 @@ public class BookkeeperPersistenceManage
                         // couldn't write to, so just ignore it
                         logger.info("Pruning empty ledger: " + ledgerId + " for topic: " + topic.toStringUtf8());
                         closeLedger(ledgerHandle);
-                        openNewTopicLedger(topic, expectedVersionOfLedgerNode, topicInfo, false, cb, ctx);
+                        openNewTopicLedger(topic, expectedVersionOfLedgerNode, topicInfo,
+                                           expectedStartSeqId, false, cb, ctx);
                         return;
                     }
 
@@ -803,17 +970,25 @@ public class BookkeeperPersistenceManage
                                 return;
                             }
 
-                            long prevLedgerEnd = topicInfo.ledgerRanges.isEmpty() ? 0 : topicInfo.ledgerRanges
-                                                 .lastKey();
-                            LedgerRange lr = LedgerRange.newBuilder().setLedgerId(ledgerId)
-                                             .setEndSeqIdIncluded(lastMessage.getMsgId()).build();
-                            topicInfo.ledgerRanges.put(lr.getEndSeqIdIncluded().getLocalComponent(),
-                                    new InMemoryLedgerRange(lr, prevLedgerEnd + 1, lh));
+                            long endOfLedger  = lastMessage.getMsgId().getLocalComponent();
+                            long startOfLedger = endOfLedger - numEntriesInLastLedger + 1;
+
+                            if (startOfLedger != expectedStartSeqId) {
+                                // gap would be introduced by old version when gc consumed ledgers
+                                String msg = "Expected start seq id of recovered ledger " + ledgerId
+                                             + " to be " + expectedStartSeqId + " but it was "
+                                             + startOfLedger + ".";
+                                logger.warn(msg);
+                            }
+
+                            LedgerRange lr = buildLedgerRange(ledgerId, startOfLedger, lastMessage.getMsgId());
+                            topicInfo.ledgerRanges.put(endOfLedger,
+                                    new InMemoryLedgerRange(lr, lh));
 
-                            logger.info("Recovered unclosed ledger: " + ledgerId + " for topic: "
-                                        + topic.toStringUtf8() + " with " + numEntriesInLastLedger + " entries");
+                            logger.info("Recovered unclosed ledger: {} for topic: {} with {} entries starting from seq id {}",
+                                        va(ledgerId, topic.toStringUtf8(), numEntriesInLastLedger, startOfLedger));
 
-                            openNewTopicLedger(topic, expectedVersionOfLedgerNode, topicInfo, false, cb, ctx);
+                            openNewTopicLedger(topic, expectedVersionOfLedgerNode, topicInfo, endOfLedger + 1, false, cb, ctx);
                         }
                     }, ctx);
 
@@ -832,6 +1007,8 @@ public class BookkeeperPersistenceManage
      *          Expected Version to Update Ledgers Node.
      * @param topicInfo
      *          Topic Information
+     * @param startSeqId
+     *          Start of sequence id for new ledger
      * @param changeLedger
      *          Whether is it called when changing ledger
      * @param cb
@@ -841,7 +1018,7 @@ public class BookkeeperPersistenceManage
      */
     void openNewTopicLedger(final ByteString topic,
                             final Version expectedVersionOfLedgersNode, final TopicInfo topicInfo,
-                            final boolean changeLedger,
+                            final long startSeqId, final boolean changeLedger,
                             final Callback<Void> cb, final Object ctx) {
         bk.asyncCreateLedger(cfg.getBkEnsembleSize(), cfg.getBkQuorumSize(), DigestType.CRC32, passwd,
         new SafeAsynBKCallback.CreateCallback() {
@@ -864,13 +1041,13 @@ public class BookkeeperPersistenceManage
                 // compute last seq id
                 if (!changeLedger) {
                     topicInfo.lastSeqIdPushed = topicInfo.ledgerRanges.isEmpty() ? MessageSeqId.newBuilder()
-                                                .setLocalComponent(0).build() : topicInfo.ledgerRanges.lastEntry().getValue().range
+                                                .setLocalComponent(startSeqId - 1).build() : topicInfo.ledgerRanges.lastEntry().getValue().range
                                                 .getEndSeqIdIncluded();
                 }
 
-                LedgerRange lastRange = LedgerRange.newBuilder().setLedgerId(lh.getId()).build();
-                topicInfo.currentLedgerRange = new InMemoryLedgerRange(lastRange, topicInfo.lastSeqIdPushed
-                        .getLocalComponent() + 1, lh);
+                LedgerRange lastRange = LedgerRange.newBuilder().setLedgerId(lh.getId())
+                                        .setStartSeqIdIncluded(startSeqId).build();
+                topicInfo.currentLedgerRange = new InMemoryLedgerRange(lastRange, lh);
 
                 // Persist the fact that we started this new
                 // ledger to ZK
@@ -950,18 +1127,19 @@ public class BookkeeperPersistenceManage
                         cb.operationFailed(ctx, new PubSubException.ServiceDownException(bke));
                         return;
                     }
+                    long endSeqId = topicInfo.lastSeqIdPushed.getLocalComponent();
                     // update last range
-                    LedgerRange lastRange = LedgerRange.newBuilder().setLedgerId(ledgerId)
-                                            .setEndSeqIdIncluded(topicInfo.lastSeqIdPushed).build();
+                    LedgerRange lastRange =
+                        buildLedgerRange(ledgerId, topicInfo.currentLedgerRange.getStartSeqIdIncluded(),
+                                         topicInfo.lastSeqIdPushed);
 
                     topicInfo.currentLedgerRange.range = lastRange;
                     // put current ledger to ledger ranges
-                    topicInfo.ledgerRanges.put(topicInfo.lastSeqIdPushed.getLocalComponent(),
-                                               topicInfo.currentLedgerRange);
+                    topicInfo.ledgerRanges.put(endSeqId, topicInfo.currentLedgerRange);
                     logger.info("Closed written ledger " + ledgerId + " for topic "
                               + topic.toStringUtf8() + " to change ledger.");
                     openNewTopicLedger(topic, topicInfo.ledgerRangesVersion,
-                                       topicInfo, true, cb, ctx);
+                                       topicInfo, endSeqId + 1, true, cb, ctx);
                 }
             }, ctx);
         }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java?rev=1402501&r1=1402500&r2=1402501&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java Fri Oct 26 13:13:09 2012
@@ -17,9 +17,13 @@
  */
 package org.apache.hedwig.server.persistence;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Iterator;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
@@ -28,23 +32,36 @@ import java.util.concurrent.TimeUnit;
 
 import junit.framework.TestCase;
 
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
+
 import org.apache.hedwig.HelperMethods;
 import org.apache.hedwig.exceptions.PubSubException;
 import org.apache.hedwig.protocol.PubSubProtocol;
+import org.apache.hedwig.protocol.PubSubProtocol.LedgerRange;
+import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
 import org.apache.hedwig.protocol.PubSubProtocol.Message;
 import org.apache.hedwig.server.common.ServerConfiguration;
 import org.apache.hedwig.server.meta.MetadataManagerFactory;
+import org.apache.hedwig.server.meta.SubscriptionDataManager;
+import org.apache.hedwig.server.meta.TopicOwnershipManager;
+import org.apache.hedwig.server.meta.TopicPersistenceManager;
 import org.apache.hedwig.server.topics.TopicManager;
 import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager;
 import org.apache.hedwig.util.Callback;
+import org.apache.zookeeper.ZooKeeper;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.protobuf.ByteString;
 
+@RunWith(Parameterized.class)
 public class TestBookKeeperPersistenceManager extends TestCase {
     static Logger logger = LoggerFactory.getLogger(TestPersistenceManagerBlackBox.class);
 
@@ -58,11 +75,114 @@ public class TestBookKeeperPersistenceMa
     TopicManager tm;
     BookkeeperPersistenceManager manager;
     PubSubException failureException = null;
-    MetadataManagerFactory metadataManagerFactory;
+    TestMetadataManagerFactory metadataManagerFactory;
+    TopicPersistenceManager tpManager;
+
+    boolean removeStartSeqId;
+
+    static class TestMetadataManagerFactory extends MetadataManagerFactory {
+
+        final MetadataManagerFactory factory;
+        int serviceDownCount = 0;
+
+        TestMetadataManagerFactory(ServerConfiguration conf, ZooKeeper zk) throws Exception {
+            factory = MetadataManagerFactory.newMetadataManagerFactory(conf, zk);
+        }
+
+        public void setServiceDownCount(int count) {
+            this.serviceDownCount = count;
+        }
+
+        @Override
+        public int getCurrentVersion() {
+            return factory.getCurrentVersion();
+        }
+
+        @Override
+        protected MetadataManagerFactory initialize(
+            ServerConfiguration cfg, ZooKeeper zk, int version) throws IOException {
+            // do nothing
+            return factory;
+        }
+
+        @Override
+        public void shutdown() throws IOException {
+            factory.shutdown();
+        }
+
+        @Override
+        public Iterator<ByteString> getTopics() throws IOException {
+            return factory.getTopics();
+        }
+
+        @Override
+        public TopicPersistenceManager newTopicPersistenceManager() {
+            final TopicPersistenceManager manager = factory.newTopicPersistenceManager();
+            return new TopicPersistenceManager() {
+
+                @Override
+                public void close() throws IOException {
+                    manager.close();
+                }
+
+                @Override
+                public void readTopicPersistenceInfo(ByteString topic,
+                                                     Callback<Versioned<LedgerRanges>> callback, Object ctx) {
+                    if (serviceDownCount > 0) {
+                        --serviceDownCount;
+                        callback.operationFailed(ctx, new PubSubException.ServiceDownException("Metadata Store is down"));
+                        return;
+                    }
+                    manager.readTopicPersistenceInfo(topic, callback, ctx);
+                }
+                @Override
+                public void writeTopicPersistenceInfo(ByteString topic, LedgerRanges ranges, Version version,
+                                                      Callback<Version> callback, Object ctx) {
+                    if (serviceDownCount > 0) {
+                        --serviceDownCount;
+                        callback.operationFailed(ctx, new PubSubException.ServiceDownException("Metadata Store is down"));
+                        return;
+                    }
+                    manager.writeTopicPersistenceInfo(topic, ranges, version, callback, ctx);
+                }
+                @Override
+                public void deleteTopicPersistenceInfo(ByteString topic, Version version,
+                                                       Callback<Void> callback, Object ctx) {
+                    if (serviceDownCount > 0) {
+                        --serviceDownCount;
+                        callback.operationFailed(ctx, new PubSubException.ServiceDownException("Metadata Store is down"));
+                        return;
+                    }
+                    manager.deleteTopicPersistenceInfo(topic, version, callback, ctx);
+                }
+            };
+        }
+
+        @Override
+        public SubscriptionDataManager newSubscriptionDataManager() {
+            return factory.newSubscriptionDataManager();
+        }
+
+        @Override
+        public TopicOwnershipManager newTopicOwnershipManager() {
+            return factory.newTopicOwnershipManager();
+        }
+    }
+
+    public TestBookKeeperPersistenceManager(boolean removeStartSeqId) {
+        this.removeStartSeqId = removeStartSeqId;
+    }
+
+    @Parameters
+    public static Collection<Object[]> configs() {
+        return Arrays.asList(new Object[][] {
+            { true }, { false }
+        });
+    }
 
     @Override
     @Before
-    protected void setUp() throws Exception {
+    public void setUp() throws Exception {
         super.setUp();
 
         // delay read response for 2s.
@@ -76,8 +196,8 @@ public class TestBookKeeperPersistenceMa
         .setThrottleValue(3);
         conf.addConf(bkClientConf);
 
-        metadataManagerFactory =
-            MetadataManagerFactory.newMetadataManagerFactory(conf, bktb.getZooKeeperClient());
+        metadataManagerFactory = new TestMetadataManagerFactory(conf, bktb.getZooKeeperClient());
+        tpManager = metadataManagerFactory.newTopicPersistenceManager();
 
         scheduler = Executors.newScheduledThreadPool(1);
         tm = new TrivialOwnAllTopicManager(conf, scheduler);
@@ -87,9 +207,10 @@ public class TestBookKeeperPersistenceMa
 
     @Override
     @After
-    protected void tearDown() throws Exception {
+    public void tearDown() throws Exception {
         tm.stop();
         manager.stop();
+        tpManager.close();
         metadataManagerFactory.shutdown();
         scheduler.shutdown();
         bktb.tearDown();
@@ -108,6 +229,7 @@ public class TestBookKeeperPersistenceMa
 
         @Override
         public void messageScanned(Object ctx, Message recvMessage) {
+            logger.info("Scanned message : {}", recvMessage.getMsgId().getLocalComponent());
             if (null != nextScan && !runNextScan) {
                 runNextScan = true;
                 manager.scanMessages(nextScan);
@@ -133,7 +255,7 @@ public class TestBookKeeperPersistenceMa
         public void scanFinished(Object ctx, ReasonForFinish reason) {
             LinkedBlockingQueue<Boolean> statusQueue = (LinkedBlockingQueue<Boolean>) ctx;
             try {
-                statusQueue.put(true);
+                statusQueue.put(pubMsgs.isEmpty());
             } catch (InterruptedException e) {
                 throw new RuntimeException(e);
             }
@@ -150,6 +272,164 @@ public class TestBookKeeperPersistenceMa
     }
 
     @Test
+    public void testScanMessagesOnClosedLedgerAfterDeleteLedger() throws Exception {
+        scanMessagesAfterDeleteLedgerTest(2);
+    }
+
+    @Test
+    public void testScanMessagesOnUnclosedLedgerAfterDeleteLedger() throws Exception {
+        scanMessagesAfterDeleteLedgerTest(1);
+    }
+
+    private void scanMessagesAfterDeleteLedgerTest(int numLedgers) throws Exception {
+        ByteString topic = ByteString.copyFromUtf8("TestScanMessagesAfterDeleteLedger");
+
+        List<Message> msgs = new ArrayList<Message>();
+
+        acquireTopic(topic);
+        msgs.addAll(publishMessages(topic, 2));
+
+        for (int i=0; i<numLedgers; i++) {
+            releaseTopic(topic);
+            // acquire topic again to force a new ledger
+            acquireTopic(topic);
+            msgs.addAll(publishMessages(topic, 2));
+        }
+
+        consumedUntil(topic, 2L);
+        // Wait until ledger ranges is updated.
+        Thread.sleep(2000L);
+        releaseTopic(topic);
+
+        // acquire topic again
+        acquireTopic(topic);
+        // scan messages starting from 3
+        LinkedBlockingQueue<Boolean> statusQueue =
+            new LinkedBlockingQueue<Boolean>();
+        manager.scanMessages(new RangeScanRequest(topic, 3, 2, Long.MAX_VALUE,
+                             new RangeScanVerifier(subMessages(msgs, 2, 3), null), statusQueue));
+        Boolean b = statusQueue.poll(10 * readDelay, TimeUnit.MILLISECONDS);
+        assertTrue("Should succeed to scan messages after deleted consumed ledger.", b);
+    }
+
+    @Test
+    public void testScanMessagesOnEmptyLedgerAfterDeleteLedger() throws Exception {
+        ByteString topic = ByteString.copyFromUtf8("TestScanMessagesOnEmptyLedgerAfterDeleteLedger");
+
+        List<Message> msgs = new ArrayList<Message>();
+
+        acquireTopic(topic);
+        msgs.addAll(publishMessages(topic, 2));
+        releaseTopic(topic);
+
+        // acquire topic again to force a new ledger
+        acquireTopic(topic);
+        logger.info("Consumed messages.");
+        consumedUntil(topic, 2L);
+        // Wait until ledger ranges is updated.
+        Thread.sleep(2000L);
+        logger.info("Released topic with an empty ledger.");
+        // release topic to force an empty ledger
+        releaseTopic(topic);
+
+        // publish 2 more messages, these message expected to be id 3 and 4
+        acquireTopic(topic);
+        logger.info("Published more messages.");
+        msgs.addAll(publishMessages(topic, 2));
+        releaseTopic(topic);
+
+        // acquire topic again
+        acquireTopic(topic);
+        // scan messages starting from 3
+        LinkedBlockingQueue<Boolean> statusQueue =
+            new LinkedBlockingQueue<Boolean>();
+        long startSeqId = removeStartSeqId ? 1 : 3;
+        manager.scanMessages(new RangeScanRequest(topic, startSeqId, 2, Long.MAX_VALUE,
+                             new RangeScanVerifier(subMessages(msgs, 2, 3), null), statusQueue));
+        Boolean b = statusQueue.poll(10 * readDelay, TimeUnit.MILLISECONDS);
+        assertTrue("Should succeed to scan messages after deleted consumed ledger.", b);
+    }
+
+    @Test
+    public void testFailedToDeleteLedger1() throws Exception {
+        failedToDeleteLedgersTest(1);
+    }
+
+    @Test
+    public void testFailedToDeleteLedger2() throws Exception {
+        // succeed to delete second ledger
+        failedToDeleteLedgersTest(2);
+    }
+
+    private void failedToDeleteLedgersTest(int numLedgers) throws Exception {
+        final ByteString topic = ByteString.copyFromUtf8("TestFailedToDeleteLedger");
+        final int serviceDownCount = 1;
+
+        List<Message> msgs = new ArrayList<Message>();
+
+        for (int i=0; i<numLedgers; i++) {
+            acquireTopic(topic);
+            msgs.addAll(publishMessages(topic, 2));
+            releaseTopic(topic);
+        }
+
+        // acquire topic again to force a new ledger
+        acquireTopic(topic);
+        logger.info("Consumed messages.");
+        metadataManagerFactory.setServiceDownCount(serviceDownCount);
+        // failed consumed
+        consumedUntil(topic, 2L * numLedgers);
+        // Wait until ledger ranges is updated.
+        Thread.sleep(2000L);
+        logger.info("Released topic with an empty ledger.");
+        // release topic to force an empty ledger
+        releaseTopic(topic);
+
+        // publish 2 more messages, these message expected to be id 3 and 4
+        acquireTopic(topic);
+        logger.info("Published more messages.");
+        msgs.addAll(publishMessages(topic, 2));
+        releaseTopic(topic);
+
+        // acquire topic again
+        acquireTopic(topic);
+        LinkedBlockingQueue<Boolean> statusQueue =
+            new LinkedBlockingQueue<Boolean>();
+        manager.scanMessages(new RangeScanRequest(topic, numLedgers * 2 + 1, 2, Long.MAX_VALUE,
+                             new RangeScanVerifier(subMessages(msgs, numLedgers * 2, numLedgers * 2 + 1), null), statusQueue));
+        Boolean b = statusQueue.poll(10 * readDelay, TimeUnit.MILLISECONDS);
+        assertTrue("Should succeed to scan messages after deleted consumed ledger.", b);
+
+        // consumed 
+        consumedUntil(topic, (numLedgers + 1) * 2L);
+        // Wait until ledger ranges is updated.
+        Thread.sleep(2000L);
+
+        Semaphore latch = new Semaphore(1);
+        latch.acquire();
+        tpManager.readTopicPersistenceInfo(topic, new Callback<Versioned<LedgerRanges>>() {
+            @Override
+            public void operationFinished(Object ctx, Versioned<LedgerRanges> ranges) {
+                if (null == ranges || ranges.getValue().getRangesList().size() > 1) {
+                    failureException = new PubSubException.NoTopicPersistenceInfoException("Invalid persistence info found for topic " + topic.toStringUtf8());
+                    ((Semaphore)ctx).release();
+                    return;
+                }
+                failureException = null;
+                ((Semaphore)ctx).release();
+            }
+            @Override
+            public void operationFailed(Object ctx, PubSubException exception) {
+                failureException = exception;
+                ((Semaphore)ctx).release();
+            }
+        }, latch);
+        latch.acquire();
+        latch.release();
+        assertNull("Should not fail with exception.", failureException);
+    }
+
+    @Test
     public void testScanMessagesOnTwoLedgers() throws Exception {
         ByteString topic = ByteString.copyFromUtf8("TestScanMessagesOnTwoLedgers");
 
@@ -245,8 +525,62 @@ public class TestBookKeeperPersistenceMa
         }
     }
 
-    protected void releaseTopic(ByteString topic) throws Exception {
+    protected void releaseTopic(final ByteString topic) throws Exception {
         manager.lostTopic(topic);
+        // backward testing ledger ranges without start seq id
+        if (removeStartSeqId) {
+            Semaphore latch = new Semaphore(1);
+            latch.acquire();
+            tpManager.readTopicPersistenceInfo(topic, new Callback<Versioned<LedgerRanges>>() {
+                @Override
+                public void operationFinished(Object ctx, Versioned<LedgerRanges> ranges) {
+                    if (null == ranges) {
+                        failureException = new PubSubException.NoTopicPersistenceInfoException("No persistence info found for topic " + topic.toStringUtf8());
+                        ((Semaphore)ctx).release();
+                        return;
+                    }
+
+                    // build a new ledger ranges w/o start seq id.
+                    LedgerRanges.Builder builder = LedgerRanges.newBuilder();
+                    final List<LedgerRange> rangesList = ranges.getValue().getRangesList();
+                    for (LedgerRange range : rangesList) {
+                        LedgerRange.Builder newRangeBuilder = LedgerRange.newBuilder();
+                        newRangeBuilder.setLedgerId(range.getLedgerId());
+                        if (range.hasEndSeqIdIncluded()) {
+                            newRangeBuilder.setEndSeqIdIncluded(range.getEndSeqIdIncluded());
+                        }
+                        builder.addRanges(newRangeBuilder.build());
+                    }
+                    tpManager.writeTopicPersistenceInfo(topic, builder.build(), ranges.getVersion(),
+                    new Callback<Version>() {
+                        @Override
+                        public void operationFinished(Object ctx, Version newVersion) {
+                            failureException = null;
+                            ((Semaphore)ctx).release();
+                        }
+                        @Override
+                        public void operationFailed(Object ctx, PubSubException exception) {
+                            failureException = exception;
+                            ((Semaphore)ctx).release();
+                        }
+                    }, ctx);
+                }
+                @Override
+                public void operationFailed(Object ctx, PubSubException exception) {
+                    failureException = exception;
+                    ((Semaphore)ctx).release();
+                }
+            }, latch);
+            latch.acquire();
+            latch.release();
+            if (null != failureException) {
+                throw failureException;
+            }
+        }
+    }
+
+    protected void consumedUntil(ByteString topic, long seqId) throws Exception {
+        manager.consumedUntil(topic, seqId);
     }
 
 }