You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2014/08/02 20:02:07 UTC

svn commit: r1615338 [1/2] - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/conf/ bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ bookkeeper-server/src/main/java/org/apache...

Author: fpj
Date: Sat Aug  2 18:02:06 2014
New Revision: 1615338

URL: http://svn.apache.org/r1615338
Log:
BOOKEEPER-697. stats collection on bookkeeper server (sijie via fpj)


Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DiskChecker.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1615338&r1=1615337&r2=1615338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Sat Aug  2 18:02:06 2014
@@ -298,6 +298,8 @@ Trunk (unreleased changes)
 
       BOOKKEEPER-739: Test timeouts mostly ignored (sijie via fpj)
 
+      BOOKKEEPER-697: stats collection on bookkeeper server (sijie via fpj)
+
     NEW FEATURE:
 
       BOOKKEEPER-562: Ability to tell if a ledger is closed or not (fpj)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf?rev=1615338&r1=1615337&r2=1615338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf Sat Aug  2 18:02:06 2014
@@ -272,3 +272,6 @@ zkTimeout=10000
 # When false, bookie will use its ipaddress for the registration.
 # Defaults to false.
 #useHostNameAsBookieID=false
+
+# Stats Provider Class
+#statsProviderClass=org.apache.bookkeeper.stats.CodahaleMetricsProvider

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1615338&r1=1615337&r2=1615338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Sat Aug  2 18:02:06 2014
@@ -50,6 +50,10 @@ import org.apache.bookkeeper.meta.Ledger
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.util.MathUtils;
@@ -71,11 +75,16 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_LEDGER_SCOPE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_INDEX_SCOPE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_BYTES;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SERVER_STATUS;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_BYTES;
+
 /**
  * Implements a bookie.
  *
  */
-
 public class Bookie extends BookieCriticalThread {
 
     private final static Logger LOG = LoggerFactory.getLogger(Bookie.class);
@@ -121,6 +130,10 @@ public class Bookie extends BookieCritic
 
     final private AtomicBoolean readOnly = new AtomicBoolean(false);
 
+    // Expose Stats
+    private final Counter writeBytes;
+    private final Counter readBytes;
+
     public static class NoLedgerException extends IOException {
         private static final long serialVersionUID = 1L;
         private final long ledgerId;
@@ -290,7 +303,7 @@ public class Bookie extends BookieCritic
             boolean newEnv = false;
             List<File> missedCookieDirs = new ArrayList<File>();
             Cookie journalCookie = null;
-            // try to read cookie from journal directory. 
+            // try to read cookie from journal directory.
             try {
                 journalCookie = Cookie.readFromDirectory(journalDirectory);
                 if (journalCookie.isBookieHostCreatedFromIp()) {
@@ -312,7 +325,7 @@ public class Bookie extends BookieCritic
                 masterCookie.verify(zkCookie);
             } catch (KeeperException.NoNodeException nne) {
                 // can occur in cases:
-                // 1) new environment or 
+                // 1) new environment or
                 // 2) done only metadata format and started bookie server.
             }
             checkDirectoryStructure(journalDirectory);
@@ -424,19 +437,25 @@ public class Bookie extends BookieCritic
         return currentDirs;
     }
 
-
     public Bookie(ServerConfiguration conf)
             throws IOException, KeeperException, InterruptedException, BookieException {
+        this(conf, NullStatsLogger.INSTANCE);
+    }
+
+    public Bookie(ServerConfiguration conf, StatsLogger statsLogger)
+            throws IOException, KeeperException, InterruptedException, BookieException {
         super("Bookie-" + conf.getBookiePort());
         this.bookieRegistrationPath = conf.getZkAvailableBookiesPath() + "/";
         this.conf = conf;
         this.journalDirectory = getCurrentDirectory(conf.getJournalDir());
-        this.ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs());
+        this.ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                statsLogger.scope(LD_LEDGER_SCOPE));
         File[] idxDirs = conf.getIndexDirs();
         if (null == idxDirs) {
             this.indexDirsManager = this.ledgerDirsManager;
         } else {
-            this.indexDirsManager = new LedgerDirsManager(conf, idxDirs);
+            this.indexDirsManager = new LedgerDirsManager(conf, idxDirs,
+                    statsLogger.scope(LD_INDEX_SCOPE));
         }
 
         // instantiate zookeeper client to initialize ledger manager
@@ -456,11 +475,11 @@ public class Bookie extends BookieCritic
         if (conf.getSortedLedgerStorageEnabled()) {
             ledgerStorage = new SortedLedgerStorage(conf, ledgerManager,
                                                     ledgerDirsManager, indexDirsManager,
-                                                    journal);
+                                                    journal, statsLogger);
         } else {
             ledgerStorage = new InterleavedLedgerStorage(conf, ledgerManager,
                                                          ledgerDirsManager, indexDirsManager,
-                                                         journal);
+                                                         journal, statsLogger);
         }
         syncThread = new SyncThread(conf, getLedgerDirsListener(),
                                     ledgerStorage, journal);
@@ -471,6 +490,22 @@ public class Bookie extends BookieCritic
         String myID = getMyId();
         zkBookieRegPath = this.bookieRegistrationPath + myID;
         zkBookieReadOnlyPath = this.bookieRegistrationPath + BookKeeperConstants.READONLY + "/" + myID;
+
+        // Expose Stats
+        writeBytes = statsLogger.getCounter(WRITE_BYTES);
+        readBytes = statsLogger.getCounter(READ_BYTES);
+        // 1 : up, 0 : readonly
+        statsLogger.registerGauge(SERVER_STATUS, new Gauge<Number>() {
+            @Override
+            public Number getDefaultValue() {
+                return 0;
+            }
+
+            @Override
+            public Number getSample() {
+                return readOnly.get() ? 0 : 1;
+            }
+        });
     }
 
     private String getMyId() throws UnknownHostException {
@@ -1029,13 +1064,6 @@ public class Bookie extends BookieCritic
         return l;
     }
 
-    protected void addEntryByLedgerId(long ledgerId, ByteBuffer entry)
-        throws IOException, BookieException {
-        byte[] key = ledgerStorage.readMasterKey(ledgerId);
-        LedgerDescriptor handle = handles.getHandle(ledgerId, key);
-        handle.addEntry(entry);
-    }
-
     /**
      * Add an entry to a ledger as specified by handle.
      */
@@ -1046,6 +1074,8 @@ public class Bookie extends BookieCritic
         long entryId = handle.addEntry(entry);
 
         entry.rewind();
+        writeBytes.add(entry.remaining());
+
         LOG.trace("Adding {}@{}", entryId, ledgerId);
         journal.logAddEntry(entry, cb, ctx);
     }
@@ -1124,7 +1154,9 @@ public class Bookie extends BookieCritic
             throws IOException, NoLedgerException {
         LedgerDescriptor handle = handles.getReadOnlyHandle(ledgerId);
         LOG.trace("Reading {}@{}", entryId, ledgerId);
-        return handle.readEntry(entryId);
+        ByteBuffer entry = handle.readEntry(entryId);
+        readBytes.add(entry.remaining());
+        return entry;
     }
 
     // The rest of the code is test stuff

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java?rev=1615338&r1=1615337&r2=1615338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java Sat Aug  2 18:02:06 2014
@@ -27,12 +27,22 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.ConcurrentSkipListMap;
 
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.MathUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SKIP_LIST_FLUSH_BYTES;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SKIP_LIST_GET_ENTRY;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SKIP_LIST_PUT_ENTRY;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SKIP_LIST_SNAPSHOT;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SKIP_LIST_THROTTLING;
+
 /**
  * The EntryMemTable holds in-memory representation to the entries not-yet flushed.
  * When asked to flush, current EntrySkipList is moved to snapshot and is cleared.
@@ -101,11 +111,19 @@ public class EntryMemTable {
         return new EntrySkipList(checkpointSource.newCheckpoint());
     }
 
+    // Stats
+    private final OpStatsLogger snapshotStats;
+    private final OpStatsLogger putEntryStats;
+    private final OpStatsLogger getEntryStats;
+    private final Counter flushBytesCounter;
+    private final Counter throttlingCounter;
+
     /**
     * Constructor.
     * @param conf Server configuration
     */
