You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2014/03/07 13:14:45 UTC

svn commit: r1575243 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/

Author: ivank
Date: Fri Mar  7 12:14:45 2014
New Revision: 1575243

URL: http://svn.apache.org/r1575243
Log:
BOOKKEEPER-717: journal should look forward to group time-out entries (sijie via ivank)

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LogMark.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1575243&r1=1575242&r2=1575243&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Mar  7 12:14:45 2014
@@ -168,6 +168,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-654: Bookkeeper client operations are allowed even after its closure, bk#close() (sijie via ivank)
 
+        BOOKKEEPER-717: journal should look forward to group time-out entries (sijie via ivank)
+
       hedwig-server:
 
         BOOKKEEPER-601: readahead cache size isn't updated correctly (sijie via fpj)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java?rev=1575243&r1=1575242&r2=1575243&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java Fri Mar  7 12:14:45 2014
@@ -137,19 +137,19 @@ class Journal extends BookieCriticalThre
             this.curMark = new LogMark(logId, logPosition);
         }
 
-        synchronized void setCurLogMark(long logId, long logPosition) {
+        void setCurLogMark(long logId, long logPosition) {
             curMark.setLogMark(logId, logPosition);
         }
 
-        synchronized LastLogMark markLog() {
+        LastLogMark markLog() {
             return new LastLogMark(curMark.getLogFileId(), curMark.getLogFileOffset());
         }
 
-        synchronized LogMark getCurMark() {
+        LogMark getCurMark() {
             return curMark;
         }
 
-        synchronized void rollLog(LastLogMark lastMark) throws NoWritableLedgerDirException {
+        void rollLog(LastLogMark lastMark) throws NoWritableLedgerDirException {
             byte buff[] = new byte[16];
             ByteBuffer bb = ByteBuffer.wrap(buff);
             // we should record <logId, logPosition> marked in markLog
@@ -184,7 +184,7 @@ class Journal extends BookieCriticalThre
          * The last mark should first be max journal log id,
          * and then max log position in max journal log.
          */
-        synchronized void readLog() {
+        void readLog() {
             byte buff[] = new byte[16];
             ByteBuffer bb = ByteBuffer.wrap(buff);
             LogMark mark = new LogMark();
@@ -204,7 +204,7 @@ class Journal extends BookieCriticalThre
                     bb.clear();
                     mark.readLogMark(bb);
                     if (curMark.compare(mark) < 0) {
-                        curMark.setLogMark(mark.getLogFileId(), mark.logFileOffset);
+                        curMark.setLogMark(mark.getLogFileId(), mark.getLogFileOffset());
                     }
                 } catch (IOException e) {
                     LOG.error("Problems reading from " + file + " (this is okay if it is the first time starting this bookie");
@@ -452,11 +452,11 @@ class Journal extends BookieCriticalThre
 
     final File journalDirectory;
     final ServerConfiguration conf;
-    ForceWriteThread forceWriteThread;
-    // should we group force writes
-    private final boolean enableGroupForceWrites;
+    final ForceWriteThread forceWriteThread;
     // Time after which we will stop grouping and issue the flush
     private final long maxGroupWaitInMSec;
+    // Threshold after which we flush any buffered journal entries
+    private final long bufferedEntriesThreshold;
     // Threshold after which we flush any buffered journal writes
     private final long bufferedWritesThreshold;
     // should we flush if the queue is empty
@@ -472,8 +472,8 @@ class Journal extends BookieCriticalThre
     private final ExecutorService cbThreadPool;
 
     // journal entry queue to commit
-    LinkedBlockingQueue<QueueEntry> queue = new LinkedBlockingQueue<QueueEntry>();
-    LinkedBlockingQueue<ForceWriteRequest> forceWriteRequests = new LinkedBlockingQueue<ForceWriteRequest>();
+    final LinkedBlockingQueue<QueueEntry> queue = new LinkedBlockingQueue<QueueEntry>();
+    final LinkedBlockingQueue<ForceWriteRequest> forceWriteRequests = new LinkedBlockingQueue<ForceWriteRequest>();
 
     volatile boolean running = true;
     private final LedgerDirsManager ledgerDirsManager;
@@ -487,16 +487,16 @@ class Journal extends BookieCriticalThre
         this.journalPreAllocSize = conf.getJournalPreAllocSizeMB() * MB;
         this.journalWriteBufferSize = conf.getJournalWriteBufferSizeKB() * KB;
         this.maxBackupJournals = conf.getMaxBackupJournals();
-        this.enableGroupForceWrites = conf.getJournalAdaptiveGroupWrites();
-        this.forceWriteThread = new ForceWriteThread(this, enableGroupForceWrites);
+        this.forceWriteThread = new ForceWriteThread(this, conf.getJournalAdaptiveGroupWrites());
         this.maxGroupWaitInMSec = conf.getJournalMaxGroupWaitMSec();
         this.bufferedWritesThreshold = conf.getJournalBufferedWritesThreshold();
+        this.bufferedEntriesThreshold = conf.getJournalBufferedEntriesThreshold();
         this.cbThreadPool = Executors.newFixedThreadPool(conf.getNumJournalCallbackThreads(),
                                                          new DaemonThreadFactory());
 
         // Unless there is a cap on the max wait (which requires group force writes)
         // we cannot skip flushing for queue empty
-        this.flushWhenQueueEmpty = !enableGroupForceWrites || conf.getJournalFlushWhenQueueEmpty();
+        this.flushWhenQueueEmpty = maxGroupWaitInMSec <= 0 || conf.getJournalFlushWhenQueueEmpty();
 
         this.removePagesFromCache = conf.getJournalRemovePagesFromCache();
         // read last log mark
@@ -679,7 +679,7 @@ class Journal extends BookieCriticalThre
      * new journal file using current timestamp, and continue persistence logic.
      * Those journals will be garbage collected in SyncThread.
      * </p>
-     * @see Bookie#SyncThread
+     * @see org.apache.bookkeeper.bookie.SyncThread
      */
     @Override
     public void run() {
@@ -695,6 +695,7 @@ class Journal extends BookieCriticalThre
             long logId = journalIds.isEmpty() ? System.currentTimeMillis() : journalIds.get(journalIds.size() - 1);
             BufferedChannel bc = null;
             long lastFlushPosition = 0;
+            boolean groupWhenTimeout = false;
 
             QueueEntry qe = null;
             while (true) {
@@ -723,10 +724,19 @@ class Journal extends BookieCriticalThre
                         boolean shouldFlush = false;
                         // We should issue a forceWrite if any of the three conditions below holds good
                         // 1. If the oldest pending entry has been pending for longer than the max wait time
-                        if (enableGroupForceWrites && (MathUtils.elapsedMSec(toFlush.getFirst().enqueueTime) > maxGroupWaitInMSec)) {
+                        if (maxGroupWaitInMSec > 0 && !groupWhenTimeout && (MathUtils.elapsedMSec(toFlush.getFirst().enqueueTime) > maxGroupWaitInMSec)) {
+                            groupWhenTimeout = true;
+                        } else if (maxGroupWaitInMSec > 0 && groupWhenTimeout && qe != null && MathUtils.elapsedMSec(qe.enqueueTime) < maxGroupWaitInMSec) {
+                            // Once the group has timed out, look ahead rather than flushing immediately: a previous
+                            // slow write (e.g. a filesystem write slowed down by a force write) may have left many
+                            // queued entries that have already timed out as well. Keep grouping queued entries that
+                            // a) have already timed out (we flush here, at the first entry that has not), and
+                            // b) stay within the limit on the number of entries to group.
+                            groupWhenTimeout = false;
                             shouldFlush = true;
-                        } else if ((bc.position() > lastFlushPosition + bufferedWritesThreshold)) {
-                            // 2. If we have buffered more than the buffWriteThreshold
+                        } else if ((bufferedEntriesThreshold > 0 && toFlush.size() > bufferedEntriesThreshold) ||
+                                (bc.position() > lastFlushPosition + bufferedWritesThreshold)) {
+                            // 2. If we have buffered more than bufferedWritesThreshold bytes or bufferedEntriesThreshold entries
                             shouldFlush = true;
                         } else if (qe == null) {
                             // We should get here only if we flushWhenQueueEmpty is true else we would wait
@@ -767,6 +777,7 @@ class Journal extends BookieCriticalThre
                 if (qe == null) { // no more queue entry
                     continue;
                 }
+
                 lenBuff.clear();
                 lenBuff.putInt(qe.entry.remaining());
                 lenBuff.flip();

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LogMark.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LogMark.java?rev=1575243&r1=1575242&r2=1575243&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LogMark.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LogMark.java Fri Mar  7 12:14:45 2014
@@ -33,46 +33,46 @@ class LogMark {
     }
 
     public LogMark(LogMark other) {
-        setLogMark(other.logFileId, other.logFileOffset);
+        setLogMark(other.getLogFileId(), other.getLogFileOffset());
     }
 
     public LogMark(long logFileId, long logFileOffset) {
         setLogMark(logFileId, logFileOffset);
     }
 
-    public long getLogFileId() {
+    public synchronized long getLogFileId() {
         return logFileId;
     }
 
-    public long getLogFileOffset() {
+    public synchronized long getLogFileOffset() {
         return logFileOffset;
     }
 
-    public void readLogMark(ByteBuffer bb) {
+    public synchronized void readLogMark(ByteBuffer bb) {
         logFileId = bb.getLong();
         logFileOffset = bb.getLong();
     }
 
-    public void writeLogMark(ByteBuffer bb) {
+    public synchronized void writeLogMark(ByteBuffer bb) {
         bb.putLong(logFileId);
         bb.putLong(logFileOffset);
     }
 
-    public void setLogMark(long logFileId, long logFileOffset) {
+    public synchronized void setLogMark(long logFileId, long logFileOffset) {
         this.logFileId = logFileId;
         this.logFileOffset = logFileOffset;
     }
 
-    public int compare(LogMark other) {
-        long ret = this.logFileId - other.logFileId;
+    public synchronized int compare(LogMark other) {
+        long ret = this.logFileId - other.getLogFileId();
         if (ret == 0) {
-            ret = this.logFileOffset - other.logFileOffset;
+            ret = this.logFileOffset - other.getLogFileOffset();
         }
         return (ret < 0)? -1 : ((ret > 0)? 1 : 0);
     }
 
     @Override
-    public String toString() {
+    public synchronized String toString() {
         StringBuilder sb = new StringBuilder();
 
         sb.append("LogMark: logFileId - ").append(logFileId)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java?rev=1575243&r1=1575242&r2=1575243&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java Fri Mar  7 12:14:45 2014
@@ -57,6 +57,7 @@ public class ServerConfiguration extends
     protected final static String JOURNAL_ADAPTIVE_GROUP_WRITES = "journalAdaptiveGroupWrites";
     protected final static String JOURNAL_MAX_GROUP_WAIT_MSEC = "journalMaxGroupWaitMSec";
     protected final static String JOURNAL_BUFFERED_WRITES_THRESHOLD = "journalBufferedWritesThreshold";
+    protected final static String JOURNAL_BUFFERED_ENTRIES_THRESHOLD = "journalBufferedEntriesThreshold";
     protected final static String JOURNAL_FLUSH_WHEN_QUEUE_EMPTY = "journalFlushWhenQueueEmpty";
     protected final static String JOURNAL_REMOVE_FROM_PAGE_CACHE = "journalRemoveFromPageCache";
     protected final static String JOURNAL_PRE_ALLOC_SIZE = "journalPreAllocSizeMB";
@@ -485,19 +486,19 @@ public class ServerConfiguration extends
         return ledgerDirs;
     }
 
-    /** 
+    /**
      * Get dir name to store index files.
-     *   
+     *
      * @return ledger index dir name, if no index dirs provided return null
-     */  
+     */
     public String[] getIndexDirNames() {
         if (!this.containsKey(INDEX_DIRS)) {
             return null;
         }
         return this.getStringArray(INDEX_DIRS);
-    }   
+    }
 
-    /** 
+    /**
      * Set dir name to store index files.
      *
      * @param indexDirs
@@ -718,18 +719,18 @@ public class ServerConfiguration extends
         setProperty(MAJOR_COMPACTION_INTERVAL, interval);
         return this;
     }
-    
+
     /**
      * Set the grace period which the rereplication worker will wait before
      * fencing and rereplicating a ledger fragment which is still being written
      * to, on bookie failure.
-     * 
-     * The grace period allows the writer to detect the bookie failure, and
-     * start replicating the ledger fragment. If the writer writes nothing
+     *
+     * The grace period allows the writer to detect the bookie failure, and
+     * start writing to another ledger fragment. If the writer writes nothing
      * during the grace period, the rereplication worker assumes that it has
-     * crashed and fences the ledger, preventing any further writes to that 
-     * ledger.
-     * 
+     * crashed and therefore fences the ledger, preventing any further writes to
+     * that ledger.
+     *
      * @see org.apache.bookkeeper.client.BookKeeper#openLedger
      * 
      * @param waitTime time to wait before replicating ledger fragment
@@ -885,14 +886,38 @@ public class ServerConfiguration extends
     }
 
     /**
-     * Maximum latency to impose on a journal write to achieve grouping
+     * Maximum number of bytes of journal writes to buffer, for grouping, before a flush is forced.
      *
-     * @return max wait for grouping
+     * @return max bytes to buffer
      */
     public long getJournalBufferedWritesThreshold() {
         return getLong(JOURNAL_BUFFERED_WRITES_THRESHOLD, 512 * 1024);
     }
 
+    /**
+     * Maximum number of journal entries to buffer, for grouping, before a flush
+     * is forced. A value of zero or less disables this threshold, leaving only
+     * {@link #getJournalBufferedWritesThreshold()} in effect.
+     *
+     * @return max entries to buffer.
+     */
+    public long getJournalBufferedEntriesThreshold() {
+        return getLong(JOURNAL_BUFFERED_ENTRIES_THRESHOLD, 0);
+    }
+
+    /**
+     * Set the maximum number of journal entries to buffer, for grouping, before a
+     * flush is forced. Set this to zero or less to disable it and rely only on
+     * {@link #getJournalBufferedWritesThreshold()}.
+     *
+     * @param maxEntries
+     *          maximum entries to buffer.
+     * @return server configuration.
+     */
+    public ServerConfiguration setJournalBufferedEntriesThreshold(int maxEntries) {
+        setProperty(JOURNAL_BUFFERED_ENTRIES_THRESHOLD, maxEntries);
+        return this;
+    }
 
     /**
      * Set if we should flush the journal when queue is empty