Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2019/05/31 14:49:19 UTC

[bookkeeper] branch master updated: Update lastLogMark to EOF when replaying journal

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new f89e3fb  Update lastLogMark to EOF when replaying journal
f89e3fb is described below

commit f89e3fbb751fbef7012e695ed5873eade13ba4f5
Author: karanmehta93 <k....@salesforce.com>
AuthorDate: Fri May 31 09:49:13 2019 -0500

    Update lastLogMark to EOF when replaying journal
    
    Descriptions of the changes in this PR:
    
    ### Motivation
    
    This [commit](https://github.com/apache/bookkeeper/commit/36be8362399341022c8de64f9319270726df2cb3) caused the integration test `test101_RegenerateIndex` to fail with the following exception:
    ```
    java.io.IOException: Invalid argument
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
        at sun.nio.ch.FileDispatcherImpl.read(FileDispatcherImpl.java:46)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
        at sun.nio.ch.IOUtil.read(IOUtil.java:197)
        at sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:159)
        at org.apache.bookkeeper.bookie.JournalChannel.read(JournalChannel.java:257)
        at org.apache.bookkeeper.bookie.Journal.fullRead(Journal.java:1171)
        at org.apache.bookkeeper.bookie.Journal.scanJournal(Journal.java:792)
        at org.apache.bookkeeper.bookie.Bookie.replay(Bookie.java:924)
        at org.apache.bookkeeper.bookie.Bookie.readJournal(Bookie.java:886)
        at org.apache.bookkeeper.bookie.Bookie.start(Bookie.java:943)
        at org.apache.bookkeeper.proto.BookieServer.start(BookieServer.java:141)
        at org.apache.bookkeeper.server.service.BookieService.doStart(BookieService.java:58)
        at org.apache.bookkeeper.common.component.AbstractLifecycleComponent.start(AbstractLifecycleComponent.java:78)
        at org.apache.bookkeeper.common.component.LifecycleComponentStack.lambda$start$2(LifecycleComponentStack.java:113)
        at com.google.common.collect.ImmutableList.forEach(ImmutableList.java:408)
        at org.apache.bookkeeper.common.component.LifecycleComponentStack.start(LifecycleComponentStack.java:113)
        at org.apache.bookkeeper.common.component.ComponentStarter.startComponent(ComponentStarter.java:80)
        at org.apache.bookkeeper.server.Main.doMain(Main.java:229)
        at org.apache.bookkeeper.server.Main.main(Main.java:203)
    ```
    
    As discussed on Slack, it is hard to determine exactly why the native JNI call fails with `Invalid argument`. This PR therefore proposes setting `lastLogMark` to the journal's EOF offset instead of the arbitrary `Long.MAX_VALUE`. The `FileChannel` interface specifies that callers can pass in any non-negative long offset and that a read at or beyond the end of the file should report EOF immediately. However, it does not seem to work as expected here.
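    
    For reference, the documented EOF behaviour of a positional read can be sketched as below. This is a minimal, standalone illustration and not BookKeeper code: the class `EofReadSketch` and its helpers `tempFileWith` and `readAt` are hypothetical names. It reads only at offsets within or exactly at the end of the file; it makes no claim about what a given platform does for `Long.MAX_VALUE`.
    
    ```java
    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;
    
    // Hypothetical sketch class, not part of BookKeeper.
    final class EofReadSketch {
    
        // Creates a temporary file holding the given bytes.
        static Path tempFileWith(byte[] data) {
            try {
                Path p = Files.createTempFile("journal-sketch", ".txn");
                Files.write(p, data);
                p.toFile().deleteOnExit();
                return p;
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    
        // Positional read: returns the number of bytes read,
        // or -1 when the position is at or beyond the end of the file.
        static int readAt(Path file, long position) {
            try (FileChannel fc = FileChannel.open(file, StandardOpenOption.READ)) {
                ByteBuffer buf = ByteBuffer.allocate(16);
                return fc.read(buf, position);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    
        public static void main(String[] args) throws IOException {
            Path p = tempFileWith(new byte[] {1, 2, 3, 4});
            System.out.println("read at 0:   " + readAt(p, 0L));
            System.out.println("read at EOF: " + readAt(p, Files.size(p)));
        }
    }
    ```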
    
    ### Changes
    
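    Replaced `Journal#setLastLogMarkToEof()` with `Journal#setLastLogMark()`, which accepts a `scanOffset` instead of using the constant `Long.MAX_VALUE`. `Journal#scanJournal()` now returns the offset up to which the journal was read.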
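    
    The idea behind the change can be sketched in isolation as follows. This is a simplified illustration under stated assumptions, not the actual `Journal` implementation: `ScanOffsetSketch`, `Mark`, `tempJournal` and `scan` are hypothetical stand-ins, and the scan loop only consumes bytes instead of parsing journal entries.
    
    ```java
    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;
    
    // Hypothetical sketch class, not part of BookKeeper.
    final class ScanOffsetSketch {
    
        // Hypothetical stand-in for a log mark: (journal id, offset in that journal).
        static final class Mark {
            long logFileId;
            long logFileOffset;
    
            void set(long id, long offset) {
                this.logFileId = id;
                this.logFileOffset = offset;
            }
        }
    
        // Creates a temporary file of the given size to play the role of a journal.
        static Path tempJournal(int size) {
            try {
                Path p = Files.createTempFile("journal-sketch", ".txn");
                Files.write(p, new byte[size]);
                p.toFile().deleteOnExit();
                return p;
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    
        // Reads from startPos until EOF and returns the channel position
        // at which reading stopped, i.e. a real offset inside the file.
        static long scan(Path file, long startPos) {
            try (FileChannel fc = FileChannel.open(file, StandardOpenOption.READ)) {
                fc.position(startPos);
                ByteBuffer buf = ByteBuffer.allocate(8);
                while (fc.read(buf) != -1) {
                    buf.clear(); // a real scanner would process the bytes here
                }
                return fc.position();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    
        public static void main(String[] args) {
            Path journal = tempJournal(20);
            Mark mark = new Mark();
            // Record the offset actually reached instead of a sentinel value.
            mark.set(1L, scan(journal, 0L));
            System.out.println("mark offset: " + mark.logFileOffset);
        }
    }
    ```
    
    Because the recorded offset never exceeds the file size, a later replay that starts from the mark reads at a position the file actually contains or at its exact end.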
    
    ivankelly eolivelli
    
    Reviewers: Ivan Kelly <iv...@apache.org>, Enrico Olivelli <eo...@gmail.com>
    
    This closes #2105 from karanmehta93/master
---
 .../src/main/java/org/apache/bookkeeper/bookie/Bookie.java    |  7 ++++---
 .../src/main/java/org/apache/bookkeeper/bookie/Journal.java   | 11 +++++++----
 2 files changed, 11 insertions(+), 7 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 467cb03..9bb9e5d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -921,10 +921,11 @@ public class Bookie extends BookieCriticalThread {
                 logPosition = markedLog.getLogFileOffset();
             }
             LOG.info("Replaying journal {} from position {}", id, logPosition);
-            journal.scanJournal(id, logPosition, scanner);
-            // Update LastLogMark to Long.MAX_VALUE position after replaying journal
+            long scanOffset = journal.scanJournal(id, logPosition, scanner);
+            // Update LastLogMark after completely replaying journal
+            // scanOffset will point to EOF position
             // After LedgerStorage flush, SyncThread should persist this to disk
-            journal.setLastLogMarkToEof(id);
+            journal.setLastLogMark(id, scanOffset);
         }
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 8941b68..06fc5e2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -710,12 +710,13 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
     }
 
     /**
-     * Update lastLogMark of the journal,
+     * Update lastLogMark of the journal
      * Indicates that the file has been processed.
      * @param id
+     * @param scanOffset
      */
-    void setLastLogMarkToEof(Long id) {
-        lastLogMark.getCurMark().setLogMark(id, Long.MAX_VALUE);
+    void setLastLogMark(Long id, long scanOffset) {
+        lastLogMark.setCurLogMark(id, scanOffset);
     }
 
     /**
@@ -769,9 +770,10 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
      * @param journalId Journal Log Id
      * @param journalPos Offset to start scanning
      * @param scanner Scanner to handle entries
+     * @return scanOffset - represents the byte till which journal was read
      * @throws IOException
      */
-    public void scanJournal(long journalId, long journalPos, JournalScanner scanner)
+    public long scanJournal(long journalId, long journalPos, JournalScanner scanner)
         throws IOException {
         JournalChannel recLog;
         if (journalPos <= 0) {
@@ -832,6 +834,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
                     scanner.process(journalVersion, offset, recBuff);
                 }
             }
+            return recLog.fc.position();
         } finally {
             recLog.close();
         }