You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2014/08/02 20:02:07 UTC
svn commit: r1615338 [1/2] - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-server/conf/
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/
bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/
bookkeeper-server/src/main/java/org/apache...
Author: fpj
Date: Sat Aug 2 18:02:06 2014
New Revision: 1615338
URL: http://svn.apache.org/r1615338
Log:
BOOKEEPER-697. stats collection on bookkeeper server (sijie via fpj)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DiskChecker.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1615338&r1=1615337&r2=1615338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Sat Aug 2 18:02:06 2014
@@ -298,6 +298,8 @@ Trunk (unreleased changes)
BOOKKEEPER-739: Test timeouts mostly ignored (sijie via fpj)
+ BOOKKEEPER-697: stats collection on bookkeeper server (sijie via fpj)
+
NEW FEATURE:
BOOKKEEPER-562: Ability to tell if a ledger is closed or not (fpj)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf?rev=1615338&r1=1615337&r2=1615338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf Sat Aug 2 18:02:06 2014
@@ -272,3 +272,6 @@ zkTimeout=10000
# When false, bookie will use its ipaddress for the registration.
# Defaults to false.
#useHostNameAsBookieID=false
+
+# Stats Provider Class
+#statsProviderClass=org.apache.bookkeeper.stats.CodahaleMetricsProvider
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1615338&r1=1615337&r2=1615338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Sat Aug 2 18:02:06 2014
@@ -50,6 +50,10 @@ import org.apache.bookkeeper.meta.Ledger
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.MathUtils;
@@ -71,11 +75,16 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_LEDGER_SCOPE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_INDEX_SCOPE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_BYTES;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SERVER_STATUS;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_BYTES;
+
/**
* Implements a bookie.
*
*/
-
public class Bookie extends BookieCriticalThread {
private final static Logger LOG = LoggerFactory.getLogger(Bookie.class);
@@ -121,6 +130,10 @@ public class Bookie extends BookieCritic
final private AtomicBoolean readOnly = new AtomicBoolean(false);
+ // Expose Stats
+ private final Counter writeBytes;
+ private final Counter readBytes;
+
public static class NoLedgerException extends IOException {
private static final long serialVersionUID = 1L;
private final long ledgerId;
@@ -290,7 +303,7 @@ public class Bookie extends BookieCritic
boolean newEnv = false;
List<File> missedCookieDirs = new ArrayList<File>();
Cookie journalCookie = null;
- // try to read cookie from journal directory.
+ // try to read cookie from journal directory.
try {
journalCookie = Cookie.readFromDirectory(journalDirectory);
if (journalCookie.isBookieHostCreatedFromIp()) {
@@ -312,7 +325,7 @@ public class Bookie extends BookieCritic
masterCookie.verify(zkCookie);
} catch (KeeperException.NoNodeException nne) {
// can occur in cases:
- // 1) new environment or
+ // 1) new environment or
// 2) done only metadata format and started bookie server.
}
checkDirectoryStructure(journalDirectory);
@@ -424,19 +437,25 @@ public class Bookie extends BookieCritic
return currentDirs;
}
-
public Bookie(ServerConfiguration conf)
throws IOException, KeeperException, InterruptedException, BookieException {
+ this(conf, NullStatsLogger.INSTANCE);
+ }
+
+ public Bookie(ServerConfiguration conf, StatsLogger statsLogger)
+ throws IOException, KeeperException, InterruptedException, BookieException {
super("Bookie-" + conf.getBookiePort());
this.bookieRegistrationPath = conf.getZkAvailableBookiesPath() + "/";
this.conf = conf;
this.journalDirectory = getCurrentDirectory(conf.getJournalDir());
- this.ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs());
+ this.ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+ statsLogger.scope(LD_LEDGER_SCOPE));
File[] idxDirs = conf.getIndexDirs();
if (null == idxDirs) {
this.indexDirsManager = this.ledgerDirsManager;
} else {
- this.indexDirsManager = new LedgerDirsManager(conf, idxDirs);
+ this.indexDirsManager = new LedgerDirsManager(conf, idxDirs,
+ statsLogger.scope(LD_INDEX_SCOPE));
}
// instantiate zookeeper client to initialize ledger manager
@@ -456,11 +475,11 @@ public class Bookie extends BookieCritic
if (conf.getSortedLedgerStorageEnabled()) {
ledgerStorage = new SortedLedgerStorage(conf, ledgerManager,
ledgerDirsManager, indexDirsManager,
- journal);
+ journal, statsLogger);
} else {
ledgerStorage = new InterleavedLedgerStorage(conf, ledgerManager,
ledgerDirsManager, indexDirsManager,
- journal);
+ journal, statsLogger);
}
syncThread = new SyncThread(conf, getLedgerDirsListener(),
ledgerStorage, journal);
@@ -471,6 +490,22 @@ public class Bookie extends BookieCritic
String myID = getMyId();
zkBookieRegPath = this.bookieRegistrationPath + myID;
zkBookieReadOnlyPath = this.bookieRegistrationPath + BookKeeperConstants.READONLY + "/" + myID;
+
+ // Expose Stats
+ writeBytes = statsLogger.getCounter(WRITE_BYTES);
+ readBytes = statsLogger.getCounter(READ_BYTES);
+ // 1 : up, 0 : readonly
+ statsLogger.registerGauge(SERVER_STATUS, new Gauge<Number>() {
+ @Override
+ public Number getDefaultValue() {
+ return 0;
+ }
+
+ @Override
+ public Number getSample() {
+ return readOnly.get() ? 0 : 1;
+ }
+ });
}
private String getMyId() throws UnknownHostException {
@@ -1029,13 +1064,6 @@ public class Bookie extends BookieCritic
return l;
}
- protected void addEntryByLedgerId(long ledgerId, ByteBuffer entry)
- throws IOException, BookieException {
- byte[] key = ledgerStorage.readMasterKey(ledgerId);
- LedgerDescriptor handle = handles.getHandle(ledgerId, key);
- handle.addEntry(entry);
- }
-
/**
* Add an entry to a ledger as specified by handle.
*/
@@ -1046,6 +1074,8 @@ public class Bookie extends BookieCritic
long entryId = handle.addEntry(entry);
entry.rewind();
+ writeBytes.add(entry.remaining());
+
LOG.trace("Adding {}@{}", entryId, ledgerId);
journal.logAddEntry(entry, cb, ctx);
}
@@ -1124,7 +1154,9 @@ public class Bookie extends BookieCritic
throws IOException, NoLedgerException {
LedgerDescriptor handle = handles.getReadOnlyHandle(ledgerId);
LOG.trace("Reading {}@{}", entryId, ledgerId);
- return handle.readEntry(entryId);
+ ByteBuffer entry = handle.readEntry(entryId);
+ readBytes.add(entry.remaining());
+ return entry;
}
// The rest of the code is test stuff
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java?rev=1615338&r1=1615337&r2=1615338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java Sat Aug 2 18:02:06 2014
@@ -27,12 +27,22 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SKIP_LIST_FLUSH_BYTES;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SKIP_LIST_GET_ENTRY;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SKIP_LIST_PUT_ENTRY;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SKIP_LIST_SNAPSHOT;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SKIP_LIST_THROTTLING;
+
/**
* The EntryMemTable holds in-memory representation to the entries not-yet flushed.
* When asked to flush, current EntrySkipList is moved to snapshot and is cleared.
@@ -101,11 +111,19 @@ public class EntryMemTable {
return new EntrySkipList(checkpointSource.newCheckpoint());
}
+ // Stats
+ private final OpStatsLogger snapshotStats;
+ private final OpStatsLogger putEntryStats;
+ private final OpStatsLogger getEntryStats;
+ private final Counter flushBytesCounter;
+ private final Counter throttlingCounter;
+
/**
* Constructor.
* @param conf Server configuration
*/
- public EntryMemTable(final ServerConfiguration conf, final CheckpointSource source) {
+ public EntryMemTable(final ServerConfiguration conf, final CheckpointSource source,
+ final StatsLogger statsLogger) {
this.checkpointSource = source;
this.kvmap = newSkipList();
this.snapshot = EntrySkipList.EMPTY_VALUE;
@@ -114,6 +132,13 @@ public class EntryMemTable {
this.allocator = new SkipListArena(conf);
// skip list size limit
this.skipListSizeLimit = conf.getSkipListSizeLimit();
+
+ // Stats
+ this.snapshotStats = statsLogger.getOpStatsLogger(SKIP_LIST_SNAPSHOT);
+ this.putEntryStats = statsLogger.getOpStatsLogger(SKIP_LIST_PUT_ENTRY);
+ this.getEntryStats = statsLogger.getOpStatsLogger(SKIP_LIST_GET_ENTRY);
+ this.flushBytesCounter = statsLogger.getCounter(SKIP_LIST_FLUSH_BYTES);
+ this.throttlingCounter = statsLogger.getCounter(SKIP_LIST_THROTTLING);
}
void dump() {
@@ -143,6 +168,7 @@ public class EntryMemTable {
// No-op if snapshot currently has entries
if (this.snapshot.isEmpty() &&
this.kvmap.compareTo(oldCp) < 0) {
+ final long startTimeNanos = MathUtils.nowInNano();
this.lock.writeLock().lock();
try {
if (this.snapshot.isEmpty() && !this.kvmap.isEmpty()
@@ -159,6 +185,12 @@ public class EntryMemTable {
} finally {
this.lock.writeLock().unlock();
}
+
+ if (null != cp) {
+ snapshotStats.registerSuccessfulEvent(MathUtils.elapsedMSec(startTimeNanos));
+ } else {
+ snapshotStats.registerFailedEvent(MathUtils.elapsedMSec(startTimeNanos));
+ }
}
return cp;
}
@@ -207,6 +239,7 @@ public class EntryMemTable {
}
}
}
+ flushBytesCounter.add(size);
clearSnapshot(keyValues);
}
}
@@ -242,6 +275,7 @@ public class EntryMemTable {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
+ throttlingCounter.inc();
}
/**
@@ -252,24 +286,34 @@ public class EntryMemTable {
public long addEntry(long ledgerId, long entryId, final ByteBuffer entry, final CacheCallback cb)
throws IOException {
long size = 0;
- if (isSizeLimitReached()) {
- Checkpoint cp = snapshot();
- if (null != cp) {
- cb.onSizeLimitReached();
- } else {
- throttleWriters();
+ long startTimeNanos = MathUtils.nowInNano();
+ boolean success = false;
+ try {
+ if (isSizeLimitReached()) {
+ Checkpoint cp = snapshot();
+ if (null != cp) {
+ cb.onSizeLimitReached();
+ } else {
+ throttleWriters();
+ }
}
- }
- this.lock.readLock().lock();
- try {
- EntryKeyValue toAdd = cloneWithAllocator(ledgerId, entryId, entry);
- size = internalAdd(toAdd);
+ this.lock.readLock().lock();
+ try {
+ EntryKeyValue toAdd = cloneWithAllocator(ledgerId, entryId, entry);
+ size = internalAdd(toAdd);
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ success = true;
+ return size;
} finally {
- this.lock.readLock().unlock();
+ if (success) {
+ putEntryStats.registerSuccessfulEvent(MathUtils.elapsedMSec(startTimeNanos));
+ } else {
+ putEntryStats.registerFailedEvent(MathUtils.elapsedMSec(startTimeNanos));
+ }
}
-
- return size;
}
/**
@@ -326,14 +370,22 @@ public class EntryMemTable {
public EntryKeyValue getEntry(long ledgerId, long entryId) throws IOException {
EntryKey key = new EntryKey(ledgerId, entryId);
EntryKeyValue value = null;
+ long startTimeNanos = MathUtils.nowInNano();
+ boolean success = false;
this.lock.readLock().lock();
try {
value = this.kvmap.get(key);
if (value == null) {
value = this.snapshot.get(key);
}
+ success = true;
} finally {
this.lock.readLock().unlock();
+ if (success) {
+ getEntryStats.registerSuccessfulEvent(MathUtils.elapsedMSec(startTimeNanos));
+ } else {
+ getEntryStats.registerFailedEvent(MathUtils.elapsedMSec(startTimeNanos));
+ }
}
return value;
@@ -347,14 +399,22 @@ public class EntryMemTable {
public EntryKeyValue getLastEntry(long ledgerId) throws IOException {
EntryKey result = null;
EntryKey key = new EntryKey(ledgerId, Long.MAX_VALUE);
+ long startTimeNanos = MathUtils.nowInNano();
+ boolean success = false;
this.lock.readLock().lock();
try {
result = this.kvmap.floorKey(key);
if (result == null || result.getLedgerId() != ledgerId) {
result = this.snapshot.floorKey(key);
}
+ success = true;
} finally {
this.lock.readLock().unlock();
+ if (success) {
+ getEntryStats.registerSuccessfulEvent(MathUtils.elapsedMSec(startTimeNanos));
+ } else {
+ getEntryStats.registerFailedEvent(MathUtils.elapsedMSec(startTimeNanos));
+ }
}
if (result == null || result.getLedgerId() != ledgerId) {
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java?rev=1615338&r1=1615337&r2=1615338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java Sat Aug 2 18:02:06 2014
@@ -21,6 +21,8 @@
package org.apache.bookkeeper.bookie;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.StatsLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,6 +41,8 @@ import java.util.concurrent.ConcurrentMa
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.NUM_INDEX_PAGES;
+
class IndexInMemPageMgr {
private final static Logger LOG = LoggerFactory.getLogger(IndexInMemPageMgr.class);
private final static ConcurrentHashMap<Long, LedgerEntryPage> EMPTY_PAGE_MAP
@@ -321,7 +325,8 @@ class IndexInMemPageMgr {
public IndexInMemPageMgr(int pageSize,
int entriesPerPage,
ServerConfiguration conf,
- IndexPersistenceMgr indexPersistenceManager) {
+ IndexPersistenceMgr indexPersistenceManager,
+ StatsLogger statsLogger) {
this.pageSize = pageSize;
this.entriesPerPage = entriesPerPage;
this.indexPersistenceManager = indexPersistenceManager;
@@ -335,6 +340,18 @@ class IndexInMemPageMgr {
}
LOG.info("maxMemory = {}, pageSize = {}, pageLimit = {}", new Object[] { Runtime.getRuntime().maxMemory(),
pageSize, pageLimit });
+ // Expose Stats
+ statsLogger.registerGauge(NUM_INDEX_PAGES, new Gauge<Number>() {
+ @Override
+ public Number getDefaultValue() {
+ return 0;
+ }
+
+ @Override
+ public Number getSample() {
+ return getNumUsedPages();
+ }
+ });
}
/**
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java?rev=1615338&r1=1615337&r2=1615338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java Sat Aug 2 18:02:06 2014
@@ -23,7 +23,6 @@ package org.apache.bookkeeper.bookie;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
@@ -35,12 +34,18 @@ import java.util.concurrent.ConcurrentMa
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.SnapshotMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LEDGER_CACHE_NUM_EVICTED_LEDGERS;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.NUM_OPEN_LEDGERS;
+
public class IndexPersistenceMgr {
private final static Logger LOG = LoggerFactory.getLogger(IndexPersistenceMgr.class);
@@ -72,11 +77,15 @@ public class IndexPersistenceMgr {
private LedgerDirsManager ledgerDirsManager;
final LinkedList<Long> openLedgers = new LinkedList<Long>();
+ // Stats
+ private final Counter evictedLedgersCounter;
+
public IndexPersistenceMgr(int pageSize,
int entriesPerPage,
ServerConfiguration conf,
SnapshotMap<Long, Boolean> activeLedgers,
- LedgerDirsManager ledgerDirsManager) throws IOException {
+ LedgerDirsManager ledgerDirsManager,
+ StatsLogger statsLogger) throws IOException {
this.openFileLimit = conf.getOpenFileLimit();
this.activeLedgers = activeLedgers;
this.ledgerDirsManager = ledgerDirsManager;
@@ -86,6 +95,20 @@ public class IndexPersistenceMgr {
// Retrieve all of the active ledgers.
getActiveLedgers();
ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
+
+ // Expose Stats
+ evictedLedgersCounter = statsLogger.getCounter(LEDGER_CACHE_NUM_EVICTED_LEDGERS);
+ statsLogger.registerGauge(NUM_OPEN_LEDGERS, new Gauge<Integer>() {
+ @Override
+ public Integer getDefaultValue() {
+ return 0;
+ }
+
+ @Override
+ public Integer getSample() {
+ return getNumOpenLedgers();
+ }
+ });
}
FileInfo getFileInfo(Long ledger, byte masterKey[]) throws IOException {
@@ -270,8 +293,7 @@ public class IndexPersistenceMgr {
// was executing.
return;
}
- // TODO Add a statistic here, we don't care really which
- // ledger is evicted, but the rate at which they get evicted
+ evictedLedgersCounter.inc();
FileInfo fi = fileInfoCache.remove(ledgerToRemove);
if (null == fi) {
// Seems like someone else already closed the file.
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java?rev=1615338&r1=1615337&r2=1615338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java Sat Aug 2 18:02:06 2014
@@ -27,16 +27,22 @@ import java.nio.ByteBuffer;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.bookie.EntryLogger.EntryLogListener;
-import org.apache.bookkeeper.bookie.LedgerDirsManager;
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.jmx.BKMBeanInfo;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.util.SnapshotMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_ENTRY;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_OFFSET;
+
/**
* Interleave ledger storage
* This ledger storage implementation stores all entries in a single
@@ -82,24 +88,31 @@ class InterleavedLedgerStorage implement
// this indicates that a write has happened since the last flush
private volatile boolean somethingWritten = false;
+ // Expose Stats
+ private final OpStatsLogger getOffsetStats;
+ private final OpStatsLogger getEntryStats;
+
InterleavedLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
LedgerDirsManager ledgerDirsManager, CheckpointSource checkpointSource)
throws IOException {
- this(conf, ledgerManager, ledgerDirsManager, ledgerDirsManager, checkpointSource);
+ this(conf, ledgerManager, ledgerDirsManager, ledgerDirsManager, checkpointSource, NullStatsLogger.INSTANCE);
}
InterleavedLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
- CheckpointSource checkpointSource)
+ CheckpointSource checkpointSource, StatsLogger statsLogger)
throws IOException {
activeLedgers = new SnapshotMap<Long, Boolean>();
this.checkpointSource = checkpointSource;
entryLogger = new EntryLogger(conf, ledgerDirsManager, this);
ledgerCache = new LedgerCacheImpl(conf, activeLedgers,
- null == indexDirsManager ? ledgerDirsManager : indexDirsManager);
+ null == indexDirsManager ? ledgerDirsManager : indexDirsManager, statsLogger);
gcThread = new GarbageCollectorThread(conf, ledgerCache, entryLogger,
activeLedgers, ledgerManager);
ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
+ // Expose Stats
+ getOffsetStats = statsLogger.getOpStatsLogger(STORAGE_GET_OFFSET);
+ getEntryStats = statsLogger.getOpStatsLogger(STORAGE_GET_ENTRY);
}
private LedgerDirsListener getLedgerDirsListener() {
@@ -208,11 +221,36 @@ class InterleavedLedgerStorage implement
entryId = ledgerCache.getLastEntry(ledgerId);
}
- offset = ledgerCache.getEntryOffset(ledgerId, entryId);
- if (offset == 0) {
- throw new Bookie.NoEntryException(ledgerId, entryId);
+ // Get Offset
+ long startTimeNanos = MathUtils.nowInNano();
+ boolean success = false;
+ try {
+ offset = ledgerCache.getEntryOffset(ledgerId, entryId);
+ if (offset == 0) {
+ throw new Bookie.NoEntryException(ledgerId, entryId);
+ }
+ success = true;
+ } finally {
+ if (success) {
+ getOffsetStats.registerSuccessfulEvent(MathUtils.elapsedMSec(startTimeNanos));
+ } else {
+ getOffsetStats.registerFailedEvent(MathUtils.elapsedMSec(startTimeNanos));
+ }
+ }
+ // Get Entry
+ startTimeNanos = MathUtils.nowInNano();
+ success = false;
+ try {
+ byte[] retBytes = entryLogger.readEntry(ledgerId, entryId, offset);
+ success = true;
+ return ByteBuffer.wrap(retBytes);
+ } finally {
+ if (success) {
+ getEntryStats.registerSuccessfulEvent(MathUtils.elapsedMSec(startTimeNanos));
+ } else {
+ getEntryStats.registerFailedEvent(MathUtils.elapsedMSec(startTimeNanos));
+ }
}
- return ByteBuffer.wrap(entryLogger.readEntry(ledgerId, entryId, offset));
}
private void flushOrCheckpoint(boolean isCheckpointFlush)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java?rev=1615338&r1=1615337&r2=1615338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java Sat Aug 2 18:02:06 2014
@@ -35,9 +35,14 @@ import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import com.google.common.base.Stopwatch;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.DaemonThreadFactory;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.MathUtils;
@@ -45,6 +50,8 @@ import org.apache.bookkeeper.util.ZeroBu
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.*;
+
/**
* Provide journal related management.
*/
@@ -287,7 +294,7 @@ class Journal extends BookieCriticalThre
}
}
- private class ForceWriteRequest extends BookieCriticalThread {
+ private class ForceWriteRequest implements Runnable {
private final JournalChannel logFile;
private final LinkedList<QueueEntry> forceWriteWaiters;
private boolean shouldClose;
@@ -301,16 +308,17 @@ class Journal extends BookieCriticalThre
LinkedList<QueueEntry> forceWriteWaiters,
boolean shouldClose,
boolean isMarker) {
- super("ForceWriteRequestThread");
this.forceWriteWaiters = forceWriteWaiters;
this.logFile = logFile;
this.logId = logId;
this.lastFlushedPosition = lastFlushedPosition;
this.shouldClose = shouldClose;
this.isMarker = isMarker;
+ forceWriteQueueSize.inc();
}
public int process(boolean shouldForceWrite) throws IOException {
+ forceWriteQueueSize.dec();
if (isMarker) {
return 0;
}
@@ -334,6 +342,7 @@ class Journal extends BookieCriticalThre
@Override
public void run() {
for (QueueEntry e : this.forceWriteWaiters) {
+ journalAddEntryStats.registerSuccessfulEvent(MathUtils.elapsedMSec(e.enqueueTime));
e.callback(); // Process cbs inline
}
}
@@ -397,6 +406,7 @@ class Journal extends BookieCriticalThre
// the last force write and then reset the counter so we can accumulate
// requests in the write we are about to issue
if (numReqInLastForceWrite > 0) {
+ forceWriteGroupingCountStats.registerSuccessfulEvent(numReqInLastForceWrite);
numReqInLastForceWrite = 0;
}
}
@@ -506,7 +516,25 @@ class Journal extends BookieCriticalThre
volatile boolean running = true;
private final LedgerDirsManager ledgerDirsManager;
+ // Expose Stats
+ private final OpStatsLogger journalAddEntryStats;
+ private final OpStatsLogger journalCreationStats;
+ private final OpStatsLogger journalFlushStats;
+ private final OpStatsLogger forceWriteGroupingCountStats;
+ private final OpStatsLogger forceWriteBatchEntriesStats;
+ private final OpStatsLogger forceWriteBatchBytesStats;
+ private final Counter journalQueueSize;
+ private final Counter forceWriteQueueSize;
+ private final Counter flushMaxWaitCounter;
+ private final Counter flushMaxOutstandingBytesCounter;
+ private final Counter flushEmptyQueueCounter;
+ private final Counter journalWriteBytes;
+
public Journal(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager) {
+ this(conf, ledgerDirsManager, NullStatsLogger.INSTANCE);
+ }
+
+ public Journal(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager, StatsLogger statsLogger) {
super("BookieJournal-" + conf.getBookiePort());
this.ledgerDirsManager = ledgerDirsManager;
this.conf = conf;
@@ -530,6 +558,20 @@ class Journal extends BookieCriticalThre
// read last log mark
lastLogMark.readLog();
LOG.debug("Last Log Mark : {}", lastLogMark.getCurMark());
+
+ // Expose Stats
+ journalAddEntryStats = statsLogger.getOpStatsLogger(JOURNAL_ADD_ENTRY);
+ journalCreationStats = statsLogger.getOpStatsLogger(JOURNAL_CREATION_LATENCY);
+ journalFlushStats = statsLogger.getOpStatsLogger(JOURNAL_FLUSH_LATENCY);
+ forceWriteGroupingCountStats = statsLogger.getOpStatsLogger(JOURNAL_FORCE_WRITE_GROUPING_COUNT);
+ forceWriteBatchEntriesStats = statsLogger.getOpStatsLogger(JOURNAL_FORCE_WRITE_BATCH_ENTRIES);
+ forceWriteBatchBytesStats = statsLogger.getOpStatsLogger(JOURNAL_FORCE_WRITE_BATCH_BYTES);
+ journalQueueSize = statsLogger.getCounter(JOURNAL_QUEUE_SIZE);
+ forceWriteQueueSize = statsLogger.getCounter(JOURNAL_FORCE_WRITE_QUEUE_SIZE);
+ flushMaxWaitCounter = statsLogger.getCounter(JOURNAL_NUM_FLUSH_MAX_WAIT);
+ flushMaxOutstandingBytesCounter = statsLogger.getCounter(JOURNAL_NUM_FLUSH_MAX_OUTSTANDING_BYTES);
+ flushEmptyQueueCounter = statsLogger.getCounter(JOURNAL_NUM_FLUSH_EMPTY_QUEUE);
+ journalWriteBytes = statsLogger.getCounter(JOURNAL_WRITE_BYTES);
}
LastLogMark getLastLogMark() {
@@ -703,6 +745,7 @@ class Journal extends BookieCriticalThre
long ledgerId = entry.getLong();
long entryId = entry.getLong();
entry.rewind();
+ journalQueueSize.inc();
queue.add(new QueueEntry(entry, ledgerId, entryId, cb, ctx, MathUtils.nowInNano()));
}
@@ -738,6 +781,9 @@ class Journal extends BookieCriticalThre
ZeroBuffer.put(paddingBuff);
JournalChannel logFile = null;
forceWriteThread.start();
+ Stopwatch journalCreationWatcher = new Stopwatch();
+ Stopwatch journalFlushWatcher = new Stopwatch();
+ long batchSize = 0;
try {
List<Long> journalIds = listJournalIds(journalDirectory, null);
// Should not use MathUtils.now(), which use System.nanoTime() and
@@ -753,6 +799,8 @@ class Journal extends BookieCriticalThre
// new journal file to write
if (null == logFile) {
logId = logId + 1;
+
+ journalCreationWatcher.reset().start();
logFile = new JournalChannel(journalDirectory,
logId,
journalPreAllocSize,
@@ -760,6 +808,9 @@ class Journal extends BookieCriticalThre
conf.getJournalAlignmentSize(),
removePagesFromCache,
conf.getJournalFormatVersionToWrite());
+ journalCreationStats.registerSuccessfulEvent(
+ journalCreationWatcher.stop().elapsedTime(TimeUnit.MILLISECONDS));
+
bc = logFile.getBufferedChannel();
lastFlushPosition = bc.position();
@@ -787,17 +838,20 @@ class Journal extends BookieCriticalThre
// b) limit the number of entries to group
groupWhenTimeout = false;
shouldFlush = true;
+ flushMaxWaitCounter.inc();
} else if (qe != null &&
((bufferedEntriesThreshold > 0 && toFlush.size() > bufferedEntriesThreshold) ||
(bc.position() > lastFlushPosition + bufferedWritesThreshold))) {
// 2. If we have buffered more than the buffWriteThreshold or bufferedEntriesThreshold
shouldFlush = true;
+ flushMaxOutstandingBytesCounter.inc();
} else if (qe == null) {
// We should get here only if we flushWhenQueueEmpty is true else we would wait
// for timeout that would put is past the maxWait threshold
// 3. If the queue is empty i.e. no benefit of grouping. This happens when we have one
// publish at a time - common case in tests.
shouldFlush = true;
+ flushEmptyQueueCounter.inc();
}
// toFlush is non null and not empty so should be safe to access getFirst
@@ -805,8 +859,11 @@ class Journal extends BookieCriticalThre
if (conf.getJournalFormatVersionToWrite() >= JournalChannel.V5) {
writePaddingBytes(logFile, paddingBuff, conf.getJournalAlignmentSize());
}
+ journalFlushWatcher.reset().start();
bc.flush(false);
lastFlushPosition = bc.position();
+ journalFlushStats.registerSuccessfulEvent(
+ journalFlushWatcher.stop().elapsedTime(TimeUnit.MILLISECONDS));
// Trace the lifetime of entries through persistence
if (LOG.isDebugEnabled()) {
@@ -815,8 +872,12 @@ class Journal extends BookieCriticalThre
}
}
+ forceWriteBatchEntriesStats.registerSuccessfulEvent(toFlush.size());
+ forceWriteBatchBytesStats.registerSuccessfulEvent(batchSize);
+
forceWriteRequests.put(new ForceWriteRequest(logFile, logId, lastFlushPosition, toFlush, (lastFlushPosition > maxJournalSize), false));
toFlush = new LinkedList<QueueEntry>();
+ batchSize = 0L;
// check whether journal file is over file limit
if (bc.position() > maxJournalSize) {
logFile = null;
@@ -835,6 +896,11 @@ class Journal extends BookieCriticalThre
continue;
}
+ journalWriteBytes.add(qe.entry.remaining());
+ journalQueueSize.dec();
+
+ batchSize += (4 + qe.entry.remaining());
+
lenBuff.clear();
lenBuff.putInt(qe.entry.remaining());
lenBuff.flip();
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java?rev=1615338&r1=1615337&r2=1615338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java Sat Aug 2 18:02:06 2014
@@ -24,6 +24,8 @@ package org.apache.bookkeeper.bookie;
import java.io.IOException;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.SnapshotMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,12 +43,18 @@ public class LedgerCacheImpl implements
private final int entriesPerPage;
public LedgerCacheImpl(ServerConfiguration conf, SnapshotMap<Long, Boolean> activeLedgers,
- LedgerDirsManager ledgerDirsManager) throws IOException {
+ LedgerDirsManager ledgerDirsManager) throws IOException {
+ this(conf, activeLedgers, ledgerDirsManager, NullStatsLogger.INSTANCE);
+ }
+
+ public LedgerCacheImpl(ServerConfiguration conf, SnapshotMap<Long, Boolean> activeLedgers,
+ LedgerDirsManager ledgerDirsManager, StatsLogger statsLogger) throws IOException {
this.pageSize = conf.getPageSize();
this.entriesPerPage = pageSize / 8;
this.indexPersistenceManager = new IndexPersistenceMgr(pageSize, entriesPerPage, conf, activeLedgers,
- ledgerDirsManager);
- this.indexPageManager = new IndexInMemPageMgr(pageSize, entriesPerPage, conf, indexPersistenceManager);
+ ledgerDirsManager, statsLogger);
+ this.indexPageManager = new IndexInMemPageMgr(pageSize, entriesPerPage, conf,
+ indexPersistenceManager, statsLogger);
}
IndexPersistenceMgr getIndexPersistenceManager() {
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java?rev=1615338&r1=1615337&r2=1615338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java Sat Aug 2 18:02:06 2014
@@ -26,8 +26,13 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.DiskChecker;
import org.apache.bookkeeper.util.DiskChecker.DiskErrorException;
import org.apache.bookkeeper.util.DiskChecker.DiskOutOfSpaceException;
@@ -37,6 +42,8 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_WRITABLE_DIRS;
+
/**
* This class manages ledger directories used by the bookie.
*/
@@ -51,15 +58,48 @@ public class LedgerDirsManager {
private final List<LedgerDirsListener> listeners;
private final LedgerDirsMonitor monitor;
private final Random rand = new Random();
+ private final ConcurrentMap<File, Float> diskUsages =
+ new ConcurrentHashMap<File, Float>();
public LedgerDirsManager(ServerConfiguration conf, File[] dirs) {
+ this(conf, dirs, NullStatsLogger.INSTANCE);
+ }
+
+ LedgerDirsManager(ServerConfiguration conf, File[] dirs, StatsLogger statsLogger) {
this.ledgerDirectories = Arrays.asList(Bookie
.getCurrentDirectories(dirs));
this.writableLedgerDirectories = new ArrayList<File>(ledgerDirectories);
this.filledDirs = new ArrayList<File>();
- listeners = new ArrayList<LedgerDirsManager.LedgerDirsListener>();
+ listeners = new ArrayList<LedgerDirsListener>();
diskChecker = new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold());
monitor = new LedgerDirsMonitor(conf.getDiskCheckInterval());
+ for (File dir : dirs) {
+ diskUsages.put(dir, 0f);
+ String statName = "dir_" + dir.getPath().replace('/', '_') + "_usage";
+ final File targetDir = dir;
+ statsLogger.registerGauge(statName, new Gauge<Number>() {
+ @Override
+ public Number getDefaultValue() {
+ return 0;
+ }
+
+ @Override
+ public Number getSample() {
+ return diskUsages.get(targetDir) * 100;
+ }
+ });
+ }
+ statsLogger.registerGauge(LD_WRITABLE_DIRS, new Gauge<Number>() {
+ @Override
+ public Number getDefaultValue() {
+ return 0;
+ }
+
+ @Override
+ public Number getSample() {
+ return writableLedgerDirectories.size();
+ }
+ });
}
/**
@@ -192,7 +232,7 @@ public class LedgerDirsManager {
/**
* Sweep through all the directories to check disk errors or disk full.
- *
+ *
* @throws DiskErrorException
* If disk having errors
* @throws NoWritableLedgerDirException
@@ -246,18 +286,22 @@ public class LedgerDirsManager {
// Check all writable dirs disk space usage.
for (File dir : writableDirs) {
try {
- diskChecker.checkDir(dir);
+ diskUsages.put(dir, diskChecker.checkDir(dir));
} catch (DiskErrorException e) {
+ LOG.error("Ledger directory {} failed on disk checking : ", dir, e);
// Notify disk failure to all listeners
for (LedgerDirsListener listener : listeners) {
- LOG.warn("{} has errors.", dir, e);
listener.diskFailed(dir);
}
} catch (DiskWarnThresholdException e) {
+ LOG.warn("Ledger directory {} is almost full.", dir);
+ diskUsages.put(dir, e.getUsage());
for (LedgerDirsListener listener : listeners) {
listener.diskAlmostFull(dir);
}
} catch (DiskOutOfSpaceException e) {
+ LOG.error("Ledger directory {} is out-of-space.", dir);
+ diskUsages.put(dir, e.getUsage());
// Notify disk full to all listeners
addToFilledDirs(dir);
}
@@ -266,7 +310,7 @@ public class LedgerDirsManager {
// Check all full-filled disk space usage
for (File dir : fullfilledDirs) {
try {
- diskChecker.checkDir(dir);
+ diskUsages.put(dir, diskChecker.checkDir(dir));
addToWritableDirs(dir, true);
} catch (DiskErrorException e) {
//Notify disk failure to all the listeners
@@ -274,10 +318,12 @@ public class LedgerDirsManager {
listener.diskFailed(dir);
}
} catch (DiskWarnThresholdException e) {
+ diskUsages.put(dir, e.getUsage());
// the full-filled dir become writable but still above warn threshold
addToWritableDirs(dir, false);
} catch (DiskOutOfSpaceException e) {
// the full-filled dir is still full-filled
+ diskUsages.put(dir, e.getUsage());
}
}
try {
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java?rev=1615338&r1=1615337&r2=1615338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java Sat Aug 2 18:02:06 2014
@@ -31,6 +31,7 @@ import org.apache.bookkeeper.bookie.Chec
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.stats.StatsLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,10 +44,10 @@ public class SortedLedgerStorage extends
public SortedLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
- final CheckpointSource checkpointSource)
+ final CheckpointSource checkpointSource, StatsLogger statsLogger)
throws IOException {
- super(conf, ledgerManager, ledgerDirsManager, indexDirsManager, null);
- this.memTable = new EntryMemTable(conf, checkpointSource);
+ super(conf, ledgerManager, ledgerDirsManager, indexDirsManager, null, statsLogger);
+ this.memTable = new EntryMemTable(conf, checkpointSource, statsLogger);
this.scheduler = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setNameFormat("SortedLedgerStorage-%d")
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java?rev=1615338&r1=1615337&r2=1615338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java Sat Aug 2 18:02:06 2014
@@ -21,6 +21,9 @@ import java.io.File;
import java.util.List;
import com.google.common.annotations.Beta;
+import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.bookkeeper.util.ReflectionUtils;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.lang.StringUtils;
@@ -76,8 +79,6 @@ public class ServerConfiguration extends
// Zookeeper Parameters
protected final static String ZK_TIMEOUT = "zkTimeout";
protected final static String ZK_SERVERS = "zkServers";
- // Statistics Parameters
- protected final static String ENABLE_STATISTICS = "enableStatistics";
protected final static String OPEN_LEDGER_REREPLICATION_GRACE_PERIOD = "openLedgerRereplicationGracePeriod";
//ReadOnly mode support on all disk full
protected final static String READ_ONLY_MODE_ENABLED = "readOnlyModeEnabled";
@@ -104,6 +105,10 @@ public class ServerConfiguration extends
protected final static String SKIP_LIST_CHUNK_SIZE_ENTRY = "skipListArenaChunkSize";
protected final static String SKIP_LIST_MAX_ALLOC_ENTRY = "skipListArenaMaxAllocSize";
+ // Statistics Parameters
+ protected final static String ENABLE_STATISTICS = "enableStatistics";
+ protected final static String STATS_PROVIDER_CLASS = "statsProviderClass";
+
/**
* Construct a default configuration object
*/
@@ -1287,25 +1292,9 @@ public class ServerConfiguration extends
}
/**
- * Validate the configuration.
- * @throws ConfigurationException
- */
- public void validate() throws ConfigurationException {
- if (getSkipListArenaChunkSize() < getSkipListArenaMaxAllocSize()) {
- throw new ConfigurationException("Arena max allocation size should be smaller than the chunk size.");
- }
- if (getJournalAlignmentSize() < 512 || getJournalAlignmentSize() % 512 != 0) {
- throw new ConfigurationException("Invalid journal alignment size : " + getJournalAlignmentSize());
- }
- if (getJournalAlignmentSize() > getJournalPreAllocSizeMB() * 1024 * 1024) {
- throw new ConfigurationException("Invalid preallocation size : " + getJournalPreAllocSizeMB() + " MB");
- }
- }
-
- /**
* Get whether bookie is using hostname for registration and in ledger
* metadata. Defaults to false.
- *
+ *
* @return true, then bookie will be registered with its hostname and
* hostname will be used in ledger metadata. Otherwise bookie will
* use its ipaddress
@@ -1317,12 +1306,56 @@ public class ServerConfiguration extends
/**
* Configure the bookie to use its hostname to register with the
* co-ordination service(eg: zookeeper) and in ledger metadata
- *
+ *
* @see #getUseHostNameAsBookieID
* @param useHostName
* whether to use hostname for registration and in ledgermetadata
+ * @return server configuration
*/
- public void setUseHostNameAsBookieID(boolean useHostName) {
+ public ServerConfiguration setUseHostNameAsBookieID(boolean useHostName) {
setProperty(USE_HOST_NAME_AS_BOOKIE_ID, useHostName);
+ return this;
}
+
+ /**
+ * Get the stats provider used by bookie.
+ *
+ * @return stats provider class
+ * @throws ConfigurationException
+ */
+ public Class<? extends StatsProvider> getStatsProviderClass()
+ throws ConfigurationException {
+ return ReflectionUtils.getClass(this, STATS_PROVIDER_CLASS,
+ NullStatsProvider.class, StatsProvider.class,
+ defaultLoader);
+ }
+
+ /**
+ * Set the stats provider used by bookie.
+ *
+ * @param providerClass
+ * stats provider class
+ * @return server configuration
+ */
+ public ServerConfiguration setStatsProviderClass(Class<? extends StatsProvider> providerClass) {
+ setProperty(STATS_PROVIDER_CLASS, providerClass.getName());
+ return this;
+ }
+
+ /**
+ * Validate the configuration.
+ * @throws ConfigurationException
+ */
+ public void validate() throws ConfigurationException {
+ if (getSkipListArenaChunkSize() < getSkipListArenaMaxAllocSize()) {
+ throw new ConfigurationException("Arena max allocation size should be smaller than the chunk size.");
+ }
+ if (getJournalAlignmentSize() < 512 || getJournalAlignmentSize() % 512 != 0) {
+ throw new ConfigurationException("Invalid journal alignment size : " + getJournalAlignmentSize());
+ }
+ if (getJournalAlignmentSize() > getJournalPreAllocSizeMB() * 1024 * 1024) {
+ throw new ConfigurationException("Invalid preallocation size : " + getJournalPreAllocSizeMB() + " MB");
+ }
+ }
+
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java?rev=1615338&r1=1615337&r2=1615338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java Sat Aug 2 18:02:06 2014
@@ -26,12 +26,18 @@ import java.util.concurrent.Executors;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.processor.RequestProcessor;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
import org.jboss.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_REQUEST;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_REQUEST;
+
public class BookieRequestProcessor implements RequestProcessor {
private final static Logger LOG = LoggerFactory.getLogger(BookieRequestProcessor.class);
@@ -39,12 +45,12 @@ public class BookieRequestProcessor impl
* The server configuration. We use this for getting the number of add and read
* worker threads.
*/
- private ServerConfiguration serverCfg;
+ private final ServerConfiguration serverCfg;
/**
* This is the Bookie instance that is used to handle all read and write requests.
*/
- private Bookie bookie;
+ final Bookie bookie;
/**
* The threadpool used to execute all read entry requests issued to this server.
@@ -56,10 +62,16 @@ public class BookieRequestProcessor impl
*/
private final ExecutorService writeThreadPool;
+ // Expose Stats
private final BKStats bkStats = BKStats.getInstance();
private final boolean statsEnabled;
+ final OpStatsLogger addRequestStats;
+ final OpStatsLogger addEntryStats;
+ final OpStatsLogger readRequestStats;
+ final OpStatsLogger readEntryStats;
- public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie) {
+ public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie,
+ StatsLogger statsLogger) {
this.serverCfg = serverCfg;
this.bookie = bookie;
this.readThreadPool =
@@ -68,7 +80,12 @@ public class BookieRequestProcessor impl
this.writeThreadPool =
createExecutor(this.serverCfg.getNumAddWorkerThreads(),
"BookieWriteThread-" + serverCfg.getBookiePort() + "-%d");
+ // Expose Stats
this.statsEnabled = serverCfg.isStatisticsEnabled();
+ this.addEntryStats = statsLogger.getOpStatsLogger(ADD_ENTRY);
+ this.addRequestStats = statsLogger.getOpStatsLogger(ADD_ENTRY_REQUEST);
+ this.readEntryStats = statsLogger.getOpStatsLogger(READ_ENTRY);
+ this.readRequestStats = statsLogger.getOpStatsLogger(READ_ENTRY_REQUEST);
}
@Override
@@ -138,7 +155,7 @@ public class BookieRequestProcessor impl
}
private void processAddRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
- WriteEntryProcessorV3 write = new WriteEntryProcessorV3(r, c, bookie);
+ WriteEntryProcessorV3 write = new WriteEntryProcessorV3(r, c, this);
if (null == writeThreadPool) {
write.run();
} else {
@@ -147,7 +164,7 @@ public class BookieRequestProcessor impl
}
private void processReadRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
- ReadEntryProcessorV3 read = new ReadEntryProcessorV3(r, c, bookie);
+ ReadEntryProcessorV3 read = new ReadEntryProcessorV3(r, c, this);
if (null == readThreadPool) {
read.run();
} else {
@@ -156,7 +173,7 @@ public class BookieRequestProcessor impl
}
private void processAddRequest(final BookieProtocol.Request r, final Channel c) {
- WriteEntryProcessor write = new WriteEntryProcessor(r, c, bookie);
+ WriteEntryProcessor write = new WriteEntryProcessor(r, c, this);
if (null == writeThreadPool) {
write.run();
} else {
@@ -165,7 +182,7 @@ public class BookieRequestProcessor impl
}
private void processReadRequest(final BookieProtocol.Request r, final Channel c) {
- ReadEntryProcessor read = new ReadEntryProcessor(r, c, bookie);
+ ReadEntryProcessor read = new ReadEntryProcessor(r, c, this);
if (null == readThreadPool) {
read.run();
} else {
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java?rev=1615338&r1=1615337&r2=1615338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java Sat Aug 2 18:02:06 2014
@@ -36,6 +36,10 @@ import org.apache.bookkeeper.processor.R
import org.apache.bookkeeper.replication.AutoRecoveryMain;
import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.bookkeeper.util.ReflectionUtils;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
@@ -48,6 +52,9 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SERVER_SCOPE;
+
/**
* Implements the server-side part of the BookKeeper protocol.
*
@@ -70,12 +77,23 @@ public class BookieServer {
// request processor
private final RequestProcessor requestProcessor;
+ // Expose Stats
+ private final StatsLogger statsLogger;
+
public BookieServer(ServerConfiguration conf) throws IOException,
KeeperException, InterruptedException, BookieException,
UnavailableException, CompatibilityException {
+ this(conf, NullStatsLogger.INSTANCE);
+ }
+
+ public BookieServer(ServerConfiguration conf, StatsLogger statsLogger)
+ throws IOException, KeeperException, InterruptedException,
+ BookieException, UnavailableException, CompatibilityException {
this.conf = conf;
+ this.statsLogger = statsLogger;
this.bookie = newBookie(conf);
- this.requestProcessor = new BookieRequestProcessor(conf, bookie);
+ this.requestProcessor = new BookieRequestProcessor(conf, bookie,
+ statsLogger.scope(SERVER_SCOPE));
this.nettyServer = new BookieNettyServer(this.conf, requestProcessor);
isAutoRecoveryDaemonEnabled = conf.isAutoRecoveryDaemonEnabled();
if (isAutoRecoveryDaemonEnabled) {
@@ -85,7 +103,7 @@ public class BookieServer {
protected Bookie newBookie(ServerConfiguration conf)
throws IOException, KeeperException, InterruptedException, BookieException {
- return new Bookie(conf);
+ return new Bookie(conf, statsLogger.scope(BOOKIE_SCOPE));
}
public void start() throws IOException, UnavailableException {
@@ -349,7 +367,13 @@ public class BookieServer {
conf.getJournalDirName(), sb);
LOG.info(hello);
try {
- final BookieServer bs = new BookieServer(conf);
+ // Initialize Stats Provider
+ Class<? extends StatsProvider> statsProviderClass =
+ conf.getStatsProviderClass();
+ final StatsProvider statsProvider = ReflectionUtils.newInstance(statsProviderClass);
+ statsProvider.start(conf);
+
+ final BookieServer bs = new BookieServer(conf, statsProvider.getStatsLogger(""));
bs.start();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
@@ -361,6 +385,8 @@ public class BookieServer {
LOG.info("Register shutdown hook successfully");
bs.join();
+ statsProvider.stop();
+ LOG.info("Stop stats provider");
System.exit(bs.getExitCode());
} catch (Exception e) {
LOG.error("Exception running bookie server : ", e);
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java?rev=1615338&r1=1615337&r2=1615338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java Sat Aug 2 18:02:06 2014
@@ -17,8 +17,10 @@
*/
package org.apache.bookkeeper.proto;
-import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.proto.BookieProtocol.Request;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.util.MathUtils;
import org.jboss.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,12 +29,14 @@ abstract class PacketProcessorBase imple
private final static Logger logger = LoggerFactory.getLogger(PacketProcessorBase.class);
final Request request;
final Channel channel;
- final Bookie bookie;
+ final BookieRequestProcessor requestProcessor;
+ final long enqueueNanos;
- PacketProcessorBase(Request request, Channel channel, Bookie bookie) {
+ PacketProcessorBase(Request request, Channel channel, BookieRequestProcessor requestProcessor) {
this.request = request;
this.channel = channel;
- this.bookie = bookie;
+ this.requestProcessor = requestProcessor;
+ this.enqueueNanos = MathUtils.nowInNano();
}
protected boolean isVersionCompatible() {
@@ -48,10 +52,21 @@ abstract class PacketProcessorBase imple
return true;
}
+ protected void sendResponse(int rc, Object response, OpStatsLogger statsLogger) {
+ channel.write(response);
+ if (BookieProtocol.EOK == rc) {
+ statsLogger.registerSuccessfulEvent(MathUtils.elapsedMSec(enqueueNanos));
+ } else {
+ statsLogger.registerFailedEvent(MathUtils.elapsedMSec(enqueueNanos));
+ }
+ }
+
@Override
public void run() {
if (!isVersionCompatible()) {
- channel.write(ResponseBuilder.buildErrorResponse(BookieProtocol.EBADVERSION, request));
+ sendResponse(BookieProtocol.EBADVERSION,
+ ResponseBuilder.buildErrorResponse(BookieProtocol.EBADVERSION, request),
+ requestProcessor.readRequestStats);
return;
}
processPacket();
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java?rev=1615338&r1=1615337&r2=1615338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java Sat Aug 2 18:02:06 2014
@@ -20,22 +20,36 @@
*/
package org.apache.bookkeeper.proto;
-import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
import org.apache.bookkeeper.proto.BookkeeperProtocol.ProtocolVersion;
import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.util.MathUtils;
import org.jboss.netty.channel.Channel;
public abstract class PacketProcessorBaseV3 {
final Request request;
final Channel channel;
- final Bookie bookie;
+ final BookieRequestProcessor requestProcessor;
+ final long enqueueNanos;
- public PacketProcessorBaseV3(Request request, Channel channel, Bookie bookie) {
+ public PacketProcessorBaseV3(Request request, Channel channel,
+ BookieRequestProcessor requestProcessor) {
this.request = request;
this.channel = channel;
- this.bookie = bookie;
+ this.requestProcessor = requestProcessor;
+ this.enqueueNanos = MathUtils.nowInNano();
+ }
+
+ protected void sendResponse(StatusCode code, Object response, OpStatsLogger statsLogger) {
+ channel.write(response);
+ if (StatusCode.EOK == code) {
+ statsLogger.registerSuccessfulEvent(MathUtils.elapsedMSec(enqueueNanos));
+ } else {
+ statsLogger.registerFailedEvent(MathUtils.elapsedMSec(enqueueNanos));
+ }
}
protected boolean isVersionCompatible() {
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java?rev=1615338&r1=1615337&r2=1615338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java Sat Aug 2 18:02:06 2014
@@ -35,8 +35,9 @@ import org.slf4j.LoggerFactory;
class ReadEntryProcessor extends PacketProcessorBase {
private final static Logger LOG = LoggerFactory.getLogger(ReadEntryProcessor.class);
- public ReadEntryProcessor(Request request, Channel channel, Bookie bookie) {
- super(request, channel, bookie);
+ public ReadEntryProcessor(Request request, Channel channel,
+ BookieRequestProcessor requestProcessor) {
+ super(request, channel, requestProcessor);
}
@Override
@@ -46,7 +47,7 @@ class ReadEntryProcessor extends PacketP
LOG.debug("Received new read request: {}", request);
int errorCode = BookieProtocol.EIO;
- long startTime = MathUtils.now();
+ long startTimeNanos = MathUtils.nowInNano();
ByteBuffer data = null;
try {
Future<Boolean> fenceResult = null;
@@ -54,14 +55,13 @@ class ReadEntryProcessor extends PacketP
LOG.warn("Ledger " + request.getLedgerId() + " fenced by " + channel.getRemoteAddress());
if (read.hasMasterKey()) {
- fenceResult = bookie.fenceLedger(read.getLedgerId(), read.getMasterKey());
+ fenceResult = requestProcessor.bookie.fenceLedger(read.getLedgerId(), read.getMasterKey());
} else {
LOG.error("Password not provided, Not safe to fence {}", read.getLedgerId());
- BKStats.getInstance().getOpStats(BKStats.STATS_READ).incrementFailedOps();
throw BookieException.create(BookieException.Code.UnauthorizedAccessException);
}
}
- data = bookie.readEntry(request.getLedgerId(), request.getEntryId());
+ data = requestProcessor.bookie.readEntry(request.getLedgerId(), request.getEntryId());
LOG.debug("##### Read entry ##### {}", data.remaining());
if (null != fenceResult) {
// TODO:
@@ -120,14 +120,14 @@ class ReadEntryProcessor extends PacketP
LOG.trace("Read entry rc = {} for {}",
new Object[] { errorCode, read });
if (errorCode == BookieProtocol.EOK) {
- assert data != null;
+ requestProcessor.readEntryStats.registerSuccessfulEvent(MathUtils.elapsedMSec(startTimeNanos));
+ sendResponse(errorCode, ResponseBuilder.buildReadResponse(data, read),
+ requestProcessor.readRequestStats);
- channel.write(ResponseBuilder.buildReadResponse(data, read));
- long elapsedTime = MathUtils.now() - startTime;
- BKStats.getInstance().getOpStats(BKStats.STATS_READ).updateLatency(elapsedTime);
} else {
- channel.write(ResponseBuilder.buildErrorResponse(errorCode, read));
- BKStats.getInstance().getOpStats(BKStats.STATS_READ).incrementFailedOps();
+ requestProcessor.readEntryStats.registerFailedEvent(MathUtils.elapsedMSec(startTimeNanos));
+ sendResponse(errorCode, ResponseBuilder.buildErrorResponse(errorCode, read),
+ requestProcessor.readRequestStats);
}
}
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java?rev=1615338&r1=1615337&r2=1615338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java Sat Aug 2 18:02:06 2014
@@ -42,12 +42,13 @@ class ReadEntryProcessorV3 extends Packe
private final static Logger LOG = LoggerFactory.getLogger(ReadEntryProcessorV3.class);
- public ReadEntryProcessorV3(Request request, Channel channel, Bookie bookie) {
- super(request, channel, bookie);
+ public ReadEntryProcessorV3(Request request, Channel channel,
+ BookieRequestProcessor requestProcessor) {
+ super(request, channel, requestProcessor);
}
private ReadResponse getReadResponse() {
- long startTime = MathUtils.now();
+ long startTimeNanos = MathUtils.nowInNano();
ReadRequest readRequest = request.getReadRequest();
long ledgerId = readRequest.getLedgerId();
long entryId = readRequest.getEntryId();
@@ -58,7 +59,6 @@ class ReadEntryProcessorV3 extends Packe
if (!isVersionCompatible()) {
readResponse.setStatus(StatusCode.EBADVERSION);
- BKStats.getInstance().getOpStats(BKStats.STATS_READ).incrementFailedOps();
return readResponse.build();
}
@@ -73,15 +73,14 @@ class ReadEntryProcessorV3 extends Packe
if (readRequest.hasMasterKey()) {
byte[] masterKey = readRequest.getMasterKey().toByteArray();
- fenceResult = bookie.fenceLedger(ledgerId, masterKey);
+ fenceResult = requestProcessor.bookie.fenceLedger(ledgerId, masterKey);
} else {
LOG.error("Fence ledger request received without master key for ledger:{} from address: {}",
ledgerId, channel.getRemoteAddress());
- BKStats.getInstance().getOpStats(BKStats.STATS_READ).incrementFailedOps();
throw BookieException.create(BookieException.Code.UnauthorizedAccessException);
}
}
- entryBody = bookie.readEntry(ledgerId, entryId);
+ entryBody = requestProcessor.bookie.readEntry(ledgerId, entryId);
if (null != fenceResult) {
// TODO:
// currently we don't have readCallback to run in separated read
@@ -135,10 +134,9 @@ class ReadEntryProcessorV3 extends Packe
}
if (status == StatusCode.EOK) {
- long elapsedTime = MathUtils.now() - startTime;
- BKStats.getInstance().getOpStats(BKStats.STATS_READ).updateLatency(elapsedTime);
+ requestProcessor.readEntryStats.registerSuccessfulEvent(MathUtils.elapsedMSec(startTimeNanos));
} else {
- BKStats.getInstance().getOpStats(BKStats.STATS_READ).incrementFailedOps();
+ requestProcessor.readEntryStats.registerFailedEvent(MathUtils.elapsedMSec(startTimeNanos));
}
// Finally set status and return. The body would have been updated if
@@ -150,11 +148,17 @@ class ReadEntryProcessorV3 extends Packe
@Override
public void run() {
ReadResponse readResponse = getReadResponse();
+ sendResponse(readResponse);
+ }
+
+ private void sendResponse(ReadResponse readResponse) {
Response.Builder response = Response.newBuilder()
.setHeader(getHeader())
.setStatus(readResponse.getStatus())
.setReadResponse(readResponse);
- channel.write(response.build());
+ sendResponse(response.getStatus(),
+ response.build(),
+ requestProcessor.readRequestStats);
}
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java?rev=1615338&r1=1615337&r2=1615338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java Sat Aug 2 18:02:06 2014
@@ -19,7 +19,6 @@ package org.apache.bookkeeper.proto;
import java.io.IOException;
-import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieProtocol.Request;
@@ -36,10 +35,11 @@ class WriteEntryProcessor extends Packet
private final static Logger LOG = LoggerFactory.getLogger(WriteEntryProcessor.class);
- long startTime;
+ long startTimeNanos;
- public WriteEntryProcessor(Request request, Channel channel, Bookie bookie) {
- super(request, channel, bookie);
+ public WriteEntryProcessor(Request request, Channel channel,
+ BookieRequestProcessor requestProcessor) {
+ super(request, channel, requestProcessor);
}
@Override
@@ -47,23 +47,24 @@ class WriteEntryProcessor extends Packet
assert (request instanceof BookieProtocol.AddRequest);
BookieProtocol.AddRequest add = (BookieProtocol.AddRequest) request;
- if (bookie.isReadOnly()) {
+ if (requestProcessor.bookie.isReadOnly()) {
LOG.warn("BookieServer is running in readonly mode,"
+ " so rejecting the request from the client!");
- channel.write(ResponseBuilder.buildErrorResponse(BookieProtocol.EREADONLY, add));
- BKStats.getInstance().getOpStats(BKStats.STATS_ADD).incrementFailedOps();
+ sendResponse(BookieProtocol.EREADONLY,
+ ResponseBuilder.buildErrorResponse(BookieProtocol.EREADONLY, add),
+ requestProcessor.addRequestStats);
return;
}
- startTime = MathUtils.now();
+ startTimeNanos = MathUtils.nowInNano();
int rc = BookieProtocol.EOK;
try {
if (add.isRecoveryAdd()) {
- bookie.recoveryAddEntry(add.getDataAsByteBuffer(),
- this, channel, add.getMasterKey());
+ requestProcessor.bookie.recoveryAddEntry(add.getDataAsByteBuffer(),
+ this, channel, add.getMasterKey());
} else {
- bookie.addEntry(add.getDataAsByteBuffer(),
- this, channel, add.getMasterKey());
+ requestProcessor.bookie.addEntry(add.getDataAsByteBuffer(),
+ this, channel, add.getMasterKey());
}
} catch (IOException e) {
LOG.error("Error writing " + add, e);
@@ -76,23 +77,23 @@ class WriteEntryProcessor extends Packet
rc = BookieProtocol.EUA;
}
if (rc != BookieProtocol.EOK) {
- channel.write(ResponseBuilder.buildErrorResponse(rc, add));
- BKStats.getInstance().getOpStats(BKStats.STATS_ADD).incrementFailedOps();
+ requestProcessor.addEntryStats.registerFailedEvent(MathUtils.elapsedMSec(startTimeNanos));
+ sendResponse(rc,
+ ResponseBuilder.buildErrorResponse(rc, add),
+ requestProcessor.addRequestStats);
}
}
@Override
public void writeComplete(int rc, long ledgerId, long entryId,
BookieSocketAddress addr, Object ctx) {
- channel.write(ResponseBuilder.buildAddResponse(request));
-
- // compute the latency
- if (0 == rc) {
- // for add operations, we compute latency in writeComplete callbacks.
- long elapsedTime = MathUtils.now() - startTime;
- BKStats.getInstance().getOpStats(BKStats.STATS_ADD).updateLatency(elapsedTime);
+ if (BookieProtocol.EOK == rc) {
+ requestProcessor.addEntryStats.registerSuccessfulEvent(MathUtils.elapsedMSec(startTimeNanos));
} else {
- BKStats.getInstance().getOpStats(BKStats.STATS_ADD).incrementFailedOps();
+ requestProcessor.addEntryStats.registerFailedEvent(MathUtils.elapsedMSec(startTimeNanos));
}
+ sendResponse(rc,
+ ResponseBuilder.buildAddResponse(request),
+ requestProcessor.addRequestStats);
}
}