You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/06/13 16:57:55 UTC
svn commit: r1349896 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/
bookkeeper-server/bin/
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/
bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/
bookkeeper-server/src/main/jav...
Author: ivank
Date: Wed Jun 13 14:57:54 2012
New Revision: 1349896
URL: http://svn.apache.org/viewvc?rev=1349896&view=rev
Log:
BOOKKEEPER-183: Provide tools to read/check data files in bookie server (sijie via ivank)
Added:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyFileInfo.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/EntryFormatter.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringEntryFormatter.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Tool.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/data/
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/data/MessageFormatter.java
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper
zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1349896&r1=1349895&r2=1349896&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Wed Jun 13 14:57:54 2012
@@ -12,6 +12,10 @@ Trunk (unreleased changes)
BOOKKEEPER-289: mvn clean doesn't remove test output files (sijie via ivank)
+ bookkeeper-server:
+
+ BOOKKEEPER-183: Provide tools to read/check data files in bookie server (sijie via ivank)
+
hedwig-client:
BOOKKEEPER-274: Hedwig cpp client library should not link to cppunit which is just used for test. (sijie via ivank)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper?rev=1349896&r1=1349895&r2=1349896&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper Wed Jun 13 14:57:54 2012
@@ -78,6 +78,7 @@ where command is one of:
bookie Run a bookie server
localbookie <n> Run a test ensemble of <n> bookies locally
upgrade Upgrade bookie filesystem
+ shell Run admin shell to read/check bookie files
help This help message
or command is the full name of a class with a defined main() method.
@@ -87,6 +88,7 @@ Environment variables:
BOOKIE_CONF Configuration file (default: $DEFAULT_CONF)
BOOKIE_EXTRA_OPTS Extra options to be passed to the jvm
BOOKIE_EXTRA_CLASSPATH Add extra paths to the bookkeeper classpath
+ ENTRY_FORMATTER_CLASS Entry formatter class to format entries.
These variable can also be set in conf/bkenv.sh
EOF
@@ -155,6 +157,9 @@ elif [ $COMMAND == "localbookie" ]; then
exec java $OPTS $JMX_ARGS org.apache.bookkeeper.util.LocalBookKeeper $NUMBER $BOOKIE_CONF $@
elif [ $COMMAND == "upgrade" ]; then
exec java $OPTS org.apache.bookkeeper.bookie.FileSystemUpgrade --conf $BOOKIE_CONF $@
+elif [ $COMMAND == "shell" ]; then
+ ENTRY_FORMATTER_ARG="-DentryFormatterClass=${ENTRY_FORMATTER_CLASS:-org.apache.bookkeeper.util.StringEntryFormatter}"
+ exec java $OPTS $ENTRY_FORMATTER_ARG org.apache.bookkeeper.bookie.BookieShell -conf $BOOKIE_CONF $@
elif [ $COMMAND == "help" ]; then
bookkeeper_help;
else
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml?rev=1349896&r1=1349895&r2=1349896&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml Wed Jun 13 14:57:54 2012
@@ -32,6 +32,12 @@
</properties>
<dependencies>
<dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>2.3.0</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.1</version>
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java?rev=1349896&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java Wed Jun 13 14:57:54 2012
@@ -0,0 +1,684 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.util.Formatter;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
+import org.apache.bookkeeper.bookie.Journal.JournalScanner;
+import org.apache.bookkeeper.bookie.Journal.LastLogMark;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.util.EntryFormatter;
+import org.apache.bookkeeper.util.Tool;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.CompositeConfiguration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.ParseException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Bookie Shell to read/check bookie files.
+ */
+public class BookieShell implements Tool {
+
+ static final Logger LOG = LoggerFactory.getLogger(BookieShell.class);
+
+ static final String ENTRY_FORMATTER_CLASS = "entryFormatterClass";
+
+ static final String CMD_LEDGER = "ledger";
+ static final String CMD_READLOG = "readlog";
+ static final String CMD_READJOURNAL = "readjournal";
+ static final String CMD_LASTMARK = "lastmark";
+ static final String CMD_HELP = "help";
+
+ final ServerConfiguration bkConf = new ServerConfiguration();
+ File[] ledgerDirectories;
+ File journalDirectory;
+
+ EntryLogger entryLogger = null;
+ Journal journal = null;
+ EntryFormatter formatter;
+
+ int pageSize;
+ int entriesPerPage;
+
+ interface Command {
+ public int runCmd(String[] args) throws Exception;
+ public void printUsage();
+ }
+
+ abstract class MyCommand implements Command {
+ abstract Options getOptions();
+ abstract String getDescription();
+ abstract String getUsage();
+ abstract int runCmd(CommandLine cmdLine) throws Exception;
+
+ String cmdName;
+
+ MyCommand(String cmdName) {
+ this.cmdName = cmdName;
+ }
+
+ @Override
+ public int runCmd(String[] args) throws Exception {
+ try {
+ BasicParser parser = new BasicParser();
+ CommandLine cmdLine = parser.parse(getOptions(), args);
+ return runCmd(cmdLine);
+ } catch (ParseException e) {
+ LOG.error("Error parsing command line arguments : ", e);
+ printUsage();
+ return -1;
+ }
+ }
+
+ @Override
+ public void printUsage() {
+ HelpFormatter hf = new HelpFormatter();
+ System.err.println(cmdName + ": " + getDescription());
+ hf.printHelp(getUsage(), getOptions());
+ }
+ }
+
+ /**
+ * Ledger Command Handles ledger related operations
+ */
+ class LedgerCmd extends MyCommand {
+ Options lOpts = new Options();
+
+ LedgerCmd() {
+ super(CMD_LEDGER);
+ lOpts.addOption("m", "meta", false, "Print meta information");
+ }
+
+ @Override
+ public int runCmd(CommandLine cmdLine) throws Exception {
+ String[] leftArgs = cmdLine.getArgs();
+ if (leftArgs.length <= 0) {
+ System.err.println("ERROR: missing ledger id");
+ printUsage();
+ return -1;
+ }
+
+ boolean printMeta = false;
+ if (cmdLine.hasOption("m")) {
+ printMeta = true;
+ }
+ long ledgerId;
+ try {
+ ledgerId = Long.parseLong(leftArgs[0]);
+ } catch (NumberFormatException nfe) {
+ System.err.println("ERROR: invalid ledger id " + leftArgs[0]);
+ printUsage();
+ return -1;
+ }
+ if (printMeta) {
+ // print meta
+ readLedgerMeta(ledgerId);
+ }
+ // dump ledger info
+ readLedgerIndexEntries(ledgerId);
+ return 0;
+ }
+
+ @Override
+ String getDescription() {
+ return "Dump ledger index entries into readable format.";
+ }
+
+ @Override
+ String getUsage() {
+ return "ledger [-m] <ledger_id>";
+ }
+
+ @Override
+ Options getOptions() {
+ return lOpts;
+ }
+ }
+
+ /**
+ * Command to read entry log files.
+ */
+ class ReadLogCmd extends MyCommand {
+ Options rlOpts = new Options();
+
+ ReadLogCmd() {
+ super(CMD_READLOG);
+ rlOpts.addOption("m", "msg", false, "Print message body");
+ }
+
+ @Override
+ public int runCmd(CommandLine cmdLine) throws Exception {
+ String[] leftArgs = cmdLine.getArgs();
+ if (leftArgs.length <= 0) {
+ System.err.println("ERROR: missing entry log id or entry log file name");
+ printUsage();
+ return -1;
+ }
+
+ boolean printMsg = false;
+ if (cmdLine.hasOption("m")) {
+ printMsg = true;
+ }
+ long logId;
+ try {
+ logId = Long.parseLong(leftArgs[0]);
+ } catch (NumberFormatException nfe) {
+ // not a entry log id
+ File f = new File(leftArgs[0]);
+ String name = f.getName();
+ if (!name.endsWith(".log")) {
+ // not a log file
+ System.err.println("ERROR: invalid entry log file name " + leftArgs[0]);
+ printUsage();
+ return -1;
+ }
+ String idString = name.split("\\.")[0];
+ logId = Long.parseLong(idString, 16);
+ }
+ // scan entry log
+ scanEntryLog(logId, printMsg);
+ return 0;
+ }
+
+ @Override
+ String getDescription() {
+ return "Scan an entry file and format the entries into readable format.";
+ }
+
+ @Override
+ String getUsage() {
+ return "readlog [-m] <entry_log_id | entry_log_file_name>";
+ }
+
+ @Override
+ Options getOptions() {
+ return rlOpts;
+ }
+ }
+
+ /**
+ * Command to read journal files
+ */
+ class ReadJournalCmd extends MyCommand {
+ Options rjOpts = new Options();
+
+ ReadJournalCmd() {
+ super(CMD_READJOURNAL);
+ rjOpts.addOption("m", "msg", false, "Print message body");
+ }
+
+ @Override
+ public int runCmd(CommandLine cmdLine) throws Exception {
+ String[] leftArgs = cmdLine.getArgs();
+ if (leftArgs.length <= 0) {
+ System.err.println("ERROR: missing journal id or journal file name");
+ printUsage();
+ return -1;
+ }
+
+ boolean printMsg = false;
+ if (cmdLine.hasOption("m")) {
+ printMsg = true;
+ }
+ long journalId;
+ try {
+ journalId = Long.parseLong(leftArgs[0]);
+ } catch (NumberFormatException nfe) {
+ // not a journal id
+ File f = new File(leftArgs[0]);
+ String name = f.getName();
+ if (!name.endsWith(".txn")) {
+ // not a journal file
+ System.err.println("ERROR: invalid journal file name " + leftArgs[0]);
+ printUsage();
+ return -1;
+ }
+ String idString = name.split("\\.")[0];
+ journalId = Long.parseLong(idString, 16);
+ }
+ // scan journal
+ scanJournal(journalId, printMsg);
+ return 0;
+ }
+
+ @Override
+ String getDescription() {
+ return "Scan a journal file and format the entries into readable format.";
+ }
+
+ @Override
+ String getUsage() {
+ return "readjournal [-m] <journal_id | journal_file_name>";
+ }
+
+ @Override
+ Options getOptions() {
+ return rjOpts;
+ }
+ }
+
+ /**
+ * Command to print last log mark
+ */
+ class LastMarkCmd implements Command {
+ @Override
+ public int runCmd(String[] args) throws Exception {
+ printLastLogMark();
+ return 0;
+ }
+
+ @Override
+ public void printUsage() {
+ System.err.println("lastmark: Print last log marker.");
+ }
+ }
+
+ /**
+ * Command to print help message
+ */
+ class HelpCmd implements Command {
+ @Override
+ public int runCmd(String[] args) throws Exception {
+ if (args.length == 0) {
+ printShellUsage();
+ return 0;
+ }
+ String cmdName = args[0];
+ Command cmd = commands.get(cmdName);
+ if (null == cmd) {
+ System.err.println("Unknown command " + cmdName);
+ printShellUsage();
+ return -1;
+ }
+ cmd.printUsage();
+ return 0;
+ }
+
+ @Override
+ public void printUsage() {
+ System.err.println("help: Describe the usage of this program or its subcommands.");
+ System.err.println("usage: help [COMMAND]");
+ }
+ }
+
+ final Map<String, Command> commands;
+ {
+ commands = new HashMap<String, Command>();
+ commands.put(CMD_LEDGER, new LedgerCmd());
+ commands.put(CMD_READLOG, new ReadLogCmd());
+ commands.put(CMD_READJOURNAL, new ReadJournalCmd());
+ commands.put(CMD_LASTMARK, new LastMarkCmd());
+ commands.put(CMD_HELP, new HelpCmd());
+ }
+
+ @Override
+ public void setConf(Configuration conf) throws Exception {
+ bkConf.loadConf(conf);
+ journalDirectory = Bookie.getCurrentDirectory(bkConf.getJournalDir());
+ ledgerDirectories = Bookie.getCurrentDirectories(bkConf.getLedgerDirs());
+ formatter = EntryFormatter.newEntryFormatter(bkConf, ENTRY_FORMATTER_CLASS);
+ LOG.info("Using entry formatter " + formatter.getClass().getName());
+ pageSize = bkConf.getPageSize();
+ entriesPerPage = pageSize / 8;
+ }
+
+ private static void printShellUsage() {
+ System.err.println("Usage: BookieShell [-conf configuration] <command>");
+ System.err.println();
+ System.err.println(" ledger [-meta] <ledger_id>");
+ System.err.println(" readlog [-msg] <entry_log_id|entry_log_file_name>");
+ System.err.println(" readjournal [-msg] <journal_id|journal_file_name>");
+ System.err.println(" lastmark");
+ System.err.println(" help");
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ if (args.length <= 0) {
+ printShellUsage();
+ return -1;
+ }
+ String cmdName = args[0];
+ Command cmd = commands.get(cmdName);
+ if (null == cmd) {
+ System.err.println("ERROR: Unknown command " + cmdName);
+ printShellUsage();
+ return -1;
+ }
+ // prepare new args
+ String[] newArgs = new String[args.length - 1];
+ System.arraycopy(args, 1, newArgs, 0, newArgs.length);
+ return cmd.runCmd(newArgs);
+ }
+
+ public static void main(String argv[]) throws Exception {
+ if (argv.length <= 0) {
+ printShellUsage();
+ System.exit(-1);
+ }
+
+ CompositeConfiguration conf = new CompositeConfiguration();
+ // load configuration
+ if ("-conf".equals(argv[0])) {
+ if (argv.length <= 1) {
+ printShellUsage();
+ System.exit(-1);
+ }
+ conf.addConfiguration(new PropertiesConfiguration(
+ new File(argv[1]).toURI().toURL()));
+
+ String[] newArgv = new String[argv.length - 2];
+ System.arraycopy(argv, 2, newArgv, 0, newArgv.length);
+ argv = newArgv;
+ }
+
+ BookieShell shell = new BookieShell();
+ shell.setConf(conf);
+ int res = shell.run(argv);
+ System.exit(res);
+ }
+
+ ///
+ /// Bookie File Operations
+ ///
+
+ /**
+ * Get the ledger file of a specified ledger.
+ *
+ * @param ledgerId
+ * Ledger Id
+ *
+ * @return file object.
+ */
+ private File getLedgerFile(long ledgerId) {
+ String ledgerName = LedgerCacheImpl.getLedgerName(ledgerId);
+ File lf = null;
+ for (File d : ledgerDirectories) {
+ lf = new File(d, ledgerName);
+ if (lf.exists()) {
+ break;
+ }
+ lf = null;
+ }
+ return lf;
+ }
+
+ /**
+ * Get FileInfo for a specified ledger.
+ *
+ * @param ledgerId
+ * Ledger Id
+ * @return read only file info instance
+ */
+ ReadOnlyFileInfo getFileInfo(long ledgerId) throws IOException {
+ File ledgerFile = getLedgerFile(ledgerId);
+ if (null == ledgerFile) {
+ throw new FileNotFoundException("No index file found for ledger " + ledgerId + ". It may be not flushed yet.");
+ }
+ ReadOnlyFileInfo fi = new ReadOnlyFileInfo(ledgerFile, null);
+ fi.readHeader();
+ return fi;
+ }
+
+ private synchronized void initEntryLogger() throws IOException {
+ if (null == entryLogger) {
+ // provide read only entry logger
+ entryLogger = new ReadOnlyEntryLogger(bkConf);
+ }
+ }
+
+ /**
+ * scan over entry log
+ *
+ * @param logId
+ * Entry Log Id
+ * @param scanner
+ * Entry Log Scanner
+ */
+ protected void scanEntryLog(long logId, EntryLogScanner scanner) throws IOException {
+ initEntryLogger();
+ entryLogger.scanEntryLog(logId, scanner);
+ }
+
+ private synchronized void initJournal() throws IOException {
+ if (null == journal) {
+ journal = new Journal(bkConf);
+ }
+ }
+
+ /**
+ * Scan journal file
+ *
+ * @param journalId
+ * Journal File Id
+ * @param scanner
+ * Journal File Scanner
+ */
+ protected void scanJournal(long journalId, JournalScanner scanner) throws IOException {
+ initJournal();
+ journal.scanJournal(journalId, 0L, scanner);
+ }
+
+ ///
+ /// Bookie Shell Commands
+ ///
+
+ /**
+ * Read ledger meta
+ *
+ * @param ledgerId
+ * Ledger Id
+ */
+ protected void readLedgerMeta(long ledgerId) throws Exception {
+ System.out.println("===== LEDGER: " + ledgerId + " =====");
+ FileInfo fi = getFileInfo(ledgerId);
+ byte[] masterKey = fi.getMasterKey();
+ if (null == masterKey) {
+ System.out.println("master key : NULL");
+ } else {
+ System.out.println("master key : " + bytes2Hex(fi.getMasterKey()));
+ }
+ long size = fi.size();
+ if (size % 8 == 0) {
+ System.out.println("size : " + size);
+ } else {
+ System.out.println("size : " + size + " (not aligned with 8, may be corrupted or under flushing now)");
+ }
+ System.out.println("entries : " + (size / 8));
+ }
+
+ /**
+ * Read ledger index entires
+ *
+ * @param ledgerId
+ * Ledger Id
+ * @throws IOException
+ */
+ protected void readLedgerIndexEntries(long ledgerId) throws IOException {
+ System.out.println("===== LEDGER: " + ledgerId + " =====");
+ FileInfo fi = getFileInfo(ledgerId);
+ long size = fi.size();
+ System.out.println("size : " + size);
+ long curSize = 0;
+ long curEntry = 0;
+ LedgerEntryPage lep = new LedgerEntryPage(pageSize, entriesPerPage);
+ lep.usePage();
+ try {
+ while (curSize < size) {
+ lep.setLedger(ledgerId);
+ lep.setFirstEntry(curEntry);
+ lep.readPage(fi);
+
+ // process a page
+ for (int i=0; i<entriesPerPage; i++) {
+ long offset = lep.getOffset(i * 8);
+ if (0 == offset) {
+ System.out.println("entry " + curEntry + "\t:\tN/A");
+ } else {
+ long entryLogId = offset >> 32L;
+ long pos = offset & 0xffffffffL;
+ System.out.println("entry " + curEntry + "\t:\t(log:" + entryLogId + ", pos: " + pos + ")");
+ }
+ ++curEntry;
+ }
+
+ curSize += pageSize;
+ }
+ } catch (IOException ie) {
+ LOG.error("Failed to read index page : ", ie);
+ if (curSize + pageSize < size) {
+ System.out.println("Failed to read index page @ " + curSize + ", the index file may be corrupted : " + ie.getMessage());
+ } else {
+ System.out.println("Failed to read last index page @ " + curSize
+ + ", the index file may be corrupted or last index page is not fully flushed yet : " + ie.getMessage());
+ }
+ }
+ }
+
+ /**
+ * Scan over an entry log file.
+ *
+ * @param logId
+ * Entry Log File id.
+ * @param printMsg
+ * Whether printing the entry data.
+ */
+ protected void scanEntryLog(long logId, final boolean printMsg) throws Exception {
+ System.out.println("Scan entry log " + logId + " (" + Long.toHexString(logId) + ".log)");
+ scanEntryLog(logId, new EntryLogScanner() {
+ @Override
+ public boolean accept(long ledgerId) {
+ return true;
+ }
+ @Override
+ public void process(long ledgerId, long startPos, ByteBuffer entry) {
+ formatEntry(startPos, entry, printMsg);
+ }
+ });
+ }
+
+ /**
+ * Scan a journal file
+ *
+ * @param journalId
+ * Journal File Id
+ * @param printMsg
+ * Whether printing the entry data.
+ */
+ protected void scanJournal(long journalId, final boolean printMsg) throws Exception {
+ System.out.println("Scan journal " + journalId + " (" + Long.toHexString(journalId) + ".txn)");
+ scanJournal(journalId, new JournalScanner() {
+ boolean printJournalVersion = false;
+ @Override
+ public void process(int journalVersion, long offset, ByteBuffer entry) throws IOException {
+ if (!printJournalVersion) {
+ System.out.println("Journal Version : " + journalVersion);
+ printJournalVersion = true;
+ }
+ formatEntry(offset, entry, printMsg);
+ }
+ });
+ }
+
+ /**
+ * Print last log mark
+ */
+ protected void printLastLogMark() throws IOException {
+ initJournal();
+ LastLogMark lastLogMark = journal.getLastLogMark();
+ System.out.println("LastLogMark: Journal Id - " + lastLogMark.txnLogId + "("
+ + Long.toHexString(lastLogMark.txnLogId) + ".txn), Pos - "
+ + lastLogMark.txnLogPosition);
+ }
+
+ /**
+ * Format the message into a readable format.
+ *
+ * @param pos
+ * File offset of the message stored in entry log file
+ * @param recBuff
+ * Entry Data
+ * @param printMsg
+ * Whether printing the message body
+ */
+ private void formatEntry(long pos, ByteBuffer recBuff, boolean printMsg) {
+ long ledgerId = recBuff.getLong();
+ long entryId = recBuff.getLong();
+ int entrySize = recBuff.limit();
+
+ System.out.println("--------- Lid=" + ledgerId + ", Eid=" + entryId
+ + ", ByteOffset=" + pos + ", EntrySize=" + entrySize + " ---------");
+ if (entryId == Bookie.METAENTRY_ID_LEDGER_KEY) {
+ int masterKeyLen = recBuff.getInt();
+ byte[] masterKey = new byte[masterKeyLen];
+ recBuff.get(masterKey);
+ System.out.println("Type: META");
+ System.out.println("MasterKey: " + bytes2Hex(masterKey));
+ System.out.println();
+ return;
+ }
+ // process a data entry
+ long lastAddConfirmed = recBuff.getLong();
+ long length = recBuff.getLong();
+ System.out.println("Type: DATA");
+ System.out.println("LastConfirmed: " + lastAddConfirmed);
+ if (!printMsg) {
+ System.out.println();
+ return;
+ }
+ // skip digest checking
+ recBuff.position(32 + 8);
+ System.out.println("Data:");
+ System.out.println();
+ try {
+ byte[] ret = new byte[recBuff.remaining()];
+ recBuff.get(ret);
+ formatter.formatEntry(ret);
+ } catch (Exception e) {
+ System.out.println("N/A. Corrupted.");
+ }
+ System.out.println();
+ }
+
+ static String bytes2Hex(byte[] data) {
+ StringBuilder sb = new StringBuilder(data.length * 2);
+ Formatter formatter = new Formatter(sb);
+ for (byte b : data) {
+ formatter.format("%02x", b);
+ }
+ return sb.toString();
+ }
+}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java?rev=1349896&r1=1349895&r2=1349896&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java Wed Jun 13 14:57:54 2012
@@ -91,11 +91,13 @@ public class EntryLogger {
*
* @param ledgerId
* Ledger ID.
+ * @param offset
+ * File offset of this entry.
* @param entry
* Entry ByteBuffer
* @throws IOException
*/
- public void process(long ledgerId, ByteBuffer entry) throws IOException;
+ public void process(long ledgerId, long offset, ByteBuffer entry) throws IOException;
}
/**
@@ -125,7 +127,8 @@ public class EntryLogger {
logId = lastLogId;
}
}
- createNewLog();
+
+ initialize();
}
/**
@@ -137,10 +140,15 @@ public class EntryLogger {
return logId;
}
+ protected void initialize() throws IOException {
+ // create a new log to write
+ createNewLog();
+ }
+
/**
* Creates a new log file
*/
- private void createNewLog() throws IOException {
+ void createNewLog() throws IOException {
List<File> list = Arrays.asList(dirs);
Collections.shuffle(list);
if (logChannel != null) {
@@ -416,6 +424,7 @@ public class EntryLogger {
if (bc.read(sizeBuff, pos) != sizeBuff.capacity()) {
throw new IOException("Short read for entry size from entrylog " + entryLogId);
}
+ long offset = pos;
pos += 4;
sizeBuff.flip();
int entrySize = sizeBuff.getInt();
@@ -446,7 +455,7 @@ public class EntryLogger {
}
buff.flip();
// process the entry
- scanner.process(lid, buff);
+ scanner.process(lid, offset, buff);
// Advance position to the next entry
pos += entrySize;
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java?rev=1349896&r1=1349895&r2=1349896&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java Wed Jun 13 14:57:54 2012
@@ -68,10 +68,14 @@ class FileInfo {
private int useCount;
private boolean isClosed;
+ // file access mode
+ protected String mode;
+
public FileInfo(File lf, byte[] masterKey) throws IOException {
this.lf = lf;
this.masterKey = masterKey;
+ mode = "rw";
}
synchronized public void readHeader() throws IOException {
@@ -80,7 +84,7 @@ class FileInfo {
return;
}
- fc = new RandomAccessFile(lf, "rw").getChannel();
+ fc = new RandomAccessFile(lf, mode).getChannel();
size = fc.size();
// avoid hang on reading partial index
@@ -122,7 +126,7 @@ class FileInfo {
if (create) {
// delayed the creation of parents directories
checkParents(lf);
- fc = new RandomAccessFile(lf, "rw").getChannel();
+ fc = new RandomAccessFile(lf, mode).getChannel();
size = fc.size();
if (size == 0) {
writeHeader();
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java?rev=1349896&r1=1349895&r2=1349896&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java Wed Jun 13 14:57:54 2012
@@ -100,8 +100,9 @@ public class GarbageCollectorThread exte
}
@Override
- public void process(long ledgerId, ByteBuffer entry) throws IOException {
- scanner.process(ledgerId, entry);
+ public void process(long ledgerId, long offset, ByteBuffer entry)
+ throws IOException {
+ scanner.process(ledgerId, offset, entry);
}
}
@@ -442,7 +443,7 @@ public class GarbageCollectorThread exte
return true;
}
@Override
- public void process(long ledgerId, ByteBuffer entry) {
+ public void process(long ledgerId, long offset, ByteBuffer entry) {
// add new entry size of a ledger to entry log meta
meta.addLedgerSize(ledgerId, entry.limit() + 4);
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java?rev=1349896&r1=1349895&r2=1349896&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java Wed Jun 13 14:57:54 2012
@@ -178,10 +178,10 @@ class InterleavedLedgerStorage implement
}
@Override
- public void process(long ledgerId, ByteBuffer buffer)
+ public void process(long ledgerId, long offset, ByteBuffer buffer)
throws IOException {
addEntry(buffer);
}
}
-}
\ No newline at end of file
+}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java?rev=1349896&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java Wed Jun 13 14:57:54 2012
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+
+/**
+ * Read Only Entry Logger
+ */
+public class ReadOnlyEntryLogger extends EntryLogger {
+
+ public ReadOnlyEntryLogger(ServerConfiguration conf) throws IOException {
+ super(conf);
+ }
+
+ @Override
+ protected void initialize() throws IOException {
+ // do nothing for read only entry logger
+ }
+
+ @Override
+ void createNewLog() throws IOException {
+ throw new IOException("Can't create new entry log using a readonly entry logger.");
+ }
+
+ @Override
+ protected boolean removeEntryLog(long entryLogId) {
+ // can't remove entry log in readonly mode
+ return false;
+ }
+
+ @Override
+ synchronized long addEntry(long ledger, ByteBuffer entry) throws IOException {
+ throw new IOException("Can't add entry to a readonly entry logger.");
+ }
+}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyFileInfo.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyFileInfo.java?rev=1349896&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyFileInfo.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyFileInfo.java Wed Jun 13 14:57:54 2012
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.BufferUnderflowException;
+import java.nio.channels.FileChannel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provide a readonly file info.
+ */
+class ReadOnlyFileInfo extends FileInfo {
+
+ public ReadOnlyFileInfo(File lf, byte[] masterKey) throws IOException {
+ super(lf, masterKey);
+ mode = "r";
+ }
+
+}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java?rev=1349896&r1=1349895&r2=1349896&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java Wed Jun 13 14:57:54 2012
@@ -65,6 +65,16 @@ public abstract class AbstractConfigurat
}
/**
+ * Load configuration from other configuration object
+ *
+ * @param otherConf
+ * Other configuration object
+ */
+ public void loadConf(Configuration otherConf) {
+ addConfiguration(otherConf);
+ }
+
+ /**
* Set Ledger Manager Type.
*
* @param lmType
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/EntryFormatter.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/EntryFormatter.java?rev=1349896&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/EntryFormatter.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/EntryFormatter.java Wed Jun 13 14:57:54 2012
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.util;
+
+import java.io.IOException;
+
+import org.apache.commons.configuration.Configuration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Formatter to format an entry
+ */
+public abstract class EntryFormatter {
+
+ static Logger LOG = LoggerFactory.getLogger(EntryFormatter.class);
+
+ protected Configuration conf;
+
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * Format an entry into a readable format
+ *
+ * @param data
+ * Data Payload
+ */
+ public abstract void formatEntry(byte[] data);
+
+ /**
+ * Format an entry from a string into a readable format
+ *
+ * @param input
+ * Input Stream
+ */
+ public abstract void formatEntry(java.io.InputStream input);
+
+ public final static EntryFormatter STRING_FORMATTER = new StringEntryFormatter();
+
+ public static EntryFormatter newEntryFormatter(Configuration conf, String clsProperty) {
+ String cls = conf.getString(clsProperty, StringEntryFormatter.class.getName());
+ ClassLoader classLoader = EntryFormatter.class.getClassLoader();
+ EntryFormatter formatter;
+ try {
+ Class aCls = classLoader.loadClass(cls);
+ formatter = (EntryFormatter) aCls.newInstance();
+ formatter.setConf(conf);
+ } catch (Exception e) {
+ LOG.warn("No formatter class found : " + cls, e);
+ LOG.warn("Using Default String Formatter.");
+ formatter = STRING_FORMATTER;
+ }
+ return formatter;
+ }
+}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringEntryFormatter.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringEntryFormatter.java?rev=1349896&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringEntryFormatter.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringEntryFormatter.java Wed Jun 13 14:57:54 2012
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.util;
+
+import java.io.IOException;
+
+import org.apache.commons.configuration.Configuration;
+
+import com.google.protobuf.ByteString;
+
+public class StringEntryFormatter extends EntryFormatter {
+ @Override
+ public void formatEntry(byte[] data) {
+ System.out.println(ByteString.copyFrom(data).toStringUtf8());
+ }
+
+ @Override
+ public void formatEntry(java.io.InputStream input) {
+ try {
+ byte[] data = new byte[input.available()];
+ input.read(data, 0, data.length);
+ formatEntry(data);
+ } catch (IOException ie) {
+ System.out.println("Warn: Unreadable entry : " + ie.getMessage());
+ }
+ }
+
+};
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Tool.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Tool.java?rev=1349896&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Tool.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Tool.java Wed Jun 13 14:57:54 2012
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.util;
+
+import org.apache.commons.configuration.Configuration;
+
+/**
+ * A tool interface that supports handling of generic command-line options.
+ */
+public interface Tool {
+ /**
+ * Exectue the command with given arguments
+ *
+ * @param args command specific arguments
+ * @return exit code.
+ * @throws Exception
+ */
+ public int run(String[] args) throws Exception;
+
+ /**
+ * Passe a configuration object to the tool.
+ *
+ * @param conf configuration object
+ * @throws Exception
+ */
+ public void setConf(Configuration conf) throws Exception;
+}
Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/data/MessageFormatter.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/data/MessageFormatter.java?rev=1349896&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/data/MessageFormatter.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/data/MessageFormatter.java Wed Jun 13 14:57:54 2012
@@ -0,0 +1,123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.hedwig.data;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.bookkeeper.util.EntryFormatter;
+import org.apache.commons.configuration.Configuration;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.RegionSpecificSeqId;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Format a pub sub message into a readable format.
+ */
+public class MessageFormatter extends EntryFormatter {
+ static Logger logger = LoggerFactory.getLogger(MessageFormatter.class);
+
+ static final String MESSAGE_PAYLOAD_FORMATTER_CLASS = "message_payload_formatter_class";
+
+ EntryFormatter dataFormatter = EntryFormatter.STRING_FORMATTER;
+
+ @Override
+ public void setConf(Configuration conf) {
+ super.setConf(conf);
+ dataFormatter = EntryFormatter.newEntryFormatter(conf, MESSAGE_PAYLOAD_FORMATTER_CLASS);
+ }
+
+ @Override
+ public void formatEntry(java.io.InputStream input) {
+ Message message;
+ try {
+ message = Message.parseFrom(input);
+ } catch (IOException e) {
+ System.out.println("WARN: Unreadable message found\n");
+ EntryFormatter.STRING_FORMATTER.formatEntry(input);
+ return;
+ }
+ formatMessage(message);
+ }
+
+ @Override
+ public void formatEntry(byte[] data) {
+ Message message;
+ try {
+ message = Message.parseFrom(data);
+ } catch (IOException e) {
+ System.out.println("WARN: Unreadable message found\n");
+ EntryFormatter.STRING_FORMATTER.formatEntry(data);
+ return;
+ }
+ formatMessage(message);
+ }
+
+ void formatMessage(Message message) {
+ // print msg id
+ String msgId;
+ if (!message.hasMsgId()) {
+ msgId = "N/A";
+ } else {
+ MessageSeqId seqId = message.getMsgId();
+ StringBuilder idBuilder = new StringBuilder();
+ if (seqId.hasLocalComponent()) {
+ idBuilder.append("LOCAL(").append(seqId.getLocalComponent()).append(")");
+ } else {
+ List<RegionSpecificSeqId> remoteIds = seqId.getRemoteComponentsList();
+ int i = 0, numRegions = remoteIds.size();
+ idBuilder.append("REMOTE(");
+ for (RegionSpecificSeqId rssid : remoteIds) {
+ idBuilder.append(rssid.getRegion().toStringUtf8());
+ idBuilder.append("[");
+ idBuilder.append(rssid.getSeqId());
+ idBuilder.append("]");
+ ++i;
+ if (i < numRegions) {
+ idBuilder.append(",");
+ }
+ }
+ idBuilder.append(")");
+ }
+ msgId = idBuilder.toString();
+ }
+ System.out.println("****** MSGID=" + msgId + " ******");
+ System.out.println("MessageId: " + msgId);
+ // print source region
+ if (message.hasSrcRegion()) {
+ System.out.println("SrcRegion: " + message.getSrcRegion().toStringUtf8());
+ } else {
+ System.out.println("SrcRegion: N/A");
+ }
+ // print message body
+ if (message.hasBody()) {
+ System.out.println("Body:");
+ dataFormatter.formatEntry(message.getBody().toByteArray());
+ } else {
+ System.out.println("Body: N/A");
+ }
+ System.out.println();
+ }
+}