You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/12/18 11:55:36 UTC

svn commit: r1423405 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ hedwig-server/src/test/java/org/apache/hedwig/server/persistence/

Author: ivank
Date: Tue Dec 18 10:55:34 2012
New Revision: 1423405

URL: http://svn.apache.org/viewvc?rev=1423405&view=rev
Log:
BOOKKEEPER-509: TestBookKeeperPersistenceManager failed on latest trunk (sijie via ivank)

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1423405&r1=1423404&r2=1423405&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Tue Dec 18 10:55:34 2012
@@ -144,6 +144,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-512: BookieZkExpireTest fails periodically (ivank via sijie)
 
+        BOOKKEEPER-509: TestBookKeeperPersistenceManager failed on latest trunk (sijie via ivank)
+
       hedwig-protocol:
 
         BOOKKEEPER-394: CompositeException message is not useful (Stu Hood via sijie)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java?rev=1423405&r1=1423404&r2=1423405&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java Tue Dec 18 10:55:34 2012
@@ -208,6 +208,7 @@ class PendingReadOp implements Enumerati
         }
 
         // return true if we managed to complete the entry
+        // return false if the entry is not yet complete, or if it had already been completed earlier
         boolean complete(InetSocketAddress host, final ChannelBuffer buffer) {
             ChannelBufferInputStream is;
             try {
@@ -263,11 +264,20 @@ class PendingReadOp implements Enumerati
         if (speculativeReadTimeout > 0) {
             speculativeTask = scheduler.scheduleWithFixedDelay(new Runnable() {
                     public void run() {
+                        int x = 0;
                         for (LedgerEntryRequest r : seq) {
                             if (!r.isComplete()) {
-                                r.maybeSendSpeculativeRead(heardFromHosts);
+                                if (null != r.maybeSendSpeculativeRead(heardFromHosts)) {
+                                    LOG.debug("Send speculative read for {}. Hosts heard are {}.",
+                                              r, heardFromHosts);
+                                    ++x;
+                                }
                             }
                         }
+                        if (x > 0) {
+                            LOG.info("Send {} speculative reads for ledger {} ({}, {}). Hosts heard are {}.",
+                                     new Object[] { x, lh.getId(), startEntryId, endEntryId, heardFromHosts });
+                        }
                     }
                 }, speculativeReadTimeout, speculativeReadTimeout, TimeUnit.MILLISECONDS);
         }
@@ -320,10 +330,9 @@ class PendingReadOp implements Enumerati
 
         if (entry.complete(rctx.to, buffer)) {
             numPendingEntries--;
-        }
-
-        if (numPendingEntries == 0) {
-            submitCallback(BKException.Code.OK);
+            if (numPendingEntries == 0) {
+                submitCallback(BKException.Code.OK);
+            }
         }
 
         if(numPendingEntries < 0)
@@ -333,6 +342,7 @@ class PendingReadOp implements Enumerati
     private void submitCallback(int code) {
         if (speculativeTask != null) {
             speculativeTask.cancel(true);
+            speculativeTask = null;
         }
         cb.readComplete(code, lh, PendingReadOp.this, PendingReadOp.this.ctx);
     }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java?rev=1423405&r1=1423404&r2=1423405&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java Tue Dec 18 10:55:34 2012
@@ -283,6 +283,10 @@ public class BookkeeperPersistenceManage
                 @Override
                 public void safeReadComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
                     if (rc != BKException.Code.OK || !seq.hasMoreElements()) {
+                        if (rc == BKException.Code.OK) {
+                            // no entries were read, so provide a meaningful exception
+                            rc = BKException.Code.NoSuchEntryException;
+                        }
                         BKException bke = BKException.create(rc);
                         logger.error("Error while reading from ledger: " + imlr.range.getLedgerId() + " for topic: "
                                      + topic.toStringUtf8(), bke);
@@ -965,6 +969,11 @@ public class BookkeeperPersistenceManage
                         public void safeReadComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq,
                         Object ctx) {
                             if (rc != BKException.Code.OK || !seq.hasMoreElements()) {
+                                if (rc == BKException.Code.OK) {
+                                    // no entries were read, so provide a meaningful exception
+                                    rc = BKException.Code.NoSuchEntryException;
+                                }
+                                logger.info("Received error code {}", rc);
                                 BKException bke = BKException.create(rc);
                                 logger.error("While recovering ledger: " + ledgerId + " for topic: "
                                              + topic.toStringUtf8() + ", could not read last entry", bke);

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java?rev=1423405&r1=1423404&r2=1423405&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java Tue Dec 18 10:55:34 2012
@@ -255,13 +255,8 @@ public class TestBookKeeperPersistenceMa
         });
     }
 
-    @Override
-    @Before
-    public void setUp() throws Exception {
-        super.setUp();
-
-        // delay read response for 2s.
-        bktb = new BookKeeperTestBase(numBookies, readDelay);
+    private void startCluster(long delay) throws Exception {
+        bktb = new BookKeeperTestBase(numBookies, 0L);
         bktb.setUp();
 
         conf = new ServerConfiguration() {
@@ -294,9 +289,7 @@ public class TestBookKeeperPersistenceMa
         sm = new MMSubscriptionManager(conf, metadataManagerFactory, tm, manager, null, scheduler);
     }
 
-    @Override
-    @After
-    public void tearDown() throws Exception {
+    private void stopCluster() throws Exception {
         tm.stop();
         manager.stop();
         sm.stop();
@@ -304,6 +297,19 @@ public class TestBookKeeperPersistenceMa
         metadataManagerFactory.shutdown();
         scheduler.shutdown();
         bktb.tearDown();
+    }
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        startCluster(0L);
+    }
+
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        stopCluster();
         super.tearDown();
     }
 
@@ -521,6 +527,9 @@ public class TestBookKeeperPersistenceMa
 
     @Test
     public void testScanMessagesOnTwoLedgers() throws Exception {
+        stopCluster();
+        startCluster(readDelay);
+
         ByteString topic = ByteString.copyFromUtf8("TestScanMessagesOnTwoLedgers");
 
         List<Message> msgs = new ArrayList<Message>();