-    public EntryMemTable(final ServerConfiguration conf, final CheckpointSource source) {
+    public EntryMemTable(final ServerConfiguration conf, final CheckpointSource source,
+                         final StatsLogger statsLogger) {
         this.checkpointSource = source;
         this.kvmap = newSkipList();
         this.snapshot = EntrySkipList.EMPTY_VALUE;
@@ -114,6 +132,13 @@ public class EntryMemTable {
         this.allocator = new SkipListArena(conf);
         // skip list size limit
         this.skipListSizeLimit = conf.getSkipListSizeLimit();
+
+        // Stats
+        this.snapshotStats = statsLogger.getOpStatsLogger(SKIP_LIST_SNAPSHOT);
+        this.putEntryStats = statsLogger.getOpStatsLogger(SKIP_LIST_PUT_ENTRY);
+        this.getEntryStats = statsLogger.getOpStatsLogger(SKIP_LIST_GET_ENTRY);
+        this.flushBytesCounter = statsLogger.getCounter(SKIP_LIST_FLUSH_BYTES);
+        this.throttlingCounter = statsLogger.getCounter(SKIP_LIST_THROTTLING);
     }
 
     void dump() {
@@ -143,6 +168,7 @@ public class EntryMemTable {
         // No-op if snapshot currently has entries
         if (this.snapshot.isEmpty() &&
                 this.kvmap.compareTo(oldCp) < 0) {
+            final long startTimeNanos = MathUtils.nowInNano();
             this.lock.writeLock().lock();
             try {
                 if (this.snapshot.isEmpty() && !this.kvmap.isEmpty()
@@ -159,6 +185,12 @@ public class EntryMemTable {
             } finally {
                 this.lock.writeLock().unlock();
             }
+
+            if (null != cp) {
+                snapshotStats.registerSuccessfulEvent(MathUtils.elapsedMSec(startTimeNanos));
+            } else {
+                snapshotStats.registerFailedEvent(MathUtils.elapsedMSec(startTimeNanos));
+            }
         }
         return cp;
     }
@@ -207,6 +239,7 @@ public class EntryMemTable {
                             }
                         }
                     }
+                    flushBytesCounter.add(size);
                     clearSnapshot(keyValues);
                 }
             }
@@ -242,6 +275,7 @@ public class EntryMemTable {
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
         }
+        throttlingCounter.inc();
     }
 
     /**
@@ -252,24 +286,34 @@ public class EntryMemTable {
     public long addEntry(long ledgerId, long entryId, final ByteBuffer entry, final CacheCallback cb)
             throws IOException {
         long size = 0;
-        if (isSizeLimitReached()) {
-            Checkpoint cp = snapshot();
-            if (null != cp) {
-                cb.onSizeLimitReached();
-            } else {
-                throttleWriters();
+        long startTimeNanos = MathUtils.nowInNano();
+        boolean success = false;
+        try {
+            if (isSizeLimitReached()) {
+                Checkpoint cp = snapshot();
+                if (null != cp) {
+                    cb.onSizeLimitReached();
+                } else {
+                    throttleWriters();
+                }
             }
-        }
 
-        this.lock.readLock().lock();
-        try {
-            EntryKeyValue toAdd = cloneWithAllocator(ledgerId, entryId, entry);
-            size = internalAdd(toAdd);
+            this.lock.readLock().lock();
+            try {
+                EntryKeyValue toAdd = cloneWithAllocator(ledgerId, entryId, entry);
+                size = internalAdd(toAdd);
+            } finally {
+                this.lock.readLock().unlock();
+            }
+            success = true;
+            return size;
         } finally {
-            this.lock.readLock().unlock();
+            if (success) {
+                putEntryStats.registerSuccessfulEvent(MathUtils.elapsedMSec(startTimeNanos));
+            } else {
+                putEntryStats.registerFailedEvent(MathUtils.elapsedMSec(startTimeNanos));
+            }
         }
-
-        return size;
     }
 
     /**
@@ -326,14 +370,22 @@ public class EntryMemTable {
     public EntryKeyValue getEntry(long ledgerId, long entryId) throws IOException {
         EntryKey key = new EntryKey(ledgerId, entryId);
         EntryKeyValue value = null;
+        long startTimeNanos = MathUtils.nowInNano();
+        boolean success = false;
         this.lock.readLock().lock();
         try {
             value = this.kvmap.get(key);
             if (value == null) {
                 value = this.snapshot.get(key);
             }
+            success = true;
         } finally {
             this.lock.readLock().unlock();
+            if (success) {
+                getEntryStats.registerSuccessfulEvent(MathUtils.elapsedMSec(startTimeNanos));
+            } else {
+                getEntryStats.registerFailedEvent(MathUtils.elapsedMSec(startTimeNanos));
+            }
         }
 
         return value;
@@ -347,14 +399,22 @@ public class EntryMemTable {
     public EntryKeyValue getLastEntry(long ledgerId) throws IOException {
         EntryKey result = null;
         EntryKey key = new EntryKey(ledgerId, Long.MAX_VALUE);
+        long startTimeNanos = MathUtils.nowInNano();
+        boolean success = false;
         this.lock.readLock().lock();
         try {
             result = this.kvmap.floorKey(key);
             if (result == null || result.getLedgerId() != ledgerId) {
                 result = this.snapshot.floorKey(key);
             }
+            success = true;
         } finally {
             this.lock.readLock().unlock();
+            if (success) {
+                getEntryStats.registerSuccessfulEvent(MathUtils.elapsedMSec(startTimeNanos));
+            } else {
+                getEntryStats.registerFailedEvent(MathUtils.elapsedMSec(startTimeNanos));
+            }
         }
 
         if (result == null || result.getLedgerId() != ledgerId) {

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java?rev=1615338&r1=1615337&r2=1615338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java Sat Aug  2 18:02:06 2014
@@ -21,6 +21,8 @@
 package org.apache.bookkeeper.bookie;
 
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,6 +41,8 @@ import java.util.concurrent.ConcurrentMa
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.NUM_INDEX_PAGES;
+
 class IndexInMemPageMgr {
     private final static Logger LOG = LoggerFactory.getLogger(IndexInMemPageMgr.class);
     private final static ConcurrentHashMap<Long, LedgerEntryPage> EMPTY_PAGE_MAP
@@ -321,7 +325,8 @@ class IndexInMemPageMgr {
     public IndexInMemPageMgr(int pageSize,
                              int entriesPerPage,
                              ServerConfiguration conf,
-                             IndexPersistenceMgr indexPersistenceManager) {
+                             IndexPersistenceMgr indexPersistenceManager,
+                             StatsLogger statsLogger) {
         this.pageSize = pageSize;
         this.entriesPerPage = entriesPerPage;
         this.indexPersistenceManager = indexPersistenceManager;
@@ -335,6 +340,18 @@ class IndexInMemPageMgr {
         }
         LOG.info("maxMemory = {}, pageSize = {}, pageLimit = {}", new Object[] { Runtime.getRuntime().maxMemory(),
                         pageSize, pageLimit });
+        // Expose Stats
+        statsLogger.registerGauge(NUM_INDEX_PAGES, new Gauge<Number>() {
+            @Override
+            public Number getDefaultValue() {
+                return 0;
+            }
+
+            @Override
+            public Number getSample() {
+                return getNumUsedPages();
+            }
+        });
     }
 
     /**

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java?rev=1615338&r1=1615337&r2=1615338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java Sat Aug  2 18:02:06 2014
@@ -23,7 +23,6 @@ package org.apache.bookkeeper.bookie;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.LinkedList;
@@ -35,12 +34,18 @@ import java.util.concurrent.ConcurrentMa
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.SnapshotMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LEDGER_CACHE_NUM_EVICTED_LEDGERS;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.NUM_OPEN_LEDGERS;
+
 public class IndexPersistenceMgr {
     private final static Logger LOG = LoggerFactory.getLogger(IndexPersistenceMgr.class);
 
@@ -72,11 +77,15 @@ public class IndexPersistenceMgr {
     private LedgerDirsManager ledgerDirsManager;
     final LinkedList<Long> openLedgers = new LinkedList<Long>();
 
+    // Stats
+    private final Counter evictedLedgersCounter;
+
     public IndexPersistenceMgr(int pageSize,
                                int entriesPerPage,
                                ServerConfiguration conf,
                                SnapshotMap<Long, Boolean> activeLedgers,
-                               LedgerDirsManager ledgerDirsManager) throws IOException {
+                               LedgerDirsManager ledgerDirsManager,
+                               StatsLogger statsLogger) throws IOException {
         this.openFileLimit = conf.getOpenFileLimit();
         this.activeLedgers = activeLedgers;
         this.ledgerDirsManager = ledgerDirsManager;
@@ -86,6 +95,20 @@ public class IndexPersistenceMgr {
         // Retrieve all of the active ledgers.
         getActiveLedgers();
         ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
+
+        // Expose Stats
+        evictedLedgersCounter = statsLogger.getCounter(LEDGER_CACHE_NUM_EVICTED_LEDGERS);
+        statsLogger.registerGauge(NUM_OPEN_LEDGERS, new Gauge<Integer>() {
+            @Override
+            public Integer getDefaultValue() {
+                return 0;
+            }
+
+            @Override
+            public Integer getSample() {
+                return getNumOpenLedgers();
+            }
+        });
     }
 
     FileInfo getFileInfo(Long ledger, byte masterKey[]) throws IOException {
@@ -270,8 +293,7 @@ public class IndexPersistenceMgr {
                 // was executing.
                 return;
             }
-            // TODO Add a statistic here, we don't care really which
-            // ledger is evicted, but the rate at which they get evicted
+            evictedLedgersCounter.inc();
             FileInfo fi = fileInfoCache.remove(ledgerToRemove);
             if (null == fi) {
                 // Seems like someone else already closed the file.

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java?rev=1615338&r1=1615337&r2=1615338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java Sat Aug  2 18:02:06 2014
@@ -27,16 +27,22 @@ import java.nio.ByteBuffer;
 
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.EntryLogger.EntryLogListener;
-import org.apache.bookkeeper.bookie.LedgerDirsManager;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.jmx.BKMBeanInfo;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.SnapshotMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_ENTRY;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_OFFSET;
+
 /**
  * Interleave ledger storage
  * This ledger storage implementation stores all entries in a single
@@ -82,24 +88,31 @@ class InterleavedLedgerStorage implement
     // this indicates that a write has happened since the last flush
     private volatile boolean somethingWritten = false;
 
+    // Expose Stats
+    private final OpStatsLogger getOffsetStats;
+    private final OpStatsLogger getEntryStats;
+
     InterleavedLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
                              LedgerDirsManager ledgerDirsManager, CheckpointSource checkpointSource)
             throws IOException {
-        this(conf, ledgerManager, ledgerDirsManager, ledgerDirsManager, checkpointSource);
+        this(conf, ledgerManager, ledgerDirsManager, ledgerDirsManager, checkpointSource, NullStatsLogger.INSTANCE);
     }
 
     InterleavedLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
                              LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
-                             CheckpointSource checkpointSource)
+                             CheckpointSource checkpointSource, StatsLogger statsLogger)
             throws IOException {
         activeLedgers = new SnapshotMap<Long, Boolean>();
         this.checkpointSource = checkpointSource;
         entryLogger = new EntryLogger(conf, ledgerDirsManager, this);
         ledgerCache = new LedgerCacheImpl(conf, activeLedgers,
-                null == indexDirsManager ? ledgerDirsManager : indexDirsManager);
+                null == indexDirsManager ? ledgerDirsManager : indexDirsManager, statsLogger);
         gcThread = new GarbageCollectorThread(conf, ledgerCache, entryLogger,
                 activeLedgers, ledgerManager);
         ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
+        // Expose Stats
+        getOffsetStats = statsLogger.getOpStatsLogger(STORAGE_GET_OFFSET);
+        getEntryStats = statsLogger.getOpStatsLogger(STORAGE_GET_ENTRY);
     }
 
     private LedgerDirsListener getLedgerDirsListener() {
@@ -208,11 +221,36 @@ class InterleavedLedgerStorage implement
             entryId = ledgerCache.getLastEntry(ledgerId);
         }
 
-        offset = ledgerCache.getEntryOffset(ledgerId, entryId);
-        if (offset == 0) {
-            throw new Bookie.NoEntryException(ledgerId, entryId);
+        // Get Offset
+        long startTimeNanos = MathUtils.nowInNano();
+        boolean success = false;
+        try {
+            offset = ledgerCache.getEntryOffset(ledgerId, entryId);
+            if (offset == 0) {
+                throw new Bookie.NoEntryException(ledgerId, entryId);
+            }
+            success = true;
+        } finally {
+            if (success) {
+                getOffsetStats.registerSuccessfulEvent(MathUtils.elapsedMSec(startTimeNanos));
+            } else {
+                getOffsetStats.registerFailedEvent(MathUtils.elapsedMSec(startTimeNanos));
+            }
+        }
+        // Get Entry
+        startTimeNanos = MathUtils.nowInNano();
+        success = false;
+        try {
+            byte[] retBytes = entryLogger.readEntry(ledgerId, entryId, offset);
+            success = true;
+            return ByteBuffer.wrap(retBytes);
+        } finally {
+            if (success) {
+                getEntryStats.registerSuccessfulEvent(MathUtils.elapsedMSec(startTimeNanos));
+            } else {
+                getEntryStats.registerFailedEvent(MathUtils.elapsedMSec(startTimeNanos));
+            }
         }
-        return ByteBuffer.wrap(entryLogger.readEntry(ledgerId, entryId, offset));
     }
 
     private void flushOrCheckpoint(boolean isCheckpointFlush)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java?rev=1615338&r1=1615337&r2=1615338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java Sat Aug  2 18:02:06 2014
@@ -35,9 +35,14 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.base.Stopwatch;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.DaemonThreadFactory;
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.util.MathUtils;
@@ -45,6 +50,8 @@ import org.apache.bookkeeper.util.ZeroBu
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.*;
+
 /**
  * Provide journal related management.
  */
