You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2017/04/20 17:35:24 UTC
[2/7] bookkeeper git commit: fix merge conflicts
fix merge conflicts
Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/014512c3
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/014512c3
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/014512c3
Branch: refs/heads/sijie/bookkeeper_fallocate
Commit: 014512c3563bd06bd90789db0c5d0369fe421a62
Parents: d980296
Author: Sijie Guo <si...@apache.org>
Authored: Thu Nov 17 17:32:15 2016 -0800
Committer: Sijie Guo <si...@apache.org>
Committed: Thu Nov 17 17:32:15 2016 -0800
----------------------------------------------------------------------
.../bookie/BookKeeperServerStats.java | 1 +
.../bookkeeper/bookie/BufferedChannel.java | 8 +++--
.../org/apache/bookkeeper/bookie/Journal.java | 34 +++++++++-----------
3 files changed, 23 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/014512c3/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index 239f923..79c0d61 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -53,6 +53,7 @@ public interface BookKeeperServerStats {
public final static String JOURNAL_FORCE_WRITE_BATCH_ENTRIES = "JOURNAL_FORCE_WRITE_BATCH_ENTRIES";
public final static String JOURNAL_FORCE_WRITE_BATCH_BYTES = "JOURNAL_FORCE_WRITE_BATCH_BYTES";
public final static String JOURNAL_FLUSH_LATENCY = "JOURNAL_FLUSH_LATENCY";
+ public final static String JOURNAL_FLUSH_IN_MEM_ADD = "JOURNAL_FLUSH_IN_MEM_ADD";
public final static String JOURNAL_QUEUE_LATENCY = "JOURNAL_QUEUE_LATENCY";
public final static String JOURNAL_PROCESS_TIME_LATENCY = "JOURNAL_PROCESS_TIME_LATENCY";
public final static String JOURNAL_CREATION_LATENCY = "JOURNAL_CREATION_LATENCY";
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/014512c3/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
index cb7d914..0492943 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
@@ -24,9 +24,10 @@ package org.apache.bookkeeper.bookie;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
-import org.apache.bookkeeper.util.ZeroBuffer;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.bookkeeper.util.ZeroBuffer;
+
/**
* Provides a buffering layer in front of a FileChannel.
*/
@@ -64,8 +65,9 @@ public class BufferedChannel extends BufferedReadChannel {
* @param src The source ByteBuffer which contains the data to be written.
* @throws IOException if a write operation fails.
*/
- synchronized public void write(ByteBuffer src) throws IOException {
+ synchronized public int write(ByteBuffer src) throws IOException {
int copied = 0;
+ int flushes = 0;
while(src.remaining() > 0) {
int truncated = 0;
if (writeBuffer.remaining() < src.remaining()) {
@@ -78,9 +80,11 @@ public class BufferedChannel extends BufferedReadChannel {
// if we have run out of buffer space, we should flush to the file
if (writeBuffer.remaining() == 0) {
flushInternal();
+ ++flushes;
}
}
position += copied;
+ return flushes;
}
/**
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/014512c3/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index dd62d28..e8adfff 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -30,8 +30,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -39,18 +37,15 @@ import com.google.common.base.Stopwatch;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
-<<<<<<< HEAD
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
-=======
-import org.apache.bookkeeper.stats.BookkeeperServerStatsLogger;
-import org.apache.bookkeeper.stats.ServerStatsProvider;
->>>>>>> 2d5718f... bookie: fallocate & sync_file_range
import org.apache.bookkeeper.util.DaemonThreadFactory;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.MathUtils;
+import org.apache.bookkeeper.util.OrderedSafeExecutor;
+import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.bookkeeper.util.ZeroBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -295,7 +290,7 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
}
@Override
- public void run() {
+ public void safeRun() {
if (LOG.isDebugEnabled()) {
LOG.debug("Acknowledge Ledger: {}, Entry: {}", ledgerId, entryId);
}
@@ -541,10 +536,12 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
private final LedgerDirsManager ledgerDirsManager;
// Expose Stats
+ private final StatsLogger statsLogger;
private final OpStatsLogger journalAddEntryStats;
- private final OpStatsLogger journalSyncStats;
+ private final OpStatsLogger journalMemAddEntryStats;
private final OpStatsLogger journalCreationStats;
private final OpStatsLogger journalFlushStats;
+ private final OpStatsLogger journalMemAddFlushStats;
private final OpStatsLogger journalProcessTimeStats;
private final OpStatsLogger journalQueueStats;
private final OpStatsLogger forceWriteGroupingCountStats;
@@ -580,7 +577,7 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
this.cbThreadPool = OrderedSafeExecutor.newBuilder()
.name("BookieJournal")
.numThreads(conf.getNumJournalCallbackThreads())
- .statsLogger(Stats.get().getStatsLogger("journal"))
+ .statsLogger(statsLogger)
.threadFactory(new DaemonThreadFactory())
.build();
@@ -594,10 +591,12 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
LOG.debug("Last Log Mark : {}", lastLogMark.getCurMark());
// Expose Stats
+ this.statsLogger = statsLogger;
journalAddEntryStats = statsLogger.getOpStatsLogger(JOURNAL_ADD_ENTRY);
- journalSyncStats = statsLogger.getOpStatsLogger(JOURNAL_SYNC);
+ journalMemAddEntryStats = statsLogger.getOpStatsLogger(JOURNAL_MEM_ADD_ENTRY);
journalCreationStats = statsLogger.getOpStatsLogger(JOURNAL_CREATION_LATENCY);
journalFlushStats = statsLogger.getOpStatsLogger(JOURNAL_FLUSH_LATENCY);
+ journalMemAddFlushStats = statsLogger.getOpStatsLogger(JOURNAL_FLUSH_IN_MEM_ADD);
journalQueueStats = statsLogger.getOpStatsLogger(JOURNAL_QUEUE_LATENCY);
journalProcessTimeStats = statsLogger.getOpStatsLogger(JOURNAL_PROCESS_TIME_LATENCY);
forceWriteGroupingCountStats = statsLogger.getOpStatsLogger(JOURNAL_FORCE_WRITE_GROUPING_COUNT);
@@ -928,8 +927,8 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
if (!enableGroupForceWrites) {
logFile.startSyncRange(prevFlushPosition, lastFlushPosition);
}
- journalFlushLatencyStats.registerSuccessfulEvent(
- journalFlushWatcher.stop().elapsed(TimeUnit.MICROSECONDS));
+ journalFlushStats.registerSuccessfulEvent(
+ journalFlushWatcher.stop().elapsedTime(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
// Trace the lifetime of entries through persistence
if (LOG.isDebugEnabled()) {
@@ -983,9 +982,9 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
flushes += bc.write(lenBuff);
flushes += bc.write(qe.entry);
- journalMemAddFlushTimesStats.registerSuccessfulEvent(flushes);
- journalMemAddLatencyStats.registerSuccessfulEvent(
- MathUtils.elapsedMicroSec(qe.enqueueTime));
+ journalMemAddFlushStats.registerSuccessfulValue(flushes);
+ journalMemAddEntryStats.registerSuccessfulEvent(
+ MathUtils.elapsedMicroSec(qe.enqueueTime), TimeUnit.MICROSECONDS);
toFlush.add(qe);
qe = null;
@@ -1018,8 +1017,7 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
LOG.info("Shutting down Journal");
forceWriteThread.shutdown();
cbThreadPool.shutdown();
- ;
- if (!cbThreadPool.forceShutdown(5, TimeUnit.SECONDS)) {
+ if (!cbThreadPool.awaitTermination(5, TimeUnit.SECONDS)) {
LOG.warn("Couldn't shutdown journal callback thread gracefully. Forcing");
}
running = false;