You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2014/03/07 13:14:45 UTC
svn commit: r1575243 - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/
bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/
Author: ivank
Date: Fri Mar 7 12:14:45 2014
New Revision: 1575243
URL: http://svn.apache.org/r1575243
Log:
BOOKKEEPER-717: journal should look forward to group time-out entries (sijie via ivank)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LogMark.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1575243&r1=1575242&r2=1575243&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Mar 7 12:14:45 2014
@@ -168,6 +168,8 @@ Trunk (unreleased changes)
BOOKKEEPER-654: Bookkeeper client operations are allowed even after its closure, bk#close() (sijie via ivank)
+ BOOKKEEPER-717: journal should look forward to group time-out entries (sijie via ivank)
+
hedwig-server:
BOOKKEEPER-601: readahead cache size isn't updated correctly (sijie via fpj)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java?rev=1575243&r1=1575242&r2=1575243&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java Fri Mar 7 12:14:45 2014
@@ -137,19 +137,19 @@ class Journal extends BookieCriticalThre
this.curMark = new LogMark(logId, logPosition);
}
- synchronized void setCurLogMark(long logId, long logPosition) {
+ void setCurLogMark(long logId, long logPosition) {
curMark.setLogMark(logId, logPosition);
}
- synchronized LastLogMark markLog() {
+ LastLogMark markLog() {
return new LastLogMark(curMark.getLogFileId(), curMark.getLogFileOffset());
}
- synchronized LogMark getCurMark() {
+ LogMark getCurMark() {
return curMark;
}
- synchronized void rollLog(LastLogMark lastMark) throws NoWritableLedgerDirException {
+ void rollLog(LastLogMark lastMark) throws NoWritableLedgerDirException {
byte buff[] = new byte[16];
ByteBuffer bb = ByteBuffer.wrap(buff);
// we should record <logId, logPosition> marked in markLog
@@ -184,7 +184,7 @@ class Journal extends BookieCriticalThre
* The last mark should first be max journal log id,
* and then max log position in max journal log.
*/
- synchronized void readLog() {
+ void readLog() {
byte buff[] = new byte[16];
ByteBuffer bb = ByteBuffer.wrap(buff);
LogMark mark = new LogMark();
@@ -204,7 +204,7 @@ class Journal extends BookieCriticalThre
bb.clear();
mark.readLogMark(bb);
if (curMark.compare(mark) < 0) {
- curMark.setLogMark(mark.getLogFileId(), mark.logFileOffset);
+ curMark.setLogMark(mark.getLogFileId(), mark.getLogFileOffset());
}
} catch (IOException e) {
LOG.error("Problems reading from " + file + " (this is okay if it is the first time starting this bookie");
@@ -452,11 +452,11 @@ class Journal extends BookieCriticalThre
final File journalDirectory;
final ServerConfiguration conf;
- ForceWriteThread forceWriteThread;
- // should we group force writes
- private final boolean enableGroupForceWrites;
+ final ForceWriteThread forceWriteThread;
// Time after which we will stop grouping and issue the flush
private final long maxGroupWaitInMSec;
+ // Threshold after which we flush any buffered journal entries
+ private final long bufferedEntriesThreshold;
// Threshold after which we flush any buffered journal writes
private final long bufferedWritesThreshold;
// should we flush if the queue is empty
@@ -472,8 +472,8 @@ class Journal extends BookieCriticalThre
private final ExecutorService cbThreadPool;
// journal entry queue to commit
- LinkedBlockingQueue<QueueEntry> queue = new LinkedBlockingQueue<QueueEntry>();
- LinkedBlockingQueue<ForceWriteRequest> forceWriteRequests = new LinkedBlockingQueue<ForceWriteRequest>();
+ final LinkedBlockingQueue<QueueEntry> queue = new LinkedBlockingQueue<QueueEntry>();
+ final LinkedBlockingQueue<ForceWriteRequest> forceWriteRequests = new LinkedBlockingQueue<ForceWriteRequest>();
volatile boolean running = true;
private final LedgerDirsManager ledgerDirsManager;
@@ -487,16 +487,16 @@ class Journal extends BookieCriticalThre
this.journalPreAllocSize = conf.getJournalPreAllocSizeMB() * MB;
this.journalWriteBufferSize = conf.getJournalWriteBufferSizeKB() * KB;
this.maxBackupJournals = conf.getMaxBackupJournals();
- this.enableGroupForceWrites = conf.getJournalAdaptiveGroupWrites();
- this.forceWriteThread = new ForceWriteThread(this, enableGroupForceWrites);
+ this.forceWriteThread = new ForceWriteThread(this, conf.getJournalAdaptiveGroupWrites());
this.maxGroupWaitInMSec = conf.getJournalMaxGroupWaitMSec();
this.bufferedWritesThreshold = conf.getJournalBufferedWritesThreshold();
+ this.bufferedEntriesThreshold = conf.getJournalBufferedEntriesThreshold();
this.cbThreadPool = Executors.newFixedThreadPool(conf.getNumJournalCallbackThreads(),
new DaemonThreadFactory());
// Unless there is a cap on the max wait (which requires group force writes)
// we cannot skip flushing for queue empty
- this.flushWhenQueueEmpty = !enableGroupForceWrites || conf.getJournalFlushWhenQueueEmpty();
+ this.flushWhenQueueEmpty = maxGroupWaitInMSec <= 0 || conf.getJournalFlushWhenQueueEmpty();
this.removePagesFromCache = conf.getJournalRemovePagesFromCache();
// read last log mark
@@ -679,7 +679,7 @@ class Journal extends BookieCriticalThre
* new journal file using current timestamp, and continue persistence logic.
* Those journals will be garbage collected in SyncThread.
* </p>
- * @see Bookie#SyncThread
+ * @see org.apache.bookkeeper.bookie.SyncThread
*/
@Override
public void run() {
@@ -695,6 +695,7 @@ class Journal extends BookieCriticalThre
long logId = journalIds.isEmpty() ? System.currentTimeMillis() : journalIds.get(journalIds.size() - 1);
BufferedChannel bc = null;
long lastFlushPosition = 0;
+ boolean groupWhenTimeout = false;
QueueEntry qe = null;
while (true) {
@@ -723,10 +724,19 @@ class Journal extends BookieCriticalThre
boolean shouldFlush = false;
// We should issue a forceWrite if any of the three conditions below holds good
// 1. If the oldest pending entry has been pending for longer than the max wait time
- if (enableGroupForceWrites && (MathUtils.elapsedMSec(toFlush.getFirst().enqueueTime) > maxGroupWaitInMSec)) {
+ if (maxGroupWaitInMSec > 0 && !groupWhenTimeout && (MathUtils.elapsedMSec(toFlush.getFirst().enqueueTime) > maxGroupWaitInMSec)) {
+ groupWhenTimeout = true;
+ } else if (maxGroupWaitInMSec > 0 && groupWhenTimeout && qe != null && MathUtils.elapsedMSec(qe.enqueueTime) < maxGroupWaitInMSec) {
+ // When grouping on timeout, it is better to look forward: many entries may already have timed out
+ // because of a previous slow write (e.g. a filesystem write slowed down by a concurrent force write).
+ // Group the entries in the queue that
+ // a) have already timed out, and
+ // b) stay within a limit on the number of entries grouped together
+ groupWhenTimeout = false;
shouldFlush = true;
- } else if ((bc.position() > lastFlushPosition + bufferedWritesThreshold)) {
- // 2. If we have buffered more than the buffWriteThreshold
+ } else if ((bufferedEntriesThreshold > 0 && toFlush.size() > bufferedEntriesThreshold) ||
+ (bc.position() > lastFlushPosition + bufferedWritesThreshold)) {
+ // 2. If we have buffered more than the buffWriteThreshold or bufferedEntriesThreshold
shouldFlush = true;
} else if (qe == null) {
// We should get here only if we flushWhenQueueEmpty is true else we would wait
@@ -767,6 +777,7 @@ class Journal extends BookieCriticalThre
if (qe == null) { // no more queue entry
continue;
}
+
lenBuff.clear();
lenBuff.putInt(qe.entry.remaining());
lenBuff.flip();
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LogMark.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LogMark.java?rev=1575243&r1=1575242&r2=1575243&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LogMark.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LogMark.java Fri Mar 7 12:14:45 2014
@@ -33,46 +33,46 @@ class LogMark {
}
public LogMark(LogMark other) {
- setLogMark(other.logFileId, other.logFileOffset);
+ setLogMark(other.getLogFileId(), other.getLogFileOffset());
}
public LogMark(long logFileId, long logFileOffset) {
setLogMark(logFileId, logFileOffset);
}
- public long getLogFileId() {
+ public synchronized long getLogFileId() {
return logFileId;
}
- public long getLogFileOffset() {
+ public synchronized long getLogFileOffset() {
return logFileOffset;
}
- public void readLogMark(ByteBuffer bb) {
+ public synchronized void readLogMark(ByteBuffer bb) {
logFileId = bb.getLong();
logFileOffset = bb.getLong();
}
- public void writeLogMark(ByteBuffer bb) {
+ public synchronized void writeLogMark(ByteBuffer bb) {
bb.putLong(logFileId);
bb.putLong(logFileOffset);
}
- public void setLogMark(long logFileId, long logFileOffset) {
+ public synchronized void setLogMark(long logFileId, long logFileOffset) {
this.logFileId = logFileId;
this.logFileOffset = logFileOffset;
}
- public int compare(LogMark other) {
- long ret = this.logFileId - other.logFileId;
+ public synchronized int compare(LogMark other) {
+ long ret = this.logFileId - other.getLogFileId();
if (ret == 0) {
- ret = this.logFileOffset - other.logFileOffset;
+ ret = this.logFileOffset - other.getLogFileOffset();
}
return (ret < 0)? -1 : ((ret > 0)? 1 : 0);
}
@Override
- public String toString() {
+ public synchronized String toString() {
StringBuilder sb = new StringBuilder();
sb.append("LogMark: logFileId - ").append(logFileId)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java?rev=1575243&r1=1575242&r2=1575243&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java Fri Mar 7 12:14:45 2014
@@ -57,6 +57,7 @@ public class ServerConfiguration extends
protected final static String JOURNAL_ADAPTIVE_GROUP_WRITES = "journalAdaptiveGroupWrites";
protected final static String JOURNAL_MAX_GROUP_WAIT_MSEC = "journalMaxGroupWaitMSec";
protected final static String JOURNAL_BUFFERED_WRITES_THRESHOLD = "journalBufferedWritesThreshold";
+ protected final static String JOURNAL_BUFFERED_ENTRIES_THRESHOLD = "journalBufferedEntriesThreshold";
protected final static String JOURNAL_FLUSH_WHEN_QUEUE_EMPTY = "journalFlushWhenQueueEmpty";
protected final static String JOURNAL_REMOVE_FROM_PAGE_CACHE = "journalRemoveFromPageCache";
protected final static String JOURNAL_PRE_ALLOC_SIZE = "journalPreAllocSizeMB";
@@ -485,19 +486,19 @@ public class ServerConfiguration extends
return ledgerDirs;
}
- /**
+ /**
* Get dir name to store index files.
- *
+ *
* @return ledger index dir name, if no index dirs provided return null
- */
+ */
public String[] getIndexDirNames() {
if (!this.containsKey(INDEX_DIRS)) {
return null;
}
return this.getStringArray(INDEX_DIRS);
- }
+ }
- /**
+ /**
* Set dir name to store index files.
*
* @param indexDirs
@@ -718,18 +719,18 @@ public class ServerConfiguration extends
setProperty(MAJOR_COMPACTION_INTERVAL, interval);
return this;
}
-
+
/**
* Set the grace period which the rereplication worker will wait before
* fencing and rereplicating a ledger fragment which is still being written
* to, on bookie failure.
- *
- * The grace period allows the writer to detect the bookie failure, and
- * start replicating the ledger fragment. If the writer writes nothing
+ *
+ * The grace period allows the writer to detect the bookie failure, and
+ * start writing to another ledger fragment. If the writer writes nothing
* during the grace period, the rereplication worker assumes that it has
- * crashed and fences the ledger, preventing any further writes to that
- * ledger.
- *
+ * crashed and therefore fences the ledger, preventing any further writes to
+ * that ledger.
+ *
* @see org.apache.bookkeeper.client.BookKeeper#openLedger
*
* @param waitTime time to wait before replicating ledger fragment
@@ -885,14 +886,38 @@ public class ServerConfiguration extends
}
/**
- * Maximum latency to impose on a journal write to achieve grouping
+ * Maximum bytes to buffer to impose on a journal write to achieve grouping
*
- * @return max wait for grouping
+ * @return max bytes to buffer
*/
public long getJournalBufferedWritesThreshold() {
return getLong(JOURNAL_BUFFERED_WRITES_THRESHOLD, 512 * 1024);
}
+ /**
+ * Maximum entries to buffer to impose on a journal write to achieve grouping.
+ * Use {@link #getJournalBufferedWritesThreshold()} if this is set to zero or
+ * less than zero.
+ *
+ * @return max entries to buffer.
+ */
+ public long getJournalBufferedEntriesThreshold() {
+ return getLong(JOURNAL_BUFFERED_ENTRIES_THRESHOLD, 0);
+ }
+
+ /**
+ * Set maximum entries to buffer to impose on a journal write to achieve grouping.
+ * To rely only on {@link #getJournalBufferedWritesThreshold()}, set this to zero
+ * or less than zero.
+ *
+ * @param maxEntries
+ * maximum entries to buffer.
+ * @return server configuration.
+ */
+ public ServerConfiguration setJournalBufferedEntriesThreshold(int maxEntries) {
+ setProperty(JOURNAL_BUFFERED_ENTRIES_THRESHOLD, maxEntries);
+ return this;
+ }
/**
* Set if we should flush the journal when queue is empty