@@ -287,7 +294,7 @@ class Journal extends BookieCriticalThre
         }
     }
 
-    private class ForceWriteRequest extends BookieCriticalThread {
+    private class ForceWriteRequest implements Runnable {
         private final JournalChannel logFile;
         private final LinkedList<QueueEntry> forceWriteWaiters;
         private boolean shouldClose;
@@ -301,16 +308,17 @@ class Journal extends BookieCriticalThre
                           LinkedList<QueueEntry> forceWriteWaiters,
                           boolean shouldClose,
                           boolean isMarker) {
-            super("ForceWriteRequestThread");
             this.forceWriteWaiters = forceWriteWaiters;
             this.logFile = logFile;
             this.logId = logId;
             this.lastFlushedPosition = lastFlushedPosition;
             this.shouldClose = shouldClose;
             this.isMarker = isMarker;
+            forceWriteQueueSize.inc();
         }
 
         public int process(boolean shouldForceWrite) throws IOException {
+            forceWriteQueueSize.dec();
             if (isMarker) {
                 return 0;
             }
@@ -334,6 +342,7 @@ class Journal extends BookieCriticalThre
         @Override
         public void run() {
             for (QueueEntry e : this.forceWriteWaiters) {
+                journalAddEntryStats.registerSuccessfulEvent(MathUtils.elapsedMSec(e.enqueueTime));
                 e.callback();    // Process cbs inline
             }
         }
@@ -397,6 +406,7 @@ class Journal extends BookieCriticalThre
                             // the last force write and then reset the counter so we can accumulate
                             // requests in the write we are about to issue
                             if (numReqInLastForceWrite > 0) {
+                                forceWriteGroupingCountStats.registerSuccessfulEvent(numReqInLastForceWrite);
                                 numReqInLastForceWrite = 0;
                             }
                         }
@@ -506,7 +516,25 @@ class Journal extends BookieCriticalThre
     volatile boolean running = true;
     private final LedgerDirsManager ledgerDirsManager;
 
+    // Expose Stats
+    private final OpStatsLogger journalAddEntryStats;
+    private final OpStatsLogger journalCreationStats;
+    private final OpStatsLogger journalFlushStats;
+    private final OpStatsLogger forceWriteGroupingCountStats;
+    private final OpStatsLogger forceWriteBatchEntriesStats;
+    private final OpStatsLogger forceWriteBatchBytesStats;
+    private final Counter journalQueueSize;
+    private final Counter forceWriteQueueSize;
+    private final Counter flushMaxWaitCounter;
+    private final Counter flushMaxOutstandingBytesCounter;
+    private final Counter flushEmptyQueueCounter;
+    private final Counter journalWriteBytes;
+
     public Journal(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager) {
+        this(conf, ledgerDirsManager, NullStatsLogger.INSTANCE);
+    }
+
+    public Journal(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager, StatsLogger statsLogger) {
         super("BookieJournal-" + conf.getBookiePort());
         this.ledgerDirsManager = ledgerDirsManager;
         this.conf = conf;
@@ -530,6 +558,20 @@ class Journal extends BookieCriticalThre
         // read last log mark
         lastLogMark.readLog();
         LOG.debug("Last Log Mark : {}", lastLogMark.getCurMark());
+
+        // Expose Stats
+        journalAddEntryStats = statsLogger.getOpStatsLogger(JOURNAL_ADD_ENTRY);
+        journalCreationStats = statsLogger.getOpStatsLogger(JOURNAL_CREATION_LATENCY);
+        journalFlushStats = statsLogger.getOpStatsLogger(JOURNAL_FLUSH_LATENCY);
+        forceWriteGroupingCountStats = statsLogger.getOpStatsLogger(JOURNAL_FORCE_WRITE_GROUPING_COUNT);
+        forceWriteBatchEntriesStats = statsLogger.getOpStatsLogger(JOURNAL_FORCE_WRITE_BATCH_ENTRIES);
+        forceWriteBatchBytesStats = statsLogger.getOpStatsLogger(JOURNAL_FORCE_WRITE_BATCH_BYTES);
+        journalQueueSize = statsLogger.getCounter(JOURNAL_QUEUE_SIZE);
+        forceWriteQueueSize = statsLogger.getCounter(JOURNAL_FORCE_WRITE_QUEUE_SIZE);
+        flushMaxWaitCounter = statsLogger.getCounter(JOURNAL_NUM_FLUSH_MAX_WAIT);
+        flushMaxOutstandingBytesCounter = statsLogger.getCounter(JOURNAL_NUM_FLUSH_MAX_OUTSTANDING_BYTES);
+        flushEmptyQueueCounter = statsLogger.getCounter(JOURNAL_NUM_FLUSH_EMPTY_QUEUE);
+        journalWriteBytes = statsLogger.getCounter(JOURNAL_WRITE_BYTES);
     }
 
     LastLogMark getLastLogMark() {
@@ -703,6 +745,7 @@ class Journal extends BookieCriticalThre
         long ledgerId = entry.getLong();
         long entryId = entry.getLong();
         entry.rewind();
+        journalQueueSize.inc();
         queue.add(new QueueEntry(entry, ledgerId, entryId, cb, ctx, MathUtils.nowInNano()));
     }
 
@@ -738,6 +781,9 @@ class Journal extends BookieCriticalThre
         ZeroBuffer.put(paddingBuff);
         JournalChannel logFile = null;
         forceWriteThread.start();
+        Stopwatch journalCreationWatcher = new Stopwatch();
+        Stopwatch journalFlushWatcher = new Stopwatch();
+        long batchSize = 0;
         try {
             List<Long> journalIds = listJournalIds(journalDirectory, null);
             // Should not use MathUtils.now(), which use System.nanoTime() and
@@ -753,6 +799,8 @@ class Journal extends BookieCriticalThre
                 // new journal file to write
                 if (null == logFile) {
                     logId = logId + 1;
+
+                    journalCreationWatcher.reset().start();
                     logFile = new JournalChannel(journalDirectory,
                                         logId,
                                         journalPreAllocSize,
@@ -760,6 +808,9 @@ class Journal extends BookieCriticalThre
                                         conf.getJournalAlignmentSize(),
                                         removePagesFromCache,
                                         conf.getJournalFormatVersionToWrite());
+                    journalCreationStats.registerSuccessfulEvent(
+                            journalCreationWatcher.stop().elapsedTime(TimeUnit.MILLISECONDS));
+
                     bc = logFile.getBufferedChannel();
 
                     lastFlushPosition = bc.position();
@@ -787,17 +838,20 @@ class Journal extends BookieCriticalThre
                             // b) limit the number of entries to group
                             groupWhenTimeout = false;
                             shouldFlush = true;
+                            flushMaxWaitCounter.inc();
                         } else if (qe != null &&
                                 ((bufferedEntriesThreshold > 0 && toFlush.size() > bufferedEntriesThreshold) ||
                                  (bc.position() > lastFlushPosition + bufferedWritesThreshold))) {
                             // 2. If we have buffered more than the buffWriteThreshold or bufferedEntriesThreshold
                             shouldFlush = true;
+                            flushMaxOutstandingBytesCounter.inc();
                         } else if (qe == null) {
                             // We should get here only if we flushWhenQueueEmpty is true else we would wait
                             // for timeout that would put is past the maxWait threshold
                             // 3. If the queue is empty i.e. no benefit of grouping. This happens when we have one
                             // publish at a time - common case in tests.
                             shouldFlush = true;
+                            flushEmptyQueueCounter.inc();
                         }
 
                         // toFlush is non null and not empty so should be safe to access getFirst
@@ -805,8 +859,11 @@ class Journal extends BookieCriticalThre
                             if (conf.getJournalFormatVersionToWrite() >= JournalChannel.V5) {
                                 writePaddingBytes(logFile, paddingBuff, conf.getJournalAlignmentSize());
                             }
+                            journalFlushWatcher.reset().start();
                             bc.flush(false);
                             lastFlushPosition = bc.position();
+                            journalFlushStats.registerSuccessfulEvent(
+                                    journalFlushWatcher.stop().elapsedTime(TimeUnit.MILLISECONDS));
 
                             // Trace the lifetime of entries through persistence
                             if (LOG.isDebugEnabled()) {
@@ -815,8 +872,12 @@ class Journal extends BookieCriticalThre
                                 }
                             }
 
+                            forceWriteBatchEntriesStats.registerSuccessfulEvent(toFlush.size());
+                            forceWriteBatchBytesStats.registerSuccessfulEvent(batchSize);
+
                             forceWriteRequests.put(new ForceWriteRequest(logFile, logId, lastFlushPosition, toFlush, (lastFlushPosition > maxJournalSize), false));
                             toFlush = new LinkedList<QueueEntry>();
+                            batchSize = 0L;
                             // check whether journal file is over file limit
                             if (bc.position() > maxJournalSize) {
                                 logFile = null;
@@ -835,6 +896,11 @@ class Journal extends BookieCriticalThre
                     continue;
                 }
 
+                journalWriteBytes.add(qe.entry.remaining());
+                journalQueueSize.dec();
+
+                batchSize += (4 + qe.entry.remaining());
+
                 lenBuff.clear();
                 lenBuff.putInt(qe.entry.remaining());
                 lenBuff.flip();

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java?rev=1615338&r1=1615337&r2=1615338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java Sat Aug  2 18:02:06 2014
@@ -24,6 +24,8 @@ package org.apache.bookkeeper.bookie;
 import java.io.IOException;
 
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.SnapshotMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,12 +43,18 @@ public class LedgerCacheImpl implements 
     private final int entriesPerPage;
 
     public LedgerCacheImpl(ServerConfiguration conf, SnapshotMap<Long, Boolean> activeLedgers,
-                    LedgerDirsManager ledgerDirsManager) throws IOException {
+                           LedgerDirsManager ledgerDirsManager) throws IOException {
+        this(conf, activeLedgers, ledgerDirsManager, NullStatsLogger.INSTANCE);
+    }
+
+    public LedgerCacheImpl(ServerConfiguration conf, SnapshotMap<Long, Boolean> activeLedgers,
+                           LedgerDirsManager ledgerDirsManager, StatsLogger statsLogger) throws IOException {
         this.pageSize = conf.getPageSize();
         this.entriesPerPage = pageSize / 8;
         this.indexPersistenceManager = new IndexPersistenceMgr(pageSize, entriesPerPage, conf, activeLedgers,
-                        ledgerDirsManager);
-        this.indexPageManager = new IndexInMemPageMgr(pageSize, entriesPerPage, conf, indexPersistenceManager);
+                ledgerDirsManager, statsLogger);
+        this.indexPageManager = new IndexInMemPageMgr(pageSize, entriesPerPage, conf,
+                indexPersistenceManager, statsLogger);
     }
 
     IndexPersistenceMgr getIndexPersistenceManager() {

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java?rev=1615338&r1=1615337&r2=1615338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java Sat Aug  2 18:02:06 2014
@@ -26,8 +26,13 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.DiskChecker;
 import org.apache.bookkeeper.util.DiskChecker.DiskErrorException;
 import org.apache.bookkeeper.util.DiskChecker.DiskOutOfSpaceException;
@@ -37,6 +42,8 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_WRITABLE_DIRS;
+
 /**
  * This class manages ledger directories used by the bookie.
  */
@@ -51,15 +58,48 @@ public class LedgerDirsManager {
     private final List<LedgerDirsListener> listeners;
     private final LedgerDirsMonitor monitor;
     private final Random rand = new Random();
+    private final ConcurrentMap<File, Float> diskUsages =
+            new ConcurrentHashMap<File, Float>();
 
     public LedgerDirsManager(ServerConfiguration conf, File[] dirs) {
+        this(conf, dirs, NullStatsLogger.INSTANCE);
+    }
+
+    LedgerDirsManager(ServerConfiguration conf, File[] dirs, StatsLogger statsLogger) {
         this.ledgerDirectories = Arrays.asList(Bookie
                 .getCurrentDirectories(dirs));
         this.writableLedgerDirectories = new ArrayList<File>(ledgerDirectories);
         this.filledDirs = new ArrayList<File>();
-        listeners = new ArrayList<LedgerDirsManager.LedgerDirsListener>();
+        listeners = new ArrayList<LedgerDirsListener>();
         diskChecker = new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold());
         monitor = new LedgerDirsMonitor(conf.getDiskCheckInterval());
+        for (File dir : dirs) {
+            diskUsages.put(dir, 0f);
+            String statName = "dir_" + dir.getPath().replace('/', '_') + "_usage";
+            final File targetDir = dir;
+            statsLogger.registerGauge(statName, new Gauge<Number>() {
+                @Override
+                public Number getDefaultValue() {
+                    return 0;
+                }
+
+                @Override
+                public Number getSample() {
+                    return diskUsages.get(targetDir) * 100;
+                }
+            });
+        }
+        statsLogger.registerGauge(LD_WRITABLE_DIRS, new Gauge<Number>() {
+            @Override
+            public Number getDefaultValue() {
+                return 0;
+            }
+
+            @Override
+            public Number getSample() {
+                return writableLedgerDirectories.size();
+            }
+        });
     }
 
     /**
@@ -192,7 +232,7 @@ public class LedgerDirsManager {
 
     /**
      * Sweep through all the directories to check disk errors or disk full.
-     * 
+     *
      * @throws DiskErrorException
      *             If disk having errors
      * @throws NoWritableLedgerDirException
@@ -246,18 +286,22 @@ public class LedgerDirsManager {
                 // Check all writable dirs disk space usage.
                 for (File dir : writableDirs) {
                     try {
-                        diskChecker.checkDir(dir);
+                        diskUsages.put(dir, diskChecker.checkDir(dir));
                     } catch (DiskErrorException e) {
+                        LOG.error("Ledger directory {} failed on disk checking : ", dir, e);
                         // Notify disk failure to all listeners
                         for (LedgerDirsListener listener : listeners) {
-                            LOG.warn("{} has errors.", dir, e);
                             listener.diskFailed(dir);
                         }
                     } catch (DiskWarnThresholdException e) {
+                        LOG.warn("Ledger directory {} is almost full.", dir);
+                        diskUsages.put(dir, e.getUsage());
                         for (LedgerDirsListener listener : listeners) {
                             listener.diskAlmostFull(dir);
                         }
                     } catch (DiskOutOfSpaceException e) {
+                        LOG.error("Ledger directory {} is out-of-space.", dir);
+                        diskUsages.put(dir, e.getUsage());
                         // Notify disk full to all listeners
                         addToFilledDirs(dir);
                     }
@@ -266,7 +310,7 @@ public class LedgerDirsManager {
                 // Check all full-filled disk space usage
                 for (File dir : fullfilledDirs) {
                     try {
-                        diskChecker.checkDir(dir);
+                        diskUsages.put(dir, diskChecker.checkDir(dir));
                         addToWritableDirs(dir, true);
                     } catch (DiskErrorException e) {
                         //Notify disk failure to all the listeners
@@ -274,10 +318,12 @@ public class LedgerDirsManager {
                             listener.diskFailed(dir);
                         }
                     } catch (DiskWarnThresholdException e) {
+                        diskUsages.put(dir, e.getUsage());
                         // the full-filled dir become writable but still above warn threshold
                         addToWritableDirs(dir, false);
                     } catch (DiskOutOfSpaceException e) {
                         // the full-filled dir is still full-filled
+                        diskUsages.put(dir, e.getUsage());
                     }
                 }
                 try {

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java?rev=1615338&r1=1615337&r2=1615338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java Sat Aug  2 18:02:06 2014
@@ -31,6 +31,7 @@ import org.apache.bookkeeper.bookie.Chec
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,10 +44,10 @@ public class SortedLedgerStorage extends
 
     public SortedLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
                                LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
-                               final CheckpointSource checkpointSource)
+                               final CheckpointSource checkpointSource, StatsLogger statsLogger)
                                        throws IOException {
-        super(conf, ledgerManager, ledgerDirsManager, indexDirsManager, null);
-        this.memTable = new EntryMemTable(conf, checkpointSource);
+        super(conf, ledgerManager, ledgerDirsManager, indexDirsManager, null, statsLogger);
+        this.memTable = new EntryMemTable(conf, checkpointSource, statsLogger);
         this.scheduler = Executors.newSingleThreadScheduledExecutor(
                 new ThreadFactoryBuilder()
                 .setNameFormat("SortedLedgerStorage-%d")

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java?rev=1615338&r1=1615337&r2=1615338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java Sat Aug  2 18:02:06 2014
@@ -21,6 +21,9 @@ import java.io.File;
 import java.util.List;
 
 import com.google.common.annotations.Beta;
+import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.bookkeeper.util.ReflectionUtils;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.lang.StringUtils;
 
@@ -76,8 +79,6 @@ public class ServerConfiguration extends
     // Zookeeper Parameters
     protected final static String ZK_TIMEOUT = "zkTimeout";
     protected final static String ZK_SERVERS = "zkServers";
-    // Statistics Parameters
-    protected final static String ENABLE_STATISTICS = "enableStatistics";
     protected final static String OPEN_LEDGER_REREPLICATION_GRACE_PERIOD = "openLedgerRereplicationGracePeriod";
     //ReadOnly mode support on all disk full
     protected final static String READ_ONLY_MODE_ENABLED = "readOnlyModeEnabled";
@@ -104,6 +105,10 @@ public class ServerConfiguration extends
     protected final static String SKIP_LIST_CHUNK_SIZE_ENTRY = "skipListArenaChunkSize";
     protected final static String SKIP_LIST_MAX_ALLOC_ENTRY = "skipListArenaMaxAllocSize";
 
+    // Statistics Parameters
+    protected final static String ENABLE_STATISTICS = "enableStatistics";
+    protected final static String STATS_PROVIDER_CLASS = "statsProviderClass";
+
     /**
      * Construct a default configuration object
      */
@@ -1287,25 +1292,9 @@ public class ServerConfiguration extends
     }
 
     /**
-     * Validate the configuration.
-     * @throws ConfigurationException
-     */
-    public void validate() throws ConfigurationException {
-        if (getSkipListArenaChunkSize() < getSkipListArenaMaxAllocSize()) {
-            throw new ConfigurationException("Arena max allocation size should be smaller than the chunk size.");
-        }
-        if (getJournalAlignmentSize() < 512 || getJournalAlignmentSize() % 512 != 0) {
-            throw new ConfigurationException("Invalid journal alignment size : " + getJournalAlignmentSize());
-        }
-        if (getJournalAlignmentSize() > getJournalPreAllocSizeMB() * 1024 * 1024) {
-            throw new ConfigurationException("Invalid preallocation size : " + getJournalPreAllocSizeMB() + " MB");
-        }
-    }
-
-    /**
      * Get whether bookie is using hostname for registration and in ledger
      * metadata. Defaults to false.
-     * 
+     *
      * @return true, then bookie will be registered with its hostname and
      *         hostname will be used in ledger metadata. Otherwise bookie will
      *         use its ipaddress
@@ -1317,12 +1306,56 @@ public class ServerConfiguration extends
     /**
      * Configure the bookie to use its hostname to register with the
      * co-ordination service(eg: zookeeper) and in ledger metadata
-     * 
+     *
      * @see #getUseHostNameAsBookieID
      * @param useHostName
      *            whether to use hostname for registration and in ledgermetadata
+     * @return server configuration
      */
