You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2011/11/18 11:18:07 UTC
svn commit: r1203568 [1/2] - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-server/ bookkeeper-server/bin/ bookkeeper-server/conf/
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/
bookkeeper-server/src/main/java/org/apache/bookkeeper/client...
Author: ivank
Date: Fri Nov 18 10:18:05 2011
New Revision: 1203568
URL: http://svn.apache.org/viewvc?rev=1203568&view=rev
Log:
BOOKKEEPER-108: add configuration support for BK (Sijie via ivank)
Added:
zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
- copied, changed from r1203380, zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/conf/AbstractConfiguration.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConfigurationTest.java
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper
zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bkenv.sh
zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/NIOServerFactory.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieLayoutVersionTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCacheTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BaseTestCase.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieFailureTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieJournalRollingTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieReadWriteTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieZKExpireTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConditionalSetTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/NIOServerFactoryTest.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/conf/AbstractConfiguration.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookieBenchmark.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1203568&r1=1203567&r2=1203568&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Nov 18 10:18:05 2011
@@ -74,6 +74,8 @@ BUGFIXES:
BOOKKEEPER-91: Bookkeeper and hedwig clients should not use log4j directly (ivank via fpj)
+ BOOKKEEPER-108: add configuration support for BK (Sijie via ivank)
+
hedwig-server/
BOOKKEEPER-43: NullPointException when releasing topic (Sijie Guo via breed)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper?rev=1203568&r1=1203567&r2=1203568&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper Fri Nov 18 10:18:05 2011
@@ -23,10 +23,7 @@
BINDIR=`dirname "$0"`
BK_HOME=`cd $BINDIR/..;pwd`
-DEFAULT_ZOOKEEPER=localhost:2181
-DEFAULT_PORT=3181
-DEFAULT_TXN_DIR=/tmp/bk-txn
-DEFAULT_DATA_DIR=/tmp/bk-data
+DEFAULT_CONF=$BK_HOME/conf/bk_server.conf
source $BK_HOME/conf/bkenv.sh
@@ -55,11 +52,8 @@ where command is one of:
or command is the full name of a class with a defined main() method.
Environment variables:
- BOOKIE_ZOOKEEPER Zookeeper ensemble (default: $DEFAULT_ZOOKEEPER)
- BOOKIE_PORT Port to listen on (default: $DEFAULT_PORT)
- BOOKIE_TXN_LOGDIR Directory for transaction logs (default: $DEFAULT_TXN_DIR)
- BOOKIE_DATA_DIR Directory for data (default: $DEFAULT_DATA_DIR)
BOOKIE_LOG_CONF Log4j configuration file
+ BOOKIE_CONF Configuration file (default: conf/bk_server.conf)
BOOKIE_EXTRA_OPTS Extra options to be passed to the jvm
These variable can also be set in conf/bkenv.sh
@@ -101,20 +95,8 @@ fi
COMMAND=$1
shift
-if [ "$BOOKIE_ZOOKEEPER" == "" ]; then
- BOOKIE_ZOOKEEPER=$DEFAULT_ZOOKEEPER
-fi
-
-if [ "$BOOKIE_PORT" == "" ]; then
- BOOKIE_PORT=$DEFAULT_PORT
-fi
-
-if [ "$BOOKIE_TXN_LOGDIR" == "" ]; then
- BOOKIE_TXN_LOGDIR=$DEFAULT_TXN_DIR
-fi
-
-if [ "$BOOKIE_DATA_DIR" == "" ]; then
- BOOKIE_DATA_DIR=$DEFAULT_DATA_DIR
+if [ "$BOOKIE_CONF" == "" ]; then
+ BOOKIE_CONF=$DEFAULT_CONF
fi
BOOKIE_CLASSPATH="$BOOKIE_JAR:$BOOKIE_CLASSPATH"
@@ -127,11 +109,11 @@ OPTS="-cp $BOOKIE_CLASSPATH $OPTS $BOOKI
OPTS="$OPTS $BOOKIE_EXTRA_OPTS"
if [ $COMMAND == "bookie" ]; then
- exec java $OPTS org.apache.bookkeeper.proto.BookieServer $BOOKIE_PORT $BOOKIE_ZOOKEEPER $BOOKIE_TXN_LOGDIR $BOOKIE_DATA_DIR $@
+ exec java $OPTS org.apache.bookkeeper.proto.BookieServer --conf $BOOKIE_CONF $@
elif [ $COMMAND == "localbookie" ]; then
NUMBER=$1
shift
- exec java $OPTS org.apache.bookkeeper.util.LocalBookKeeper $NUMBER $@
+ exec java $OPTS org.apache.bookkeeper.util.LocalBookKeeper $NUMBER $BOOKIE_CONF $@
elif [ $COMMAND == "help" ]; then
bookkeeper_help;
else
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf?rev=1203568&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf Fri Nov 18 10:18:05 2011
@@ -0,0 +1,124 @@
+#!/bin/sh
+#
+#/**
+# * Copyright 2007 The Apache Software Foundation
+# *
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements. See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership. The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License. You may obtain a copy of the License at
+# *
+# * http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+## Bookie settings
+
+# Port that bookie server listen on
+bookie_port=3181
+
+# Directory Bookkeeper outputs its write ahead log
+journal_dir=/tmp/bk-txn
+
+# Directory Bookkeeper outputs ledger snapshots
+# could define multi directories to store snapshots, separated by ','
+# For example:
+# ledger_dirs=/tmp/bk1-data,/tmp/bk2-data
+#
+# Ideally ledger dirs and journal dir are each on a different device,
+# which reduces the contention between random i/o and sequential write.
+# It is possible to run with a single disk, but performance will be significantly lower.
+ledger_dirs=/tmp/bk-data
+
+# Max file size of entry logger, in bytes
+# A new entry log file will be created when the old one reaches the file size limitation
+# logSizeLimit=2147483648
+
+# Max file size of journal file, in mega bytes
+# A new journal file will be created when the old one reaches the file size limitation
+# journal_max_size_mb=2048
+
+# Max number of old journal files to keep
+# Keeping a number of old journal files would help data recovery in special cases
+# journal_max_backups=5
+
+# Interval between garbage collection runs, in milliseconds
+# Since garbage collection runs in the background, too frequent gc
+# will hurt performance. It is better to use a higher gc
+# interval if there is enough disk capacity.
+# gcWaitTime=1000
+
+# How long the interval to flush ledger index pages to disk, in milliseconds
+# Flushing index files will introduce much random disk I/O.
+# If separating journal dir and ledger dirs each on different devices,
+# flushing would not affect performance. But if putting journal dir
+# and ledger dirs on the same device, performance degrades significantly
+# with too frequent flushing. You can consider increasing the flush interval
+# to get better performance, but the bookie server will then take longer
+# to restart after a failure.
+# flush_interval=100
+
+# Interval to watch whether bookie is dead or not, in milliseconds
+# bookie_death_watch_interval=1000
+
+## zookeeper client settings
+
+# A list of one or more servers on which zookeeper is running.
+# The server list can be comma separated values, for example:
+# zkServers=zk1:2181,zk2:2181,zk3:2181
+zkServers=localhost:2181
+# ZooKeeper client session timeout in milliseconds
+# Bookie server will exit if it receives SESSION_EXPIRED because it
+# was partitioned off from ZooKeeper for more than the session timeout.
+# JVM garbage collection or disk I/O can also cause SESSION_EXPIRED.
+# Increasing this value could help avoid this issue.
+zkTimeout=10000
+
+## NIO Server settings
+
+# This setting is used to enable/disable Nagle's algorithm, which is a means of
+# improving the efficiency of TCP/IP networks by reducing the number of packets
+# that need to be sent over the network.
+# If you are sending many small messages, such that more than one can fit in
+# a single IP packet, setting server.tcpnodelay to false to enable Nagle's algorithm
+# can provide better performance.
+# Default value is true.
+# server.tcpnodelay=true
+
+## ledger cache settings
+
+# Max number of ledger index files that can be open in the bookie server
+# If the number of ledger index files reaches this limit, the bookie
+# server starts to swap some ledgers from memory to disk.
+# Too frequent swapping will affect performance. You can tune this number
+# to gain performance according to your requirements.
+# openFileLimit=900
+
+# Size of an index page in ledger cache, in bytes
+# A larger index page can improve performance when writing pages to disk,
+# which is efficient when you have a small number of ledgers and these
+# ledgers have a similar number of entries.
+# If you have a large number of ledgers and each ledger has fewer entries,
+# a smaller index page would improve memory usage.
+# pageSize=8192
+
+# How many index pages are provided in ledger cache
+# If the number of index pages reaches this limit, the bookie server
+# starts to swap some ledgers from memory to disk. You can increase
+# this value when you find that swapping has become more frequent. But make sure
+# pageLimit*pageSize is not more than the JVM max memory limit,
+# otherwise you will get an OutOfMemoryError.
+# In general, increasing pageLimit and using a smaller index page will
+# give better performance when there is a large number of ledgers with few entries each.
+# If pageLimit is -1, bookie server will use 1/3 of JVM memory to compute
+# the limit on the number of index pages.
+# pageLimit=-1
+
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bkenv.sh
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bkenv.sh?rev=1203568&r1=1203567&r2=1203568&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bkenv.sh (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bkenv.sh Fri Nov 18 10:18:05 2011
@@ -22,14 +22,5 @@
# default settings for starting bookkeeper
-# Zookeeper ensemble for bookkeeper to use
-#BOOKIE_ZOOKEEPER=
-
-# Port for bookie to listen on
-#BOOKIE_PORT=
-
-# Directory Bookkeeper outputs its write ahead log
-#BOOKIE_TXN_LOGDIR=
-
-# Directory Bookkeeper outputs ledger snapshots
-#BOOKIE_DATA_DIR=
+# Configuration file of settings used in bookie server
+# BOOKIE_CONF=
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml?rev=1203568&r1=1203567&r2=1203568&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml Fri Nov 18 10:18:05 2011
@@ -50,6 +50,16 @@
<version>3.2.4.Final</version>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>commons-configuration</groupId>
+ <artifactId>commons-configuration</artifactId>
+ <version>1.6</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ <version>1.2</version>
+ </dependency>
<!--
Annoying dependency we need to include because
zookeeper uses log4j and so we transatively do, but
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1203568&r1=1203567&r2=1203568&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Fri Nov 18 10:18:05 2011
@@ -44,6 +44,7 @@ import java.util.concurrent.LinkedBlocki
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.slf4j.Logger;
@@ -66,14 +67,18 @@ public class Bookie extends Thread {
static Logger LOG = LoggerFactory.getLogger(Bookie.class);
final static long MB = 1024 * 1024L;
// max journal file size
- final static long MAX_JOURNAL_SIZE = Long.getLong("journal_max_size_mb", 2 * 1024) * MB;
+ final long maxJournalSize;
// number journal files kept before marked journal
- final static int MAX_BACKUP_JOURNALS = Integer.getInteger("journal_max_backups", 5);
+ final int maxBackupJournals;
final File journalDirectory;
final File ledgerDirectories[];
+ final ServerConfiguration conf;
+
+ final SyncThread syncThread;
+
/**
* Current directory layout version. Increment this
* when you make a change to the format of any of the files in
@@ -82,7 +87,6 @@ public class Bookie extends Thread {
static final int CURRENT_DIRECTORY_LAYOUT_VERSION = 1;
static final String VERSION_FILENAME = "VERSION";
-
// ZK registration path for this bookie
static final String BOOKIE_REGISTRATION_PATH = "/ledgers/available/";
@@ -149,9 +153,13 @@ public class Bookie extends Thread {
// flag to ensure sync thread will not be interrupted during flush
final AtomicBoolean flushing = new AtomicBoolean(false);
// make flush interval as a parameter
- final int flushInterval = Integer.getInteger("flush_interval", 100);
- public SyncThread() {
+ final int flushInterval;
+ public SyncThread(ServerConfiguration conf) {
super("SyncThread");
+ flushInterval = conf.getFlushInterval();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Flush Interval : " + flushInterval);
+ }
}
@Override
public void run() {
@@ -204,8 +212,8 @@ public class Bookie extends Thread {
});
// keep MAX_BACKUP_JOURNALS journal files before marked journal
- if (logs.size() >= MAX_BACKUP_JOURNALS) {
- int maxIdx = logs.size() - MAX_BACKUP_JOURNALS;
+ if (logs.size() >= maxBackupJournals) {
+ int maxIdx = logs.size() - maxBackupJournals;
for (int i=0; i<maxIdx; i++) {
long id = logs.get(i);
// make sure the journal id is smaller than marked journal id
@@ -233,18 +241,24 @@ public class Bookie extends Thread {
this.join();
}
}
- SyncThread syncThread = new SyncThread();
- public Bookie(int port, String zkServers, File journalDirectory, File ledgerDirectories[]) throws IOException {
+ public Bookie(ServerConfiguration conf) throws IOException {
+ this.journalDirectory = conf.getJournalDir();
+ this.ledgerDirectories = conf.getLedgerDirs();
+ this.conf = conf;
+
checkDirectoryLayoutVersion(journalDirectory);
for (File dir : ledgerDirectories) {
checkDirectoryLayoutVersion(dir);
}
- this.journalDirectory = journalDirectory;
- this.ledgerDirectories = ledgerDirectories;
- entryLogger = new EntryLogger(ledgerDirectories, this);
- ledgerCache = new LedgerCache(ledgerDirectories);
+ this.maxJournalSize = conf.getMaxJournalSize() * MB;
+ this.maxBackupJournals = conf.getMaxBackupJournals();
+
+ syncThread = new SyncThread(conf);
+ entryLogger = new EntryLogger(conf, this);
+ ledgerCache = new LedgerCache(conf);
+
lastLogMark.readLog();
if (LOG.isDebugEnabled()) {
LOG.debug("Last Log Mark : " + lastLogMark);
@@ -318,7 +332,7 @@ public class Bookie extends Thread {
}
}
}
- instantiateZookeeperClient(port, zkServers);
+ instantiateZookeeperClient(conf.getBookiePort(), conf.getZkServers());
setDaemon(true);
LOG.debug("I'm starting a bookie with journal directory " + journalDirectory.getName());
start();
@@ -372,7 +386,7 @@ public class Bookie extends Thread {
isZkExpired = false;
return;
}
- int zkTimeout = Integer.getInteger("zkTimeout", 10000);
+ int zkTimeout = conf.getZkTimeout();
// Create the ZooKeeper client instance
zk = newZookeeper(zkServers, zkTimeout);
// Create the ZK ephemeral node for this Bookie.
@@ -771,7 +785,7 @@ public class Bookie extends Thread {
toFlush.clear();
// check whether journal file is over file limit
- if (bc.position() > MAX_JOURNAL_SIZE) {
+ if (bc.position() > maxJournalSize) {
logFile.close();
logFile = null;
continue;
@@ -971,7 +985,7 @@ public class Bookie extends Thread {
*/
public static void main(String[] args) throws IOException,
InterruptedException, BookieException {
- Bookie b = new Bookie(5000, null, new File("/tmp"), new File[] { new File("/tmp") });
+ Bookie b = new Bookie(new ServerConfiguration());
CounterCallback cb = new CounterCallback();
long start = System.currentTimeMillis();
for (int i = 0; i < 100000; i++) {
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java?rev=1203568&r1=1203567&r2=1203568&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java Fri Nov 18 10:18:05 2011
@@ -42,6 +42,7 @@ import java.util.concurrent.ConcurrentMa
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
@@ -64,7 +65,7 @@ public class EntryLogger {
/**
* The maximum size of a entry logger file.
*/
- final static long LOG_SIZE_LIMIT = Long.getLong("logSizeLimit", 2 * 1024 * 1024 * 1024L);
+ final long logSizeLimit;
private volatile BufferedChannel logChannel;
/**
* The 1K block at the head of the entry logger file
@@ -87,16 +88,18 @@ public class EntryLogger {
// contain any active ledgers in them.
GarbageCollectorThread gcThread = new GarbageCollectorThread();
// This is how often we want to run the Garbage Collector Thread (in milliseconds).
- // This should be passed as a System property. Default it to 1000 ms (1sec).
- final static int gcWaitTime = Integer.getInteger("gcWaitTime", 1000);
+ final long gcWaitTime;
/**
* Create an EntryLogger that stores it's log files in the given
* directories
*/
- public EntryLogger(File dirs[], Bookie bookie) throws IOException {
- this.dirs = dirs;
+ public EntryLogger(ServerConfiguration conf, Bookie bookie) throws IOException {
+ this.dirs = conf.getLedgerDirs();
this.bookie = bookie;
+ // log size limit
+ this.logSizeLimit = conf.getEntryLogSizeLimit();
+ this.gcWaitTime = conf.getGcWaitTime();
// Initialize the entry log header buffer. This cannot be a static object
// since in our unit tests, we run multiple Bookies and thus EntryLoggers
// within the same JVM. All of these Bookie instances access this header
@@ -321,7 +324,7 @@ public class EntryLogger {
}
}
synchronized long addEntry(long ledger, ByteBuffer entry) throws IOException {
- if (logChannel.position() + entry.remaining() + 4 > LOG_SIZE_LIMIT) {
+ if (logChannel.position() + entry.remaining() + 4 > logSizeLimit) {
openNewChannel();
}
ByteBuffer buff = ByteBuffer.allocate(4);
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java?rev=1203568&r1=1203567&r2=1203568&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java Fri Nov 18 10:18:05 2011
@@ -36,6 +36,8 @@ import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,8 +51,20 @@ public class LedgerCache {
final File ledgerDirectories[];
- public LedgerCache(File ledgerDirectories[]) {
- this.ledgerDirectories = ledgerDirectories;
+ public LedgerCache(ServerConfiguration conf) {
+ this.ledgerDirectories = conf.getLedgerDirs();
+ this.openFileLimit = conf.getOpenFileLimit();
+ this.pageSize = conf.getPageSize();
+ this.entriesPerPage = pageSize / 8;
+
+ if (conf.getPageLimit() <= 0) {
+ // allocate a third of the JVM max memory to the page cache
+ this.pageLimit = (int)((Runtime.getRuntime().maxMemory() / 3) / this.pageSize);
+ } else {
+ this.pageLimit = conf.getPageLimit();
+ }
+ LOG.info("maxMemory = " + Runtime.getRuntime().maxMemory());
+ LOG.info("openFileLimit is " + openFileLimit + ", pageSize is " + pageSize + ", pageLimit is " + pageLimit);
// Retrieve all of the active ledgers.
getActiveLedgers();
}
@@ -71,23 +85,25 @@ public class LedgerCache {
// Stores the set of active (non-deleted) ledgers.
ConcurrentMap<Long, Boolean> activeLedgers = new ConcurrentHashMap<Long, Boolean>();
- static int OPEN_FILE_LIMIT = 900;
- static {
- if (System.getProperty("openFileLimit") != null) {
- OPEN_FILE_LIMIT = Integer.parseInt(System.getProperty("openFileLimit"));
- }
- LOG.info("openFileLimit is " + OPEN_FILE_LIMIT);
+ final int openFileLimit;
+ final int pageSize;
+ final int pageLimit;
+ final int entriesPerPage;
+
+ /**
+ * @return page size used in ledger cache
+ */
+ public int getPageSize() {
+ return pageSize;
}
- // allocate half of the memory to the page cache
- private static int pageLimit = (int)((Runtime.getRuntime().maxMemory() / 3) / LedgerEntryPage.PAGE_SIZE);
- static {
- LOG.info("maxMemory = " + Runtime.getRuntime().maxMemory());
- if (System.getProperty("pageLimit") != null) {
- pageLimit = Integer.parseInt(System.getProperty("pageLimit"));
- }
- LOG.info("pageLimit is " + pageLimit);
+ /**
+ * @return entries per page used in ledger cache
+ */
+ public int getEntriesPerPage() {
+ return entriesPerPage;
}
+
// The number of pages that have actually been used
private int pageCount = 0;
HashMap<Long, HashMap<Long,LedgerEntryPage>> pages = new HashMap<Long, HashMap<Long,LedgerEntryPage>>();
@@ -124,7 +140,7 @@ public class LedgerCache {
}
public void putEntryOffset(long ledger, long entry, long offset) throws IOException {
- int offsetInPage = (int) (entry%LedgerEntryPage.ENTRIES_PER_PAGES);
+ int offsetInPage = (int) (entry % entriesPerPage);
// find the id of the first entry of the page that has the entry
// we are looking for
long pageEntry = entry-offsetInPage;
@@ -145,7 +161,7 @@ public class LedgerCache {
}
public long getEntryOffset(long ledger, long entry) throws IOException {
- int offsetInPage = (int) (entry%LedgerEntryPage.ENTRIES_PER_PAGES);
+ int offsetInPage = (int) (entry%entriesPerPage);
// find the id of the first entry of the page that has the entry
// we are looking for
long pageEntry = entry-offsetInPage;
@@ -223,7 +239,7 @@ public class LedgerCache {
}
activeLedgers.put(ledger, true);
}
- if (openLedgers.size() > OPEN_FILE_LIMIT) {
+ if (openLedgers.size() > openFileLimit) {
fileInfoCache.remove(openLedgers.removeFirst()).close();
}
fi = new FileInfo(lf);
@@ -313,7 +329,7 @@ public class LedgerCache {
long lastOffset = -1;
for(int i = 0; i < entries.size(); i++) {
versions.add(i, entries.get(i).getVersion());
- if (lastOffset != -1 && (entries.get(i).getFirstEntry() - lastOffset) != LedgerEntryPage.ENTRIES_PER_PAGES) {
+ if (lastOffset != -1 && (entries.get(i).getFirstEntry() - lastOffset) != entriesPerPage) {
// send up a sequential list
int count = i - start;
if (count == 0) {
@@ -383,18 +399,18 @@ public class LedgerCache {
//System.out.println("Wrote " + rc + " to " + ledger);
totalWritten += rc;
}
- if (totalWritten != count*LedgerEntryPage.PAGE_SIZE) {
- throw new IOException("Short write to ledger " + ledger + " wrote " + totalWritten + " expected " + count*LedgerEntryPage.PAGE_SIZE);
+ if (totalWritten != count * pageSize) {
+ throw new IOException("Short write to ledger " + ledger + " wrote " + totalWritten + " expected " + count * pageSize);
}
}
private LedgerEntryPage grabCleanPage(long ledger, long entry) throws IOException {
- if (entry % LedgerEntryPage.ENTRIES_PER_PAGES != 0) {
- throw new IllegalArgumentException(entry + " is not a multiple of " + LedgerEntryPage.ENTRIES_PER_PAGES);
+ if (entry % entriesPerPage != 0) {
+ throw new IllegalArgumentException(entry + " is not a multiple of " + entriesPerPage);
}
synchronized(this) {
if (pageCount < pageLimit) {
// let's see if we can allocate something
- LedgerEntryPage lep = new LedgerEntryPage();
+ LedgerEntryPage lep = new LedgerEntryPage(pageSize, entriesPerPage);
lep.setLedger(ledger);
lep.setFirstEntry(entry);
// note, this will not block since it is a new page
@@ -451,7 +467,7 @@ public class LedgerCache {
Map<Long, LedgerEntryPage> map = pages.get(ledgerId);
if (map != null) {
for(LedgerEntryPage lep: map.values()) {
- if (lep.getFirstEntry() + LedgerEntryPage.ENTRIES_PER_PAGES < lastEntry) {
+ if (lep.getFirstEntry() + entriesPerPage < lastEntry) {
continue;
}
lep.usePage();
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java?rev=1203568&r1=1203567&r2=1203568&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java Fri Nov 18 10:18:05 2011
@@ -113,15 +113,15 @@ public class LedgerDescriptor {
long size = fi.size();
// we may not have the last entry in the cache
if (size > lastEntry*8) {
- ByteBuffer bb = ByteBuffer.allocate(LedgerEntryPage.PAGE_SIZE);
- long position = size-LedgerEntryPage.PAGE_SIZE;
+ ByteBuffer bb = ByteBuffer.allocate(ledgerCache.getPageSize());
+ long position = size - ledgerCache.getPageSize();
if (position < 0) {
position = 0;
}
fi.read(bb, position);
bb.flip();
long startingEntryId = position/8;
- for(int i = LedgerEntryPage.ENTRIES_PER_PAGES-1; i >= 0; i--) {
+ for(int i = ledgerCache.getEntriesPerPage()-1; i >= 0; i--) {
if (bb.getLong(i*8) != 0) {
if (lastEntry < startingEntryId+i) {
lastEntry = startingEntryId+i;
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java?rev=1203568&r1=1203567&r2=1203568&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java Fri Nov 18 10:18:05 2011
@@ -29,16 +29,22 @@ import java.nio.ByteBuffer;
* (entrylogfile, offset) for entry ids.
*/
public class LedgerEntryPage {
- public static final int PAGE_SIZE = 8192;
- public static final int ENTRIES_PER_PAGES = PAGE_SIZE/8;
+ private final int pageSize;
+ private final int entriesPerPage;
private long ledger = -1;
private long firstEntry = -1;
- private ByteBuffer page = ByteBuffer.allocateDirect(PAGE_SIZE);
+ private final ByteBuffer page;
private boolean clean = true;
private boolean pinned = false;
private int useCount;
private int version;
+    /**
+     * Create an index page backed by a direct buffer.
+     *
+     * @param pageSize
+     *          size in bytes of the direct buffer allocated for this page
+     * @param entriesPerPage
+     *          number of entry slots held by this page
+     */
+    public LedgerEntryPage(int pageSize, int entriesPerPage) {
+        this.page = ByteBuffer.allocateDirect(pageSize);
+        this.pageSize = pageSize;
+        this.entriesPerPage = entriesPerPage;
+    }
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
@@ -129,8 +135,8 @@ public class LedgerEntryPage {
return version;
}
void setFirstEntry(long firstEntry) {
- if (firstEntry % ENTRIES_PER_PAGES != 0) {
- throw new IllegalArgumentException(firstEntry + " is not a multiple of " + ENTRIES_PER_PAGES);
+ if (firstEntry % entriesPerPage != 0) {
+ throw new IllegalArgumentException(firstEntry + " is not a multiple of " + entriesPerPage);
}
this.firstEntry = firstEntry;
}
@@ -141,7 +147,7 @@ public class LedgerEntryPage {
return useCount > 0;
}
public long getLastEntry() {
- for(int i = ENTRIES_PER_PAGES - 1; i >= 0; i--) {
+ for(int i = entriesPerPage - 1; i >= 0; i--) {
if (getOffset(i*8) > 0) {
return i + firstEntry;
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java?rev=1203568&r1=1203567&r2=1203568&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java Fri Nov 18 10:18:05 2011
@@ -26,6 +26,7 @@ import java.util.concurrent.Executors;
import java.util.EnumSet;
import java.util.Set;
+import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
@@ -77,6 +78,9 @@ public class BookKeeper {
OrderedSafeExecutor mainWorkerPool = new OrderedSafeExecutor(Runtime
.getRuntime().availableProcessors());
+ ClientConfiguration conf;
+
+
/**
* Create a bookkeeper client. A zookeeper client and a client socket factory
* will be instantiated as part of this constructor.
@@ -92,7 +96,23 @@ public class BookKeeper {
*/
public BookKeeper(String servers) throws IOException, InterruptedException,
KeeperException {
- this(new ZooKeeper(servers, 10000, new Watcher() {
+ this(new ClientConfiguration().setZkServers(servers));
+ }
+
+ /**
+ * Create a bookkeeper client using a configuration object.
+ * A zookeeper client and a client socket factory will be
+ * instantiated as part of this constructor.
+ *
+ * @param conf
+ * Client Configuration object
+ * @throws IOException
+ * @throws InterruptedException
+ * @throws KeeperException
+ */
+ public BookKeeper(ClientConfiguration conf) throws IOException, InterruptedException,
+ KeeperException {
+ this(conf, new ZooKeeper(conf.getZkServers(), conf.getZkTimeout(), new Watcher() {
@Override
public void process(WatchedEvent event) {
// TODO: handle session disconnects and expires
@@ -105,20 +125,23 @@ public class BookKeeper {
ownZKHandle = true;
ownChannelFactory = true;
- }
+ }
/**
* Create a bookkeeper client but use the passed in zookeeper client instead
* of instantiating one.
*
+ * @param conf
+ * Client Configuration object
+ * {@link ClientConfiguration}
* @param zk
* Zookeeper client instance connected to the zookeeper with which
* the bookies have registered
* @throws InterruptedException
* @throws KeeperException
*/
- public BookKeeper(ZooKeeper zk) throws InterruptedException, KeeperException {
- this(zk, new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
+ public BookKeeper(ClientConfiguration conf, ZooKeeper zk) throws InterruptedException, KeeperException {
+ this(conf, zk, new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
ownChannelFactory = true;
}
@@ -127,6 +150,9 @@ public class BookKeeper {
* Create a bookkeeper client but use the passed in zookeeper client and
* client socket channel factory instead of instantiating those.
*
+ * @param conf
+ * Client Configuration Object
+ * {@link ClientConfiguration}
* @param zk
* Zookeeper client instance connected to the zookeeper with which
* the bookies have registered
@@ -135,16 +161,17 @@ public class BookKeeper {
* @throws InterruptedException
* @throws KeeperException
*/
- public BookKeeper(ZooKeeper zk, ClientSocketChannelFactory channelFactory)
+ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, ClientSocketChannelFactory channelFactory)
throws InterruptedException, KeeperException {
if (zk == null || channelFactory == null) {
throw new NullPointerException();
}
+ this.conf = conf;
this.zk = zk;
this.channelFactory = channelFactory;
bookieWatcher = new BookieWatcher(this);
bookieWatcher.readBookiesBlocking();
- bookieClient = new BookieClient(channelFactory, mainWorkerPool);
+ bookieClient = new BookieClient(conf, channelFactory, mainWorkerPool);
}
/**
@@ -162,6 +189,10 @@ public class BookKeeper {
return zk;
}
+    /**
+     * Get the client configuration this BookKeeper instance was built with.
+     *
+     * @return the configuration object stored by the constructor
+     */
+    protected ClientConfiguration getConf() {
+        return this.conf;
+    }
+
/**
* Get the BookieClient, currently used for doing bookie recovery.
*
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java?rev=1203568&r1=1203567&r2=1203568&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java Fri Nov 18 10:18:05 2011
@@ -36,6 +36,7 @@ import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.client.AsyncCallback.RecoverCallback;
@@ -85,9 +86,10 @@ public class BookKeeperAdmin {
* can open it. These values will come from System properties, though hard
* coded defaults are defined here.
*/
- private DigestType DIGEST_TYPE = DigestType.valueOf(System.getProperty("digestType", DigestType.CRC32.toString()));
- private byte[] PASSWD = System.getProperty("passwd", "").getBytes();
+ private DigestType DIGEST_TYPE;
+ private byte[] PASSWD;
+
/**
* Constructor that takes in a ZooKeeper servers connect string so we know
* how to connect to ZooKeeper to retrieve information about the BookKeeper
@@ -108,8 +110,30 @@ public class BookKeeperAdmin {
* BookKeeper client.
*/
public BookKeeperAdmin(String zkServers) throws IOException, InterruptedException, KeeperException {
+ this(new ClientConfiguration().setZkServers(zkServers));
+ }
+
+ /**
+ * Constructor that takes in a configuration object so we know
+ * how to connect to ZooKeeper to retrieve information about the BookKeeper
+ * cluster. We need this before we can do any type of admin operations on
+ * the BookKeeper cluster.
+ *
+ * @param conf
+ * Client Configuration Object
+ * @throws IOException
+ * throws this exception if there is an error instantiating the
+ * ZooKeeper client.
+ * @throws InterruptedException
+ * Throws this exception if there is an error instantiating the
+ * BookKeeper client.
+ * @throws KeeperException
+ * Throws this exception if there is an error instantiating the
+ * BookKeeper client.
+ */
+ public BookKeeperAdmin(ClientConfiguration conf) throws IOException, InterruptedException, KeeperException {
// Create the ZooKeeper client instance
- zk = new ZooKeeper(zkServers, 10000, new Watcher() {
+ zk = new ZooKeeper(conf.getZkServers(), conf.getZkTimeout(), new Watcher() {
@Override
public void process(WatchedEvent event) {
if (LOG.isDebugEnabled()) {
@@ -117,8 +141,11 @@ public class BookKeeperAdmin {
}
}
});
+
// Create the BookKeeper client instance
- bkc = new BookKeeper(zk);
+ bkc = new BookKeeper(conf);
+ DIGEST_TYPE = conf.getBookieRecoveryDigestType();
+ PASSWD = conf.getBookieRecoveryPasswd();
}
/**
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java?rev=1203568&r1=1203567&r2=1203568&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java Fri Nov 18 10:18:05 2011
@@ -147,7 +147,7 @@ class LedgerCreateOp implements StringCa
cb.createComplete(BKException.Code.DigestNotInitializedException, null, this.ctx);
return;
} catch (NumberFormatException e) {
- LOG.error("Incorrectly entered parameter throttle: " + System.getProperty("throttle"), e);
+ LOG.error("Incorrectly entered parameter throttle: " + bk.getConf().getThrottleValue(), e);
cb.createComplete(BKException.Code.IncorrectParameterException, null, this.ctx);
return;
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=1203568&r1=1203567&r2=1203568&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java Fri Nov 18 10:18:05 2011
@@ -29,6 +29,7 @@ import java.util.Enumeration;
import java.util.Queue;
import java.util.concurrent.Semaphore;
+import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
@@ -69,7 +70,7 @@ public class LedgerHandle {
final DistributionSchedule distributionSchedule;
final Semaphore opCounterSem;
- private Integer throttling = 5000;
+ private final Integer throttling;
final Queue<PendingAddOp> pendingAddOps = new ArrayDeque<PendingAddOp>();
@@ -89,10 +90,7 @@ public class LedgerHandle {
this.ledgerId = ledgerId;
- String throttleValue = System.getProperty("throttle");
- if(throttleValue != null) {
- this.throttling = new Integer(throttleValue);
- }
+ this.throttling = bk.getConf().getThrottleValue();
this.opCounterSem = new Semaphore(throttling);
macManager = DigestManager.instantiate(ledgerId, password, digestType);
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java?rev=1203568&r1=1203567&r2=1203568&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java Fri Nov 18 10:18:05 2011
@@ -127,7 +127,7 @@ class LedgerOpenOp implements DataCallba
cb.openComplete(BKException.Code.DigestNotInitializedException, null, this.ctx);
return;
} catch (NumberFormatException e) {
- LOG.error("Incorrectly entered parameter throttle: " + System.getProperty("throttle"), e);
+ LOG.error("Incorrectly entered parameter throttle: " + bk.getConf().getThrottleValue(), e);
cb.openComplete(BKException.Code.IncorrectParameterException, null, this.ctx);
return;
}
Copied: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java (from r1203380, zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/conf/AbstractConfiguration.java)
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java?p2=zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java&p1=zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/conf/AbstractConfiguration.java&r1=1203380&r2=1203568&rev=1203568&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/conf/AbstractConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java Fri Nov 18 10:18:05 2011
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hedwig.conf;
+package org.apache.bookkeeper.conf;
import java.net.URL;
@@ -23,12 +23,17 @@ import org.apache.commons.configuration.
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.configuration.SystemConfiguration;
-public abstract class AbstractConfiguration {
- protected CompositeConfiguration conf;
+/**
+ * Abstract configuration
+ */
+public abstract class AbstractConfiguration extends CompositeConfiguration {
protected AbstractConfiguration() {
- conf = new CompositeConfiguration();
+ super();
+ // add configuration for system properties
+ addConfiguration(new SystemConfiguration());
}
/**
@@ -36,10 +41,21 @@ public abstract class AbstractConfigurat
* precedence over any loaded later.
*
* @param confURL
+ * Configuration URL
*/
public void loadConf(URL confURL) throws ConfigurationException {
Configuration loadedConf = new PropertiesConfiguration(confURL);
- conf.addConfiguration(loadedConf);
+ addConfiguration(loadedConf);
+ }
+    /**
+     * Load settings from another configuration object by adding it
+     * to this composite configuration.
+     *
+     * @param baseConf
+     *          Configuration to add to this one
+     */
+    public void loadConf(AbstractConfiguration baseConf) {
+        this.addConfiguration(baseConf);
}
+
}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java?rev=1203568&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java Fri Nov 18 10:18:05 2011
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.conf;
+
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+
+/**
+ * Configuration settings for client side
+ */
+public class ClientConfiguration extends AbstractConfiguration {
+
+    // Zookeeper Parameters
+    protected final static String ZK_TIMEOUT = "zkTimeout";
+    protected final static String ZK_SERVERS = "zkServers";
+
+    // Throttle value
+    protected final static String THROTTLE = "throttle";
+
+    // Digest Type
+    protected final static String DIGEST_TYPE = "digestType";
+    // Passwd
+    protected final static String PASSWD = "passwd";
+
+    // NIO Parameters
+    protected final static String CLIENT_TCP_NODELAY = "client.tcpnodelay";
+
+    /**
+     * Construct a default client-side configuration
+     */
+    public ClientConfiguration() {
+        super();
+    }
+
+    /**
+     * Construct a client-side configuration using a base configuration
+     *
+     * @param conf
+     *          Base configuration
+     */
+    public ClientConfiguration(AbstractConfiguration conf) {
+        super();
+        loadConf(conf);
+    }
+
+    /**
+     * Get throttle value
+     *
+     * @return throttle value, 5000 if not configured
+     * @see #setThrottleValue
+     */
+    public int getThrottleValue() {
+        return this.getInt(THROTTLE, 5000);
+    }
+
+    /**
+     * Set throttle value.
+     *
+     * Since BookKeeper processes requests asynchronously, it holds
+     * pending requests in a queue. You may easily run out of memory
+     * if you produce more requests than the bookie servers can handle.
+     * To prevent that from happening, you can set a throttle value here.
+     *
+     * @param throttle
+     *          Throttle Value
+     * @return client configuration
+     */
+    public ClientConfiguration setThrottleValue(int throttle) {
+        this.addProperty(THROTTLE, Integer.toString(throttle));
+        return this;
+    }
+
+    /**
+     * Get digest type used in bookkeeper admin
+     *
+     * @return digest type, CRC32 if not configured
+     * @see #setBookieRecoveryDigestType
+     */
+    public DigestType getBookieRecoveryDigestType() {
+        return DigestType.valueOf(this.getString(DIGEST_TYPE, DigestType.CRC32.toString()));
+    }
+
+    /**
+     * Set digest type used in bookkeeper admin.
+     *
+     * Digest Type and Passwd used to open ledgers for admin tool
+     * For now, assume that all ledgers were created with the same DigestType
+     * and password. In the future, this admin tool will need to know for each
+     * ledger, what was the DigestType and password used to create it before it
+     * can open it. These values will come from System properties, though fixed
+     * defaults are defined here.
+     *
+     * @param digestType
+     *          Digest Type
+     * @return client configuration
+     */
+    public ClientConfiguration setBookieRecoveryDigestType(DigestType digestType) {
+        this.addProperty(DIGEST_TYPE, digestType.toString());
+        return this;
+    }
+
+    /**
+     * Get passwd used in bookkeeper admin
+     *
+     * @return password bytes, empty if not configured
+     * @see #setBookieRecoveryPasswd
+     */
+    public byte[] getBookieRecoveryPasswd() {
+        return this.getString(PASSWD, "").getBytes();
+    }
+
+    /**
+     * Set passwd used in bookkeeper admin.
+     *
+     * Digest Type and Passwd used to open ledgers for admin tool
+     * For now, assume that all ledgers were created with the same DigestType
+     * and password. In the future, this admin tool will need to know for each
+     * ledger, what was the DigestType and password used to create it before it
+     * can open it. These values will come from System properties, though fixed
+     * defaults are defined here.
+     *
+     * @param passwd
+     *          Password
+     * @return client configuration
+     */
+    public ClientConfiguration setBookieRecoveryPasswd(byte[] passwd) {
+        addProperty(PASSWD, new String(passwd));
+        return this;
+    }
+
+    /**
+     * Is tcp connection no delay.
+     *
+     * @return tcp socket nodelay setting, true if not configured
+     * @see #setClientTcpNoDelay
+     */
+    public boolean getClientTcpNoDelay() {
+        return getBoolean(CLIENT_TCP_NODELAY, true);
+    }
+
+    /**
+     * Set socket nodelay setting.
+     *
+     * This setting is used to enable/disable Nagle's algorithm, which is a means of
+     * improving the efficiency of TCP/IP networks by reducing the number of packets
+     * that need to be sent over the network. If you are sending many small messages,
+     * such that more than one can fit in a single IP packet, setting client.tcpnodelay
+     * to false to enable Nagle algorithm can provide better performance.
+     * <br>
+     * Default value is true.
+     *
+     * @param noDelay
+     *          NoDelay setting
+     * @return client configuration
+     */
+    public ClientConfiguration setClientTcpNoDelay(boolean noDelay) {
+        addProperty(CLIENT_TCP_NODELAY, Boolean.toString(noDelay));
+        return this;
+    }
+
+    /**
+     * Get zookeeper servers to connect
+     *
+     * @return zookeeper servers, "localhost" if not configured
+     */
+    public String getZkServers() {
+        return getString(ZK_SERVERS, "localhost");
+    }
+
+    /**
+     * Set zookeeper servers to connect
+     *
+     * @param zkServers
+     *          ZooKeeper servers to connect
+     * @return client configuration
+     */
+    public ClientConfiguration setZkServers(String zkServers) {
+        addProperty(ZK_SERVERS, zkServers);
+        return this;
+    }
+
+    /**
+     * Get zookeeper timeout
+     *
+     * @return zookeeper client timeout, 10000 if not configured
+     */
+    public int getZkTimeout() {
+        return getInt(ZK_TIMEOUT, 10000);
+    }
+
+    /**
+     * Set zookeeper timeout
+     *
+     * @param zkTimeout
+     *          ZooKeeper client timeout
+     * @return client configuration
+     */
+    public ClientConfiguration setZkTimeout(int zkTimeout) {
+        // Store under ZK_TIMEOUT (the key getZkTimeout reads); writing to
+        // ZK_SERVERS would leave the timeout unset and pollute the server list.
+        addProperty(ZK_TIMEOUT, Integer.toString(zkTimeout));
+        return this;
+    }
+
+}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java?rev=1203568&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java Fri Nov 18 10:18:05 2011
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.conf;
+
+import java.io.File;
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * Configuration manages server-side settings
+ */
+public class ServerConfiguration extends AbstractConfiguration {
+    // Entry Log Parameters
+    protected final static String ENTRY_LOG_SIZE_LIMIT = "logSizeLimit";
+
+    // Gc Parameters
+    protected final static String GC_WAIT_TIME = "gcWaitTime";
+    // Sync Parameters
+    protected final static String FLUSH_INTERVAL = "flush_interval";
+    // Bookie death watch interval
+    protected final static String DEATH_WATCH_INTERVAL = "bookie_death_watch_interval";
+    // Ledger Cache Parameters
+    protected final static String OPEN_FILE_LIMIT = "openFileLimit";
+    protected final static String PAGE_LIMIT = "pageLimit";
+    protected final static String PAGE_SIZE = "pageSize";
+    // Journal Parameters
+    protected final static String MAX_JOURNAL_SIZE = "journal_max_size_mb";
+    protected final static String MAX_BACKUP_JOURNALS = "journal_max_backups";
+    // Bookie Parameters
+    protected final static String BOOKIE_PORT = "bookie_port";
+    protected final static String JOURNAL_DIR = "journal_dir";
+    protected final static String LEDGER_DIRS = "ledger_dirs";
+    // NIO Parameters
+    protected final static String SERVER_TCP_NODELAY = "server.tcpnodelay";
+    // Zookeeper Parameters
+    protected final static String ZK_TIMEOUT = "zkTimeout";
+    protected final static String ZK_SERVERS = "zkServers";
+
+    // separator for ledger dir
+    protected final static String SEP = ",";
+
+    /**
+     * Construct a default configuration object
+     */
+    public ServerConfiguration() {
+        super();
+    }
+
+    /**
+     * Construct a configuration based on other configuration
+     *
+     * @param conf
+     *          Other configuration
+     */
+    public ServerConfiguration(AbstractConfiguration conf) {
+        super();
+        loadConf(conf);
+    }
+
+    /**
+     * Get entry logger size limitation
+     *
+     * @return entry logger size limitation, 2 * 1024 * 1024 * 1024 if not configured
+     */
+    public long getEntryLogSizeLimit() {
+        return this.getLong(ENTRY_LOG_SIZE_LIMIT, 2 * 1024 * 1024 * 1024L);
+    }
+
+    /**
+     * Set entry logger size limitation
+     *
+     * @param logSizeLimit
+     *          new log size limitation
+     * @return server configuration
+     */
+    public ServerConfiguration setEntryLogSizeLimit(long logSizeLimit) {
+        this.addProperty(ENTRY_LOG_SIZE_LIMIT, Long.toString(logSizeLimit));
+        return this;
+    }
+
+    /**
+     * Get Garbage collection wait time
+     *
+     * @return gc wait time, 1000 if not configured
+     */
+    public long getGcWaitTime() {
+        return this.getLong(GC_WAIT_TIME, 1000);
+    }
+
+    /**
+     * Set garbage collection wait time
+     *
+     * @param gcWaitTime
+     *          gc wait time
+     * @return server configuration
+     */
+    public ServerConfiguration setGcWaitTime(long gcWaitTime) {
+        this.addProperty(GC_WAIT_TIME, Long.toString(gcWaitTime));
+        return this;
+    }
+
+    /**
+     * Get flush interval
+     *
+     * @return flush interval, 100 if not configured
+     */
+    public int getFlushInterval() {
+        return this.getInt(FLUSH_INTERVAL, 100);
+    }
+
+    /**
+     * Set flush interval
+     *
+     * @param flushInterval
+     *          Flush Interval
+     * @return server configuration
+     */
+    public ServerConfiguration setFlushInterval(int flushInterval) {
+        this.addProperty(FLUSH_INTERVAL, Integer.toString(flushInterval));
+        return this;
+    }
+
+    /**
+     * Get bookie death watch interval
+     *
+     * @return watch interval, 1000 if not configured
+     */
+    public int getDeathWatchInterval() {
+        return this.getInt(DEATH_WATCH_INTERVAL, 1000);
+    }
+
+    /**
+     * Get open file limit
+     *
+     * @return max number of files to open, 900 if not configured
+     */
+    public int getOpenFileLimit() {
+        return this.getInt(OPEN_FILE_LIMIT, 900);
+    }
+
+    /**
+     * Get limitation number of index pages in ledger cache
+     *
+     * @return max number of index pages in ledger cache, -1 if not configured
+     */
+    public int getPageLimit() {
+        return this.getInt(PAGE_LIMIT, -1);
+    }
+
+    /**
+     * Get page size
+     *
+     * @return page size in ledger cache, 8192 if not configured
+     */
+    public int getPageSize() {
+        return this.getInt(PAGE_SIZE, 8192);
+    }
+
+    /**
+     * Max journal file size
+     *
+     * @return max journal file size, 2 * 1024 if not configured
+     */
+    public long getMaxJournalSize() {
+        return this.getLong(MAX_JOURNAL_SIZE, 2 * 1024);
+    }
+
+    /**
+     * Set new max journal file size
+     *
+     * @param maxJournalSize
+     *          new max journal file size
+     * @return server configuration
+     */
+    public ServerConfiguration setMaxJournalSize(long maxJournalSize) {
+        this.addProperty(MAX_JOURNAL_SIZE, Long.toString(maxJournalSize));
+        return this;
+    }
+
+    /**
+     * Max number of older journal files kept
+     *
+     * @return max number of older journal files to keep, 5 if not configured
+     */
+    public int getMaxBackupJournals() {
+        return this.getInt(MAX_BACKUP_JOURNALS, 5);
+    }
+
+    /**
+     * Set max number of older journal files to keep
+     *
+     * @param maxBackupJournals
+     *          Max number of older journal files
+     * @return server configuration
+     */
+    public ServerConfiguration setMaxBackupJournals(int maxBackupJournals) {
+        this.addProperty(MAX_BACKUP_JOURNALS, Integer.toString(maxBackupJournals));
+        return this;
+    }
+
+    /**
+     * Get bookie port that bookie server listen on
+     *
+     * @return bookie port, 3181 if not configured
+     */
+    public int getBookiePort() {
+        return this.getInt(BOOKIE_PORT, 3181);
+    }
+
+    /**
+     * Set new bookie port that bookie server listen on
+     *
+     * @param port
+     *          Port to listen on
+     * @return server configuration
+     */
+    public ServerConfiguration setBookiePort(int port) {
+        this.addProperty(BOOKIE_PORT, Integer.toString(port));
+        return this;
+    }
+
+    /**
+     * Get dir name to store journal files
+     *
+     * @return journal dir name, "/tmp/bk-txn" if not configured
+     */
+    public String getJournalDirName() {
+        return this.getString(JOURNAL_DIR, "/tmp/bk-txn");
+    }
+
+    /**
+     * Set dir name to store journal files
+     *
+     * @param journalDir
+     *          Dir to store journal files
+     * @return server configuration
+     */
+    public ServerConfiguration setJournalDirName(String journalDir) {
+        this.addProperty(JOURNAL_DIR, journalDir);
+        return this;
+    }
+
+    /**
+     * Get dir to store journal files
+     *
+     * @return journal dir built from {@link #getJournalDirName()},
+     *         or null if that name is null
+     */
+    public File getJournalDir() {
+        String journalDirName = getJournalDirName();
+        if (null == journalDirName) {
+            return null;
+        }
+        return new File(journalDirName);
+    }
+
+    /**
+     * Get dir names to store ledger data
+     *
+     * @return ledger dir names, a single "/tmp/bk-data" entry if not configured
+     */
+    public String[] getLedgerDirNames() {
+        // Commons Configuration splits comma-separated values into a list, and
+        // getString() only hands back the first element of such a list. Read
+        // every value so that all configured ledger dirs are returned.
+        String[] ledgerDirs = this.getStringArray(LEDGER_DIRS);
+        if (null == ledgerDirs || ledgerDirs.length == 0) {
+            return new String[] { "/tmp/bk-data" };
+        }
+        // Re-join and split on SEP so the result is the same whether or not
+        // list-delimiter parsing is enabled on the underlying configuration.
+        return StringUtils.join(ledgerDirs, SEP).split(SEP);
+    }
+
+    /**
+     * Set dir names to store ledger data
+     *
+     * @param ledgerDirs
+     *          Dir names to store ledger data; ignored when null
+     * @return server configuration
+     */
+    public ServerConfiguration setLedgerDirNames(String[] ledgerDirs) {
+        if (null == ledgerDirs) {
+            return this;
+        }
+        this.addProperty(LEDGER_DIRS, StringUtils.join(ledgerDirs, SEP));
+        return this;
+    }
+
+    /**
+     * Get dirs that stores ledger data
+     *
+     * @return ledger dirs, one File per name from {@link #getLedgerDirNames()}
+     */
+    public File[] getLedgerDirs() {
+        String[] ledgerDirNames = getLedgerDirNames();
+        if (null == ledgerDirNames) {
+            return null;
+        }
+        File[] ledgerDirs = new File[ledgerDirNames.length];
+        for (int i = 0; i < ledgerDirNames.length; i++) {
+            ledgerDirs[i] = new File(ledgerDirNames[i]);
+        }
+        return ledgerDirs;
+    }
+
+    /**
+     * Is tcp connection no delay.
+     *
+     * @return tcp socket nodelay setting, true if not configured
+     */
+    public boolean getServerTcpNoDelay() {
+        return getBoolean(SERVER_TCP_NODELAY, true);
+    }
+
+    /**
+     * Set socket nodelay setting
+     *
+     * @param noDelay
+     *          NoDelay setting
+     * @return server configuration
+     */
+    public ServerConfiguration setServerTcpNoDelay(boolean noDelay) {
+        addProperty(SERVER_TCP_NODELAY, Boolean.toString(noDelay));
+        return this;
+    }
+
+    /**
+     * Get zookeeper servers to connect
+     *
+     * @return zookeeper servers, null if not configured
+     */
+    public String getZkServers() {
+        return getString(ZK_SERVERS, null);
+    }
+
+    /**
+     * Set zookeeper servers to connect
+     *
+     * @param zkServers
+     *          ZooKeeper servers to connect
+     * @return server configuration
+     */
+    public ServerConfiguration setZkServers(String zkServers) {
+        addProperty(ZK_SERVERS, zkServers);
+        return this;
+    }
+
+    /**
+     * Get zookeeper timeout
+     *
+     * @return zookeeper server timeout, 10000 if not configured
+     */
+    public int getZkTimeout() {
+        return getInt(ZK_TIMEOUT, 10000);
+    }
+
+    /**
+     * Set zookeeper timeout
+     *
+     * @param zkTimeout
+     *          ZooKeeper server timeout
+     * @return server configuration
+     */
+    public ServerConfiguration setZkTimeout(int zkTimeout) {
+        // Store under ZK_TIMEOUT (the key getZkTimeout reads); writing to
+        // ZK_SERVERS would leave the timeout unset and pollute the server list.
+        addProperty(ZK_TIMEOUT, Integer.toString(zkTimeout));
+        return this;
+    }
+
+}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java?rev=1203568&r1=1203567&r2=1203568&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java Fri Nov 18 10:18:05 2011
@@ -26,6 +26,7 @@ import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
@@ -52,7 +53,10 @@ public class BookieClient {
ClientSocketChannelFactory channelFactory;
ConcurrentHashMap<InetSocketAddress, PerChannelBookieClient> channels = new ConcurrentHashMap<InetSocketAddress, PerChannelBookieClient>();
- public BookieClient(ClientSocketChannelFactory channelFactory, OrderedSafeExecutor executor) {
+ private final ClientConfiguration conf;
+
+ public BookieClient(ClientConfiguration conf, ClientSocketChannelFactory channelFactory, OrderedSafeExecutor executor) {
+ this.conf = conf;
this.channelFactory = channelFactory;
this.executor = executor;
}
@@ -162,7 +166,7 @@ public class BookieClient {
ClientSocketChannelFactory channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors
.newCachedThreadPool());
OrderedSafeExecutor executor = new OrderedSafeExecutor(1);
- BookieClient bc = new BookieClient(channelFactory, executor);
+ BookieClient bc = new BookieClient(new ClientConfiguration(), channelFactory, executor);
InetSocketAddress addr = new InetSocketAddress(args[0], Integer.parseInt(args[1]));
for (int i = 0; i < 100000; i++) {
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java?rev=1203568&r1=1203567&r2=1203568&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java Fri Nov 18 10:18:05 2011
@@ -25,13 +25,22 @@ import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.NIOServerFactory.Cnxn;
import static org.apache.bookkeeper.proto.BookieProtocol.PacketHeader;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.ParseException;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,28 +49,28 @@ import org.slf4j.LoggerFactory;
*
*/
public class BookieServer implements NIOServerFactory.PacketProcessor, BookkeeperInternalCallbacks.WriteCallback {
- int port;
+ final ServerConfiguration conf;
NIOServerFactory nioServerFactory;
private volatile boolean running = false;
Bookie bookie;
DeathWatcher deathWatcher;
static Logger LOG = LoggerFactory.getLogger(BookieServer.class);
- public BookieServer(int port, String zkServers, File journalDirectory, File ledgerDirectories[]) throws IOException {
- this.port = port;
- this.bookie = new Bookie(port, zkServers, journalDirectory, ledgerDirectories);
+ public BookieServer(ServerConfiguration conf) throws IOException {
+ this.conf = conf;
+ this.bookie = new Bookie(conf);
}
public void start() throws IOException {
- nioServerFactory = new NIOServerFactory(port, this);
+ nioServerFactory = new NIOServerFactory(conf, this);
running = true;
- deathWatcher = new DeathWatcher();
+ deathWatcher = new DeathWatcher(conf);
deathWatcher.start();
}
public InetSocketAddress getLocalAddress() {
try {
- return new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), port);
+ return new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), conf.getBookiePort());
} catch (UnknownHostException uhe) {
return nioServerFactory.getLocalAddress();
}
@@ -106,9 +115,15 @@ public class BookieServer implements NIO
* A thread to watch whether bookie & nioserver is still alive
*/
class DeathWatcher extends Thread {
+
+ final int watchInterval;
+
+ /**
+  * Capture the watch interval from the server configuration; run() sleeps
+  * for this interval between checks.
+  *
+  * @param conf
+  *          server configuration supplying the death watch interval
+  */
+ DeathWatcher(ServerConfiguration conf) {
+     this.watchInterval = conf.getDeathWatchInterval();
+ }
+
@Override
public void run() {
- int watchInterval = Integer.getInteger("bookie_death_watch_interval", 1000);
while(true) {
try {
Thread.sleep(watchInterval);
@@ -127,33 +142,108 @@ public class BookieServer implements NIO
}
}
+ static final Options bkOpts = new Options();
+ static {
+ bkOpts.addOption("c", "conf", true, "Configuration for Bookie Server");
+ bkOpts.addOption("h", "help", false, "Print help message");
+ }
+
+ /**
+ * Print usage
+ */
+ private static void printUsage() {
+ HelpFormatter hf = new HelpFormatter();
+ hf.printHelp("BookieServer [options]\n\tor\n"
+ + "BookieServer <bookie_port> <zk_servers> <journal_dir> <ledger_dir [ledger_dir]>", bkOpts);
+ }
+
+ private static void loadConfFile(ServerConfiguration conf, String confFile)
+ throws IllegalArgumentException {
+ try {
+ conf.loadConf(new File(confFile).toURI().toURL());
+ } catch (MalformedURLException e) {
+ LOG.error("Could not open configuration file: " + confFile, e);
+ throw new IllegalArgumentException();
+ } catch (ConfigurationException e) {
+ LOG.error("Malformed configuration file: " + confFile, e);
+ throw new IllegalArgumentException();
+ }
+ LOG.info("Using configuration file " + confFile);
+ }
+
+ private static ServerConfiguration parseArgs(String[] args)
+ throws IllegalArgumentException {
+ try {
+ BasicParser parser = new BasicParser();
+ CommandLine cmdLine = parser.parse(bkOpts, args);
+
+ if (cmdLine.hasOption('h')) {
+ throw new IllegalArgumentException();
+ }
+
+ ServerConfiguration conf = new ServerConfiguration();
+ String[] leftArgs = cmdLine.getArgs();
+
+ if (cmdLine.hasOption('c')) {
+ if (null != leftArgs && leftArgs.length > 0) {
+ throw new IllegalArgumentException();
+ }
+ String confFile = cmdLine.getOptionValue("c");
+ loadConfFile(conf, confFile);
+ return conf;
+ }
+
+ if (leftArgs.length < 4) {
+ throw new IllegalArgumentException();
+ }
+
+ // command line arguments overwrite settings in configuration file
+ conf.setBookiePort(Integer.parseInt(leftArgs[0]));
+ conf.setZkServers(leftArgs[1]);
+ conf.setJournalDirName(leftArgs[2]);
+ String[] ledgerDirNames = new String[leftArgs.length - 3];
+ System.arraycopy(leftArgs, 3, ledgerDirNames, 0, ledgerDirNames.length);
+ conf.setLedgerDirNames(ledgerDirNames);
+
+ return conf;
+ } catch (ParseException e) {
+ LOG.error("Error parsing command line arguments : ", e);
+ throw new IllegalArgumentException(e);
+ }
+ }
+
/**
* @param args
* @throws IOException
* @throws InterruptedException
*/
- public static void main(String[] args) throws IOException, InterruptedException {
- if (args.length < 4) {
- System.err.println("USAGE: BookieServer port zkServers journalDirectory ledgerDirectory [ledgerDirectory]*");
- return;
+ public static void main(String[] args) throws IOException, InterruptedException,
+ IllegalArgumentException {
+ ServerConfiguration conf = null;
+ try {
+ conf = parseArgs(args);
+ } catch (IllegalArgumentException iae) {
+ LOG.error("Error parsing command line arguments : ", iae);
+ System.err.println(iae.getMessage());
+ printUsage();
+ throw iae;
}
- int port = Integer.parseInt(args[0]);
- String zkServers = args[1];
- File journalDirectory = new File(args[2]);
- File ledgerDirectory[] = new File[args.length - 3];
+
StringBuilder sb = new StringBuilder();
- for (int i = 0; i < ledgerDirectory.length; i++) {
- ledgerDirectory[i] = new File(args[i + 3]);
+ String[] ledgerDirNames = conf.getLedgerDirNames();
+ for (int i = 0; i < ledgerDirNames.length; i++) {
if (i != 0) {
sb.append(',');
}
- sb.append(ledgerDirectory[i]);
+ sb.append(ledgerDirNames[i]);
}
+
String hello = String.format(
"Hello, I'm your bookie, listening on port %1$s. ZKServers are on %2$s. Journals are in %3$s. Ledgers are stored in %4$s.",
- port, zkServers, journalDirectory, sb);
+ conf.getBookiePort(), conf.getZkServers(),
+ conf.getJournalDirName(), sb);
LOG.info(hello);
- BookieServer bs = new BookieServer(port, zkServers, journalDirectory, ledgerDirectory);
+ BookieServer bs = new BookieServer(conf);
bs.start();
bs.join();
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/NIOServerFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/NIOServerFactory.java?rev=1203568&r1=1203567&r2=1203568&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/NIOServerFactory.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/NIOServerFactory.java Fri Nov 18 10:18:05 2011
@@ -34,6 +34,8 @@ import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,12 +72,15 @@ public class NIOServerFactory extends Th
long minLatency = 99999999;
- public NIOServerFactory(int port, PacketProcessor processor) throws IOException {
+ ServerConfiguration conf;
+
+ public NIOServerFactory(ServerConfiguration conf, PacketProcessor processor) throws IOException {
super("NIOServerFactory");
setDaemon(true);
this.processor = processor;
+ this.conf = conf;
this.ss = ServerSocketChannel.open();
- ss.socket().bind(new InetSocketAddress(port));
+ ss.socket().bind(new InetSocketAddress(conf.getBookiePort()));
ss.configureBlocking(false);
ss.register(selector, SelectionKey.OP_ACCEPT);
start();
@@ -352,11 +357,7 @@ public class NIOServerFactory extends Th
public Cnxn(SocketChannel sock, SelectionKey sk) throws IOException {
this.sock = sock;
this.sk = sk;
- if (System.getProperty("server.tcpnodelay", "true").equals("true")) {
- sock.socket().setTcpNoDelay(true);
- } else {
- sock.socket().setTcpNoDelay(false);
- }
+ sock.socket().setTcpNoDelay(conf.getServerTcpNoDelay());
sock.socket().setSoLinger(true, 2);
sk.interestOps(SelectionKey.OP_READ);
if (LOG.isTraceEnabled()) {
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java?rev=1203568&r1=1203567&r2=1203568&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java Fri Nov 18 10:18:05 2011
@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHa
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
@@ -91,9 +92,16 @@ public class PerChannelBookieClient exte
};
private ConnectionState state;
-
+ private final ClientConfiguration conf;
+
public PerChannelBookieClient(OrderedSafeExecutor executor, ClientSocketChannelFactory channelFactory,
InetSocketAddress addr, AtomicLong totalBytesOutstanding) {
+ this(new ClientConfiguration(), executor, channelFactory, addr, totalBytesOutstanding);
+ }
+
+ public PerChannelBookieClient(ClientConfiguration conf, OrderedSafeExecutor executor, ClientSocketChannelFactory channelFactory,
+ InetSocketAddress addr, AtomicLong totalBytesOutstanding) {
+ this.conf = conf;
this.addr = addr;
this.executor = executor;
this.totalBytesOutstanding = totalBytesOutstanding;
@@ -115,11 +123,7 @@ public class PerChannelBookieClient exte
// to the bookie.
ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
bootstrap.setPipelineFactory(this);
- if (System.getProperty("server.tcpnodelay", "true").equals("true")) {
- bootstrap.setOption("tcpNoDelay", true);
- } else {
- bootstrap.setOption("tcpNoDelay", false);
- }
+ bootstrap.setOption("tcpNoDelay", conf.getClientTcpNoDelay());
bootstrap.setOption("keepAlive", true);
ChannelFuture future = bootstrap.connect(addr);
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java?rev=1203568&r1=1203567&r2=1203568&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java Fri Nov 18 10:18:05 2011
@@ -27,6 +27,7 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
+import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookieServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,6 +72,7 @@ public class LocalBookKeeper {
//BookKeeper variables
File tmpDirs[];
BookieServer bs[];
+ ServerConfiguration bsConfs[];
Integer initialPort = 5000;
/**
@@ -120,20 +122,28 @@ public class LocalBookKeeper {
LOG.error("Exception while creating znodes", e);
}
}
- private void runBookies() throws IOException {
+ private void runBookies(ServerConfiguration baseConf) throws IOException {
LOG.info("Starting Bookie(s)");
// Create Bookie Servers (B1, B2, B3)
tmpDirs = new File[numberOfBookies];
bs = new BookieServer[numberOfBookies];
+ bsConfs = new ServerConfiguration[numberOfBookies];
for(int i = 0; i < numberOfBookies; i++) {
tmpDirs[i] = File.createTempFile("bookie" + Integer.toString(i), "test");
tmpDirs[i].delete();
tmpDirs[i].mkdir();
- bs[i] = new BookieServer(initialPort + i, InetAddress.getLocalHost().getHostAddress() + ":"
- + ZooKeeperDefaultPort, tmpDirs[i], new File[] {tmpDirs[i]});
+ bsConfs[i] = new ServerConfiguration(baseConf);
+ // override settings
+ bsConfs[i].setBookiePort(initialPort + i);
+ bsConfs[i].setZkServers(InetAddress.getLocalHost().getHostAddress() + ":"
+ + ZooKeeperDefaultPort);
+ bsConfs[i].setJournalDirName(tmpDirs[i].getPath());
+ bsConfs[i].setLedgerDirNames(new String[] { tmpDirs[i].getPath() });
+
+ bs[i] = new BookieServer(bsConfs[i]);
bs[i].start();
}
}
@@ -144,9 +154,22 @@ public class LocalBookKeeper {
System.exit(-1);
}
LocalBookKeeper lb = new LocalBookKeeper(Integer.parseInt(args[0]));
+
+ ServerConfiguration conf = new ServerConfiguration();
+ if (args.length >= 2) {
+ String confFile = args[1];
+ try {
+ conf.loadConf(new File(confFile).toURI().toURL());
+ LOG.info("Using configuration file " + confFile);
+ } catch (Exception e) {
+ // load conf failed
+ LOG.warn("Error loading configuration file " + confFile, e);
+ }
+ }
+
lb.runZookeeper(1000);
lb.initializeZookeper();
- lb.runBookies();
+ lb.runBookies(conf);
while (true) {
Thread.sleep(5000);
}