You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2017/04/20 17:35:24 UTC

[2/7] bookkeeper git commit: fix merge conflicts

fix merge conflicts


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/014512c3
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/014512c3
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/014512c3

Branch: refs/heads/sijie/bookkeeper_fallocate
Commit: 014512c3563bd06bd90789db0c5d0369fe421a62
Parents: d980296
Author: Sijie Guo <si...@apache.org>
Authored: Thu Nov 17 17:32:15 2016 -0800
Committer: Sijie Guo <si...@apache.org>
Committed: Thu Nov 17 17:32:15 2016 -0800

----------------------------------------------------------------------
 .../bookie/BookKeeperServerStats.java           |  1 +
 .../bookkeeper/bookie/BufferedChannel.java      |  8 +++--
 .../org/apache/bookkeeper/bookie/Journal.java   | 34 +++++++++-----------
 3 files changed, 23 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/014512c3/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index 239f923..79c0d61 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -53,6 +53,7 @@ public interface BookKeeperServerStats {
     public final static String JOURNAL_FORCE_WRITE_BATCH_ENTRIES = "JOURNAL_FORCE_WRITE_BATCH_ENTRIES";
     public final static String JOURNAL_FORCE_WRITE_BATCH_BYTES = "JOURNAL_FORCE_WRITE_BATCH_BYTES";
     public final static String JOURNAL_FLUSH_LATENCY = "JOURNAL_FLUSH_LATENCY";
+    public final static String JOURNAL_FLUSH_IN_MEM_ADD = "JOURNAL_FLUSH_IN_MEM_ADD";
     public final static String JOURNAL_QUEUE_LATENCY = "JOURNAL_QUEUE_LATENCY";
     public final static String JOURNAL_PROCESS_TIME_LATENCY = "JOURNAL_PROCESS_TIME_LATENCY";
     public final static String JOURNAL_CREATION_LATENCY = "JOURNAL_CREATION_LATENCY";

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/014512c3/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
index cb7d914..0492943 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
@@ -24,9 +24,10 @@ package org.apache.bookkeeper.bookie;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
-import org.apache.bookkeeper.util.ZeroBuffer;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.bookkeeper.util.ZeroBuffer;
+
 /**
  * Provides a buffering layer in front of a FileChannel.
  */
@@ -64,8 +65,9 @@ public class BufferedChannel extends BufferedReadChannel {
      * @param src The source ByteBuffer which contains the data to be written.
      * @throws IOException if a write operation fails.
      */
-    synchronized public void write(ByteBuffer src) throws IOException {
+    synchronized public int write(ByteBuffer src) throws IOException {
         int copied = 0;
+        int flushes = 0;
         while(src.remaining() > 0) {
             int truncated = 0;
             if (writeBuffer.remaining() < src.remaining()) {
@@ -78,9 +80,11 @@ public class BufferedChannel extends BufferedReadChannel {
             // if we have run out of buffer space, we should flush to the file
             if (writeBuffer.remaining() == 0) {
                 flushInternal();
+                ++flushes;
             }
         }
         position += copied;
+        return flushes;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/014512c3/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index dd62d28..e8adfff 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -30,8 +30,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
@@ -39,18 +37,15 @@ import com.google.common.base.Stopwatch;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
-<<<<<<< HEAD
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
-=======
-import org.apache.bookkeeper.stats.BookkeeperServerStatsLogger;
-import org.apache.bookkeeper.stats.ServerStatsProvider;
->>>>>>> 2d5718f... bookie: fallocate & sync_file_range
 import org.apache.bookkeeper.util.DaemonThreadFactory;
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.util.MathUtils;
+import org.apache.bookkeeper.util.OrderedSafeExecutor;
+import org.apache.bookkeeper.util.SafeRunnable;
 import org.apache.bookkeeper.util.ZeroBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -295,7 +290,7 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
         }
 
         @Override
-        public void run() {
+        public void safeRun() {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Acknowledge Ledger: {}, Entry: {}", ledgerId, entryId);
             }
@@ -541,10 +536,12 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
     private final LedgerDirsManager ledgerDirsManager;
 
     // Expose Stats
+    private final StatsLogger statsLogger;
     private final OpStatsLogger journalAddEntryStats;
-    private final OpStatsLogger journalSyncStats;
+    private final OpStatsLogger journalMemAddEntryStats;
     private final OpStatsLogger journalCreationStats;
     private final OpStatsLogger journalFlushStats;
+    private final OpStatsLogger journalMemAddFlushStats;
     private final OpStatsLogger journalProcessTimeStats;
     private final OpStatsLogger journalQueueStats;
     private final OpStatsLogger forceWriteGroupingCountStats;
@@ -580,7 +577,7 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
         this.cbThreadPool = OrderedSafeExecutor.newBuilder()
                 .name("BookieJournal")
                 .numThreads(conf.getNumJournalCallbackThreads())
-                .statsLogger(Stats.get().getStatsLogger("journal"))
+                .statsLogger(statsLogger)
                 .threadFactory(new DaemonThreadFactory())
                 .build();
 
@@ -594,10 +591,12 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
         LOG.debug("Last Log Mark : {}", lastLogMark.getCurMark());
 
         // Expose Stats
+        this.statsLogger = statsLogger;
         journalAddEntryStats = statsLogger.getOpStatsLogger(JOURNAL_ADD_ENTRY);
-        journalSyncStats = statsLogger.getOpStatsLogger(JOURNAL_SYNC);
+        journalMemAddEntryStats = statsLogger.getOpStatsLogger(JOURNAL_MEM_ADD_ENTRY);
         journalCreationStats = statsLogger.getOpStatsLogger(JOURNAL_CREATION_LATENCY);
         journalFlushStats = statsLogger.getOpStatsLogger(JOURNAL_FLUSH_LATENCY);
+        journalMemAddFlushStats = statsLogger.getOpStatsLogger(JOURNAL_FLUSH_IN_MEM_ADD);
         journalQueueStats = statsLogger.getOpStatsLogger(JOURNAL_QUEUE_LATENCY);
         journalProcessTimeStats = statsLogger.getOpStatsLogger(JOURNAL_PROCESS_TIME_LATENCY);
         forceWriteGroupingCountStats = statsLogger.getOpStatsLogger(JOURNAL_FORCE_WRITE_GROUPING_COUNT);
@@ -928,8 +927,8 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
                             if (!enableGroupForceWrites) {
                                 logFile.startSyncRange(prevFlushPosition, lastFlushPosition);
                             }
-                            journalFlushLatencyStats.registerSuccessfulEvent(
-                                    journalFlushWatcher.stop().elapsed(TimeUnit.MICROSECONDS));
+                            journalFlushStats.registerSuccessfulEvent(
+                                    journalFlushWatcher.stop().elapsedTime(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
 
                             // Trace the lifetime of entries through persistence
                             if (LOG.isDebugEnabled()) {
@@ -983,9 +982,9 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
                 flushes += bc.write(lenBuff);
                 flushes += bc.write(qe.entry);
 
-                journalMemAddFlushTimesStats.registerSuccessfulEvent(flushes);
-                journalMemAddLatencyStats.registerSuccessfulEvent(
-                        MathUtils.elapsedMicroSec(qe.enqueueTime));
+                journalMemAddFlushStats.registerSuccessfulValue(flushes);
+                journalMemAddEntryStats.registerSuccessfulEvent(
+                        MathUtils.elapsedMicroSec(qe.enqueueTime), TimeUnit.MICROSECONDS);
 
                 toFlush.add(qe);
                 qe = null;
@@ -1018,8 +1017,7 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
             LOG.info("Shutting down Journal");
             forceWriteThread.shutdown();
             cbThreadPool.shutdown();
-            ;
-            if (!cbThreadPool.forceShutdown(5, TimeUnit.SECONDS)) {
+            if (!cbThreadPool.awaitTermination(5, TimeUnit.SECONDS)) {
                 LOG.warn("Couldn't shutdown journal callback thread gracefully. Forcing");
             }
             running = false;