-    public void setUseHostNameAsBookieID(boolean useHostName) {
+    public ServerConfiguration setUseHostNameAsBookieID(boolean useHostName) {
         setProperty(USE_HOST_NAME_AS_BOOKIE_ID, useHostName);
+        return this;
     }
+
+    /**
+     * Get the stats provider used by bookie.
+     *
+     * @return stats provider class
+     * @throws ConfigurationException
+     */
+    public Class<? extends StatsProvider> getStatsProviderClass()
+        throws ConfigurationException {
+        return ReflectionUtils.getClass(this, STATS_PROVIDER_CLASS,
+                                        NullStatsProvider.class, StatsProvider.class,
+                                        defaultLoader);
+    }
+
+    /**
+     * Set the stats provider used by bookie.
+     *
+     * @param providerClass
+     *          stats provider class
+     * @return server configuration
+     */
+    public ServerConfiguration setStatsProviderClass(Class<? extends StatsProvider> providerClass) {
+        setProperty(STATS_PROVIDER_CLASS, providerClass.getName());
+        return this;
+    }
+
+    /**
+     * Validate the configuration.
+     * @throws ConfigurationException
+     */
+    public void validate() throws ConfigurationException {
+        if (getSkipListArenaChunkSize() < getSkipListArenaMaxAllocSize()) {
+            throw new ConfigurationException("Arena max allocation size should be smaller than the chunk size.");
+        }
+        if (getJournalAlignmentSize() < 512 || getJournalAlignmentSize() % 512 != 0) {
+            throw new ConfigurationException("Invalid journal alignment size : " + getJournalAlignmentSize());
+        }
+        if (getJournalAlignmentSize() > getJournalPreAllocSizeMB() * 1024 * 1024) {
+            throw new ConfigurationException("Invalid preallocation size : " + getJournalPreAllocSizeMB() + " MB");
+        }
+    }
+
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java?rev=1615338&r1=1615337&r2=1615338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java Sat Aug  2 18:02:06 2014
@@ -26,12 +26,18 @@ import java.util.concurrent.Executors;
 
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.processor.RequestProcessor;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.jboss.netty.channel.Channel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_REQUEST;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_REQUEST;
+
 public class BookieRequestProcessor implements RequestProcessor {
 
     private final static Logger LOG = LoggerFactory.getLogger(BookieRequestProcessor.class);
@@ -39,12 +45,12 @@ public class BookieRequestProcessor impl
      * The server configuration. We use this for getting the number of add and read
      * worker threads.
      */
-    private ServerConfiguration serverCfg;
+    private final ServerConfiguration serverCfg;
 
     /**
      * This is the Bookie instance that is used to handle all read and write requests.
      */
-    private Bookie bookie;
+    final Bookie bookie;
 
     /**
      * The threadpool used to execute all read entry requests issued to this server.
@@ -56,10 +62,16 @@ public class BookieRequestProcessor impl
      */
     private final ExecutorService writeThreadPool;
 
+    // Expose Stats
     private final BKStats bkStats = BKStats.getInstance();
     private final boolean statsEnabled;
+    final OpStatsLogger addRequestStats;
+    final OpStatsLogger addEntryStats;
+    final OpStatsLogger readRequestStats;
+    final OpStatsLogger readEntryStats;
 
-    public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie) {
+    public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie,
+                                  StatsLogger statsLogger) {
         this.serverCfg = serverCfg;
         this.bookie = bookie;
         this.readThreadPool =
@@ -68,7 +80,12 @@ public class BookieRequestProcessor impl
         this.writeThreadPool =
             createExecutor(this.serverCfg.getNumAddWorkerThreads(),
                            "BookieWriteThread-" + serverCfg.getBookiePort() + "-%d");
+        // Expose Stats
         this.statsEnabled = serverCfg.isStatisticsEnabled();
+        this.addEntryStats = statsLogger.getOpStatsLogger(ADD_ENTRY);
+        this.addRequestStats = statsLogger.getOpStatsLogger(ADD_ENTRY_REQUEST);
+        this.readEntryStats = statsLogger.getOpStatsLogger(READ_ENTRY);
+        this.readRequestStats = statsLogger.getOpStatsLogger(READ_ENTRY_REQUEST);
     }
 
     @Override
@@ -138,7 +155,7 @@ public class BookieRequestProcessor impl
     }
 
     private void processAddRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
-        WriteEntryProcessorV3 write = new WriteEntryProcessorV3(r, c, bookie);
+        WriteEntryProcessorV3 write = new WriteEntryProcessorV3(r, c, this);
         if (null == writeThreadPool) {
             write.run();
         } else {
@@ -147,7 +164,7 @@ public class BookieRequestProcessor impl
     }
 
     private void processReadRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
-        ReadEntryProcessorV3 read = new ReadEntryProcessorV3(r, c, bookie);
+        ReadEntryProcessorV3 read = new ReadEntryProcessorV3(r, c, this);
         if (null == readThreadPool) {
             read.run();
         } else {
@@ -156,7 +173,7 @@ public class BookieRequestProcessor impl
     }
 
     private void processAddRequest(final BookieProtocol.Request r, final Channel c) {
-        WriteEntryProcessor write = new WriteEntryProcessor(r, c, bookie);
+        WriteEntryProcessor write = new WriteEntryProcessor(r, c, this);
         if (null == writeThreadPool) {
             write.run();
         } else {
@@ -165,7 +182,7 @@ public class BookieRequestProcessor impl
     }
 
     private void processReadRequest(final BookieProtocol.Request r, final Channel c) {
-        ReadEntryProcessor read = new ReadEntryProcessor(r, c, bookie);
+        ReadEntryProcessor read = new ReadEntryProcessor(r, c, this);
         if (null == readThreadPool) {
             read.run();
         } else {

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java?rev=1615338&r1=1615337&r2=1615338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java Sat Aug  2 18:02:06 2014
@@ -36,6 +36,10 @@ import org.apache.bookkeeper.processor.R
 import org.apache.bookkeeper.replication.AutoRecoveryMain;
 import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
 import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.bookkeeper.util.ReflectionUtils;
 import org.apache.commons.cli.BasicParser;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.HelpFormatter;
@@ -48,6 +52,9 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SERVER_SCOPE;
+
 /**
  * Implements the server-side part of the BookKeeper protocol.
  *
@@ -70,12 +77,23 @@ public class BookieServer {
     // request processor
     private final RequestProcessor requestProcessor;
 
+    // Expose Stats
+    private final StatsLogger statsLogger;
+
     public BookieServer(ServerConfiguration conf) throws IOException,
             KeeperException, InterruptedException, BookieException,
             UnavailableException, CompatibilityException {
+        this(conf, NullStatsLogger.INSTANCE);
+    }
+
+    public BookieServer(ServerConfiguration conf, StatsLogger statsLogger)
+            throws IOException, KeeperException, InterruptedException,
+            BookieException, UnavailableException, CompatibilityException {
         this.conf = conf;
+        this.statsLogger = statsLogger;
         this.bookie = newBookie(conf);
-        this.requestProcessor = new BookieRequestProcessor(conf, bookie);
+        this.requestProcessor = new BookieRequestProcessor(conf, bookie,
+                statsLogger.scope(SERVER_SCOPE));
         this.nettyServer = new BookieNettyServer(this.conf, requestProcessor);
         isAutoRecoveryDaemonEnabled = conf.isAutoRecoveryDaemonEnabled();
         if (isAutoRecoveryDaemonEnabled) {
@@ -85,7 +103,7 @@ public class BookieServer {
 
     protected Bookie newBookie(ServerConfiguration conf)
         throws IOException, KeeperException, InterruptedException, BookieException {
-        return new Bookie(conf);
+        return new Bookie(conf, statsLogger.scope(BOOKIE_SCOPE));
     }
 
     public void start() throws IOException, UnavailableException {
@@ -349,7 +367,13 @@ public class BookieServer {
                            conf.getJournalDirName(), sb);
         LOG.info(hello);
         try {
-            final BookieServer bs = new BookieServer(conf);
+            // Initialize Stats Provider
+            Class<? extends StatsProvider> statsProviderClass =
+                    conf.getStatsProviderClass();
+            final StatsProvider statsProvider = ReflectionUtils.newInstance(statsProviderClass);
+            statsProvider.start(conf);
+
+            final BookieServer bs = new BookieServer(conf, statsProvider.getStatsLogger(""));
             bs.start();
             Runtime.getRuntime().addShutdownHook(new Thread() {
                 @Override
@@ -361,6 +385,8 @@ public class BookieServer {
             LOG.info("Register shutdown hook successfully");
             bs.join();
 
+            statsProvider.stop();
+            LOG.info("Stop stats provider");
             System.exit(bs.getExitCode());
         } catch (Exception e) {
             LOG.error("Exception running bookie server : ", e);

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java?rev=1615338&r1=1615337&r2=1615338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java Sat Aug  2 18:02:06 2014
@@ -17,8 +17,10 @@
  */
 package org.apache.bookkeeper.proto;
 
-import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.proto.BookieProtocol.Request;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.util.MathUtils;
 import org.jboss.netty.channel.Channel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,12 +29,14 @@ abstract class PacketProcessorBase imple
     private final static Logger logger = LoggerFactory.getLogger(PacketProcessorBase.class);
     final Request request;
     final Channel channel;
-    final Bookie bookie;
+    final BookieRequestProcessor requestProcessor;
+    final long enqueueNanos;
 
-    PacketProcessorBase(Request request, Channel channel, Bookie bookie) {
+    PacketProcessorBase(Request request, Channel channel, BookieRequestProcessor requestProcessor) {
         this.request = request;
         this.channel = channel;
-        this.bookie = bookie;
+        this.requestProcessor = requestProcessor;
+        this.enqueueNanos = MathUtils.nowInNano();
     }
 
     protected boolean isVersionCompatible() {
@@ -48,10 +52,21 @@ abstract class PacketProcessorBase imple
         return true;
     }
 
+    protected void sendResponse(int rc, Object response, OpStatsLogger statsLogger) {
+        channel.write(response);
+        if (BookieProtocol.EOK == rc) {
+            statsLogger.registerSuccessfulEvent(MathUtils.elapsedMSec(enqueueNanos));
+        } else {
+            statsLogger.registerFailedEvent(MathUtils.elapsedMSec(enqueueNanos));
+        }
+    }
+
     @Override
     public void run() {
         if (!isVersionCompatible()) {
-            channel.write(ResponseBuilder.buildErrorResponse(BookieProtocol.EBADVERSION, request));
+            sendResponse(BookieProtocol.EBADVERSION,
+                         ResponseBuilder.buildErrorResponse(BookieProtocol.EBADVERSION, request),
+                         requestProcessor.readRequestStats);
             return;
         }
         processPacket();

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java?rev=1615338&r1=1615337&r2=1615338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java Sat Aug  2 18:02:06 2014
@@ -20,22 +20,36 @@
  */
 package org.apache.bookkeeper.proto;
 
-import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.ProtocolVersion;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.util.MathUtils;
 import org.jboss.netty.channel.Channel;
 
 public abstract class PacketProcessorBaseV3 {
 
     final Request request;
     final Channel channel;
-    final Bookie  bookie;
+    final BookieRequestProcessor requestProcessor;
+    final long enqueueNanos;
 
-    public PacketProcessorBaseV3(Request request, Channel channel, Bookie bookie) {
+    public PacketProcessorBaseV3(Request request, Channel channel,
+                                 BookieRequestProcessor requestProcessor) {
         this.request = request;
         this.channel = channel;
-        this.bookie = bookie;
+        this.requestProcessor = requestProcessor;
+        this.enqueueNanos = MathUtils.nowInNano();
+    }
+
+    protected void sendResponse(StatusCode code, Object response, OpStatsLogger statsLogger) {
+        channel.write(response);
+        if (StatusCode.EOK == code) {
+            statsLogger.registerSuccessfulEvent(MathUtils.elapsedMSec(enqueueNanos));
+        } else {
+            statsLogger.registerFailedEvent(MathUtils.elapsedMSec(enqueueNanos));
+        }
     }
 
     protected boolean isVersionCompatible() {

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java?rev=1615338&r1=1615337&r2=1615338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java Sat Aug  2 18:02:06 2014
@@ -35,8 +35,9 @@ import org.slf4j.LoggerFactory;
 class ReadEntryProcessor extends PacketProcessorBase {
     private final static Logger LOG = LoggerFactory.getLogger(ReadEntryProcessor.class);
 
-    public ReadEntryProcessor(Request request, Channel channel, Bookie bookie) {
-        super(request, channel, bookie);
+    public ReadEntryProcessor(Request request, Channel channel,
+                              BookieRequestProcessor requestProcessor) {
+        super(request, channel, requestProcessor);
     }
 
     @Override
@@ -46,7 +47,7 @@ class ReadEntryProcessor extends PacketP
 
         LOG.debug("Received new read request: {}", request);
         int errorCode = BookieProtocol.EIO;
-        long startTime = MathUtils.now();
+        long startTimeNanos = MathUtils.nowInNano();
         ByteBuffer data = null;
         try {
             Future<Boolean> fenceResult = null;
@@ -54,14 +55,13 @@ class ReadEntryProcessor extends PacketP
                 LOG.warn("Ledger " + request.getLedgerId() + " fenced by " + channel.getRemoteAddress());
 
                 if (read.hasMasterKey()) {
-                    fenceResult = bookie.fenceLedger(read.getLedgerId(), read.getMasterKey());
+                    fenceResult = requestProcessor.bookie.fenceLedger(read.getLedgerId(), read.getMasterKey());
                 } else {
                     LOG.error("Password not provided, Not safe to fence {}", read.getLedgerId());
-                    BKStats.getInstance().getOpStats(BKStats.STATS_READ).incrementFailedOps();
                     throw BookieException.create(BookieException.Code.UnauthorizedAccessException);
                 }
             }
-            data = bookie.readEntry(request.getLedgerId(), request.getEntryId());
+            data = requestProcessor.bookie.readEntry(request.getLedgerId(), request.getEntryId());
             LOG.debug("##### Read entry ##### {}", data.remaining());
             if (null != fenceResult) {
                 // TODO:
@@ -120,14 +120,14 @@ class ReadEntryProcessor extends PacketP
         LOG.trace("Read entry rc = {} for {}",
                 new Object[] { errorCode, read });
         if (errorCode == BookieProtocol.EOK) {
-            assert data != null;
+            requestProcessor.readEntryStats.registerSuccessfulEvent(MathUtils.elapsedMSec(startTimeNanos));
+            sendResponse(errorCode, ResponseBuilder.buildReadResponse(data, read),
+                         requestProcessor.readRequestStats);
 
-            channel.write(ResponseBuilder.buildReadResponse(data, read));
-            long elapsedTime = MathUtils.now() - startTime;
-            BKStats.getInstance().getOpStats(BKStats.STATS_READ).updateLatency(elapsedTime);
         } else {
-            channel.write(ResponseBuilder.buildErrorResponse(errorCode, read));
-            BKStats.getInstance().getOpStats(BKStats.STATS_READ).incrementFailedOps();
+            requestProcessor.readEntryStats.registerFailedEvent(MathUtils.elapsedMSec(startTimeNanos));
+            sendResponse(errorCode, ResponseBuilder.buildErrorResponse(errorCode, read),
+                         requestProcessor.readRequestStats);
         }
     }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java?rev=1615338&r1=1615337&r2=1615338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java Sat Aug  2 18:02:06 2014
@@ -42,12 +42,13 @@ class ReadEntryProcessorV3 extends Packe
 
     private final static Logger LOG = LoggerFactory.getLogger(ReadEntryProcessorV3.class);
 
-    public ReadEntryProcessorV3(Request request, Channel channel, Bookie bookie) {
-        super(request, channel, bookie);
+    public ReadEntryProcessorV3(Request request, Channel channel,
+                                BookieRequestProcessor requestProcessor) {
+        super(request, channel, requestProcessor);
     }
 
     private ReadResponse getReadResponse() {
-        long startTime = MathUtils.now();
+        long startTimeNanos = MathUtils.nowInNano();
         ReadRequest readRequest = request.getReadRequest();
         long ledgerId = readRequest.getLedgerId();
         long entryId = readRequest.getEntryId();
@@ -58,7 +59,6 @@ class ReadEntryProcessorV3 extends Packe
 
         if (!isVersionCompatible()) {
             readResponse.setStatus(StatusCode.EBADVERSION);
-            BKStats.getInstance().getOpStats(BKStats.STATS_READ).incrementFailedOps();
             return readResponse.build();
         }
 
@@ -73,15 +73,14 @@ class ReadEntryProcessorV3 extends Packe
 
                 if (readRequest.hasMasterKey()) {
                     byte[] masterKey = readRequest.getMasterKey().toByteArray();
-                    fenceResult = bookie.fenceLedger(ledgerId, masterKey);
+                    fenceResult = requestProcessor.bookie.fenceLedger(ledgerId, masterKey);
                 } else {
                     LOG.error("Fence ledger request received without master key for ledger:{} from address: {}",
                               ledgerId, channel.getRemoteAddress());
-                    BKStats.getInstance().getOpStats(BKStats.STATS_READ).incrementFailedOps();
                     throw BookieException.create(BookieException.Code.UnauthorizedAccessException);
                 }
             }
-            entryBody = bookie.readEntry(ledgerId, entryId);
+            entryBody = requestProcessor.bookie.readEntry(ledgerId, entryId);
             if (null != fenceResult) {
                 // TODO:
                 // currently we don't have readCallback to run in separated read
@@ -135,10 +134,9 @@ class ReadEntryProcessorV3 extends Packe
         }
 
         if (status == StatusCode.EOK) {
-            long elapsedTime = MathUtils.now() - startTime;
-            BKStats.getInstance().getOpStats(BKStats.STATS_READ).updateLatency(elapsedTime);
+            requestProcessor.readEntryStats.registerSuccessfulEvent(MathUtils.elapsedMSec(startTimeNanos));
         } else {
-            BKStats.getInstance().getOpStats(BKStats.STATS_READ).incrementFailedOps();
+            requestProcessor.readEntryStats.registerFailedEvent(MathUtils.elapsedMSec(startTimeNanos));
         }
 
         // Finally set status and return. The body would have been updated if
@@ -150,11 +148,17 @@ class ReadEntryProcessorV3 extends Packe
     @Override
     public void run() {
         ReadResponse readResponse = getReadResponse();
+        sendResponse(readResponse);
+    }
+
+    private void sendResponse(ReadResponse readResponse) {
         Response.Builder response = Response.newBuilder()
                 .setHeader(getHeader())
                 .setStatus(readResponse.getStatus())
                 .setReadResponse(readResponse);
-        channel.write(response.build());
+        sendResponse(response.getStatus(),
+                     response.build(),
+                     requestProcessor.readRequestStats);
     }
 }
 

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java?rev=1615338&r1=1615337&r2=1615338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java Sat Aug  2 18:02:06 2014
@@ -19,7 +19,6 @@ package org.apache.bookkeeper.proto;
 
 import java.io.IOException;
 
-import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieProtocol.Request;
@@ -36,10 +35,11 @@ class WriteEntryProcessor extends Packet
 
     private final static Logger LOG = LoggerFactory.getLogger(WriteEntryProcessor.class);
 
-    long startTime;
+    long startTimeNanos;
 
-    public WriteEntryProcessor(Request request, Channel channel, Bookie bookie) {
-        super(request, channel, bookie);
+    public WriteEntryProcessor(Request request, Channel channel,
+                               BookieRequestProcessor requestProcessor) {
+        super(request, channel, requestProcessor);
     }
 
     @Override
@@ -47,23 +47,24 @@ class WriteEntryProcessor extends Packet
         assert (request instanceof BookieProtocol.AddRequest);
         BookieProtocol.AddRequest add = (BookieProtocol.AddRequest) request;
 
-        if (bookie.isReadOnly()) {
+        if (requestProcessor.bookie.isReadOnly()) {
             LOG.warn("BookieServer is running in readonly mode,"
                     + " so rejecting the request from the client!");
-            channel.write(ResponseBuilder.buildErrorResponse(BookieProtocol.EREADONLY, add));
-            BKStats.getInstance().getOpStats(BKStats.STATS_ADD).incrementFailedOps();
+            sendResponse(BookieProtocol.EREADONLY,
+                         ResponseBuilder.buildErrorResponse(BookieProtocol.EREADONLY, add),
+                         requestProcessor.addRequestStats);
             return;
         }
 
-        startTime = MathUtils.now();
+        startTimeNanos = MathUtils.nowInNano();
         int rc = BookieProtocol.EOK;
         try {
             if (add.isRecoveryAdd()) {
-                bookie.recoveryAddEntry(add.getDataAsByteBuffer(),
-                                        this, channel, add.getMasterKey());
+                requestProcessor.bookie.recoveryAddEntry(add.getDataAsByteBuffer(),
+                                                         this, channel, add.getMasterKey());
             } else {
-                bookie.addEntry(add.getDataAsByteBuffer(),
-                                this, channel, add.getMasterKey());
+                requestProcessor.bookie.addEntry(add.getDataAsByteBuffer(),
+                                                 this, channel, add.getMasterKey());
             }
         } catch (IOException e) {
             LOG.error("Error writing " + add, e);
@@ -76,23 +77,23 @@ class WriteEntryProcessor extends Packet
             rc = BookieProtocol.EUA;
         }
         if (rc != BookieProtocol.EOK) {
-            channel.write(ResponseBuilder.buildErrorResponse(rc, add));
-            BKStats.getInstance().getOpStats(BKStats.STATS_ADD).incrementFailedOps();
+            requestProcessor.addEntryStats.registerFailedEvent(MathUtils.elapsedMSec(startTimeNanos));
+            sendResponse(rc,
+                         ResponseBuilder.buildErrorResponse(rc, add),
+                         requestProcessor.addRequestStats);
         }
     }
 
     @Override
     public void writeComplete(int rc, long ledgerId, long entryId,
                               BookieSocketAddress addr, Object ctx) {
-        channel.write(ResponseBuilder.buildAddResponse(request));
-
-        // compute the latency
-        if (0 == rc) {
-            // for add operations, we compute latency in writeComplete callbacks.
-            long elapsedTime = MathUtils.now() - startTime;
-            BKStats.getInstance().getOpStats(BKStats.STATS_ADD).updateLatency(elapsedTime);
+        if (BookieProtocol.EOK == rc) {
+            requestProcessor.addEntryStats.registerSuccessfulEvent(MathUtils.elapsedMSec(startTimeNanos));
         } else {
-            BKStats.getInstance().getOpStats(BKStats.STATS_ADD).incrementFailedOps();
+            requestProcessor.addEntryStats.registerFailedEvent(MathUtils.elapsedMSec(startTimeNanos));
         }
+        sendResponse(rc,
+                     ResponseBuilder.buildAddResponse(request),
+                     requestProcessor.addRequestStats);
     }
 }