You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/06/13 16:57:55 UTC

svn commit: r1349896 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/ bookkeeper-server/bin/ bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ bookkeeper-server/src/main/jav...

Author: ivank
Date: Wed Jun 13 14:57:54 2012
New Revision: 1349896

URL: http://svn.apache.org/viewvc?rev=1349896&view=rev
Log:
BOOKKEEPER-183: Provide tools to read/check data files in bookie server (sijie via ivank)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyFileInfo.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/EntryFormatter.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringEntryFormatter.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Tool.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/data/
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/data/MessageFormatter.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper
    zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1349896&r1=1349895&r2=1349896&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Wed Jun 13 14:57:54 2012
@@ -12,6 +12,10 @@ Trunk (unreleased changes)
 
       BOOKKEEPER-289: mvn clean doesn't remove test output files (sijie via ivank)
 
+      bookkeeper-server:
+
+        BOOKKEEPER-183: Provide tools to read/check data files in bookie server (sijie via ivank)
+
       hedwig-client:
 
         BOOKKEEPER-274: Hedwig cpp client library should not link to cppunit which is just used for test. (sijie via ivank)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper?rev=1349896&r1=1349895&r2=1349896&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/bin/bookkeeper Wed Jun 13 14:57:54 2012
@@ -78,6 +78,7 @@ where command is one of:
     bookie              Run a bookie server
     localbookie <n>     Run a test ensemble of <n> bookies locally
     upgrade             Upgrade bookie filesystem
+    shell               Run admin shell to read/check bookie files
     help                This help message
 
 or command is the full name of a class with a defined main() method.
@@ -87,6 +88,7 @@ Environment variables:
    BOOKIE_CONF            Configuration file (default: $DEFAULT_CONF)
    BOOKIE_EXTRA_OPTS      Extra options to be passed to the jvm
    BOOKIE_EXTRA_CLASSPATH Add extra paths to the bookkeeper classpath
+   ENTRY_FORMATTER_CLASS  Entry formatter class to format entries.
 
 These variable can also be set in conf/bkenv.sh
 EOF
@@ -155,6 +157,9 @@ elif [ $COMMAND == "localbookie" ]; then
     exec java $OPTS $JMX_ARGS org.apache.bookkeeper.util.LocalBookKeeper $NUMBER $BOOKIE_CONF $@
 elif [ $COMMAND == "upgrade" ]; then
     exec java $OPTS org.apache.bookkeeper.bookie.FileSystemUpgrade --conf $BOOKIE_CONF $@
+elif [ $COMMAND == "shell" ]; then
+    ENTRY_FORMATTER_ARG="-DentryFormatterClass=${ENTRY_FORMATTER_CLASS:-org.apache.bookkeeper.util.StringEntryFormatter}"
+    exec java $OPTS $ENTRY_FORMATTER_ARG org.apache.bookkeeper.bookie.BookieShell -conf $BOOKIE_CONF $@
 elif [ $COMMAND == "help" ]; then
     bookkeeper_help;
 else

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml?rev=1349896&r1=1349895&r2=1349896&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml Wed Jun 13 14:57:54 2012
@@ -32,6 +32,12 @@
   </properties>
   <dependencies>
     <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>2.3.0</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <version>4.8.1</version>

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java?rev=1349896&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java Wed Jun 13 14:57:54 2012
@@ -0,0 +1,684 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.util.Formatter;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
+import org.apache.bookkeeper.bookie.Journal.JournalScanner;
+import org.apache.bookkeeper.bookie.Journal.LastLogMark;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.util.EntryFormatter;
+import org.apache.bookkeeper.util.Tool;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.CompositeConfiguration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.ParseException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Bookie Shell to read/check bookie files.
+ */
+public class BookieShell implements Tool {
+
+    static final Logger LOG = LoggerFactory.getLogger(BookieShell.class);
+
+    static final String ENTRY_FORMATTER_CLASS = "entryFormatterClass";
+
+    static final String CMD_LEDGER = "ledger";
+    static final String CMD_READLOG = "readlog";
+    static final String CMD_READJOURNAL = "readjournal";
+    static final String CMD_LASTMARK = "lastmark";
+    static final String CMD_HELP = "help";
+
+    final ServerConfiguration bkConf = new ServerConfiguration();
+    File[] ledgerDirectories;
+    File journalDirectory;
+
+    EntryLogger entryLogger = null;
+    Journal journal = null;
+    EntryFormatter formatter;
+
+    int pageSize;
+    int entriesPerPage;
+
+    interface Command {
+        public int runCmd(String[] args) throws Exception;
+        public void printUsage();
+    }
+
+    abstract class MyCommand implements Command {
+        abstract Options getOptions();
+        abstract String getDescription();
+        abstract String getUsage();
+        abstract int runCmd(CommandLine cmdLine) throws Exception;
+
+        String cmdName;
+
+        MyCommand(String cmdName) {
+            this.cmdName = cmdName;
+        }
+
+        @Override
+        public int runCmd(String[] args) throws Exception {
+            try {
+                BasicParser parser = new BasicParser();
+                CommandLine cmdLine = parser.parse(getOptions(), args);
+                return runCmd(cmdLine);
+            } catch (ParseException e) {
+                LOG.error("Error parsing command line arguments : ", e);
+                printUsage();
+                return -1;
+            }
+        }
+
+        @Override
+        public void printUsage() {
+            HelpFormatter hf = new HelpFormatter();
+            System.err.println(cmdName + ": " + getDescription());
+            hf.printHelp(getUsage(), getOptions());
+        }
+    }
+
+    /**
+     * Ledger Command Handles ledger related operations
+     */
+    class LedgerCmd extends MyCommand {
+        Options lOpts = new Options();
+
+        LedgerCmd() {
+            super(CMD_LEDGER);
+            lOpts.addOption("m", "meta", false, "Print meta information");
+        }
+
+        @Override
+        public int runCmd(CommandLine cmdLine) throws Exception {
+            String[] leftArgs = cmdLine.getArgs();
+            if (leftArgs.length <= 0) {
+                System.err.println("ERROR: missing ledger id");
+                printUsage();
+                return -1;
+            }
+
+            boolean printMeta = false;
+            if (cmdLine.hasOption("m")) {
+                printMeta = true;
+            }
+            long ledgerId;
+            try {
+                ledgerId = Long.parseLong(leftArgs[0]);
+            } catch (NumberFormatException nfe) {
+                System.err.println("ERROR: invalid ledger id " + leftArgs[0]);
+                printUsage();
+                return -1;
+            }
+            if (printMeta) {
+                // print meta
+                readLedgerMeta(ledgerId);
+            }
+            // dump ledger info
+            readLedgerIndexEntries(ledgerId);
+            return 0;
+        }
+
+        @Override
+        String getDescription() {
+            return "Dump ledger index entries into readable format.";
+        }
+
+        @Override
+        String getUsage() {
+            return "ledger [-m] <ledger_id>";
+        }
+
+        @Override
+        Options getOptions() {
+            return lOpts;
+        }
+    }
+
+    /**
+     * Command to read entry log files.
+     */
+    class ReadLogCmd extends MyCommand {
+        Options rlOpts = new Options();
+
+        ReadLogCmd() {
+            super(CMD_READLOG);
+            rlOpts.addOption("m", "msg", false, "Print message body");
+        }
+
+        @Override
+        public int runCmd(CommandLine cmdLine) throws Exception {
+            String[] leftArgs = cmdLine.getArgs();
+            if (leftArgs.length <= 0) {
+                System.err.println("ERROR: missing entry log id or entry log file name");
+                printUsage();
+                return -1;
+            }
+
+            boolean printMsg = false;
+            if (cmdLine.hasOption("m")) {
+                printMsg = true;
+            }
+            long logId;
+            try {
+                logId = Long.parseLong(leftArgs[0]);
+            } catch (NumberFormatException nfe) {
+                // not a entry log id
+                File f = new File(leftArgs[0]);
+                String name = f.getName();
+                if (!name.endsWith(".log")) {
+                    // not a log file
+                    System.err.println("ERROR: invalid entry log file name " + leftArgs[0]);
+                    printUsage();
+                    return -1;
+                }
+                String idString = name.split("\\.")[0];
+                logId = Long.parseLong(idString, 16);
+            }
+            // scan entry log
+            scanEntryLog(logId, printMsg);
+            return 0;
+        }
+
+        @Override
+        String getDescription() {
+            return "Scan an entry file and format the entries into readable format.";
+        }
+
+        @Override
+        String getUsage() {
+            return "readlog [-m] <entry_log_id | entry_log_file_name>";
+        }
+
+        @Override
+        Options getOptions() {
+            return rlOpts;
+        }
+    }
+
+    /**
+     * Command to read journal files
+     */
+    class ReadJournalCmd extends MyCommand {
+        Options rjOpts = new Options();
+
+        ReadJournalCmd() {
+            super(CMD_READJOURNAL);
+            rjOpts.addOption("m", "msg", false, "Print message body");
+        }
+
+        @Override
+        public int runCmd(CommandLine cmdLine) throws Exception {
+            String[] leftArgs = cmdLine.getArgs();
+            if (leftArgs.length <= 0) {
+                System.err.println("ERROR: missing journal id or journal file name");
+                printUsage();
+                return -1;
+            }
+
+            boolean printMsg = false;
+            if (cmdLine.hasOption("m")) {
+                printMsg = true;
+            }
+            long journalId;
+            try {
+                journalId = Long.parseLong(leftArgs[0]);
+            } catch (NumberFormatException nfe) {
+                // not a journal id
+                File f = new File(leftArgs[0]);
+                String name = f.getName();
+                if (!name.endsWith(".txn")) {
+                    // not a journal file
+                    System.err.println("ERROR: invalid journal file name " + leftArgs[0]);
+                    printUsage();
+                    return -1;
+                }
+                String idString = name.split("\\.")[0];
+                journalId = Long.parseLong(idString, 16);
+            }
+            // scan journal
+            scanJournal(journalId, printMsg);
+            return 0;
+        }
+
+        @Override
+        String getDescription() {
+            return "Scan a journal file and format the entries into readable format.";
+        }
+
+        @Override
+        String getUsage() {
+            return "readjournal [-m] <journal_id | journal_file_name>";
+        }
+
+        @Override
+        Options getOptions() {
+            return rjOpts;
+        }
+    }
+
+    /**
+     * Command to print last log mark
+     */
+    class LastMarkCmd implements Command {
+        @Override
+        public int runCmd(String[] args) throws Exception {
+            printLastLogMark();
+            return 0;
+        }
+
+        @Override
+        public void printUsage() {
+            System.err.println("lastmark: Print last log marker.");
+        }
+    }
+
+    /**
+     * Command to print help message
+     */
+    class HelpCmd implements Command {
+        @Override
+        public int runCmd(String[] args) throws Exception {
+            if (args.length == 0) {
+                printShellUsage();
+                return 0;
+            }
+            String cmdName = args[0];
+            Command cmd = commands.get(cmdName);
+            if (null == cmd) {
+                System.err.println("Unknown command " + cmdName);
+                printShellUsage();
+                return -1;
+            }
+            cmd.printUsage();
+            return 0;
+        }
+
+        @Override
+        public void printUsage() {
+            System.err.println("help: Describe the usage of this program or its subcommands.");
+            System.err.println("usage: help [COMMAND]");
+        }
+    }
+
+    final Map<String, Command> commands;
+    {
+        commands = new HashMap<String, Command>();
+        commands.put(CMD_LEDGER, new LedgerCmd());
+        commands.put(CMD_READLOG, new ReadLogCmd());
+        commands.put(CMD_READJOURNAL, new ReadJournalCmd());
+        commands.put(CMD_LASTMARK, new LastMarkCmd());
+        commands.put(CMD_HELP, new HelpCmd());
+    }
+
+    @Override
+    public void setConf(Configuration conf) throws Exception {
+        bkConf.loadConf(conf);
+        journalDirectory = Bookie.getCurrentDirectory(bkConf.getJournalDir());
+        ledgerDirectories = Bookie.getCurrentDirectories(bkConf.getLedgerDirs());
+        formatter = EntryFormatter.newEntryFormatter(bkConf, ENTRY_FORMATTER_CLASS);
+        LOG.info("Using entry formatter " + formatter.getClass().getName());
+        pageSize = bkConf.getPageSize();
+        entriesPerPage = pageSize / 8;
+    }
+
+    private static void printShellUsage() {
+        System.err.println("Usage: BookieShell [-conf configuration] <command>");
+        System.err.println();
+        System.err.println("       ledger      [-meta] <ledger_id>");
+        System.err.println("       readlog     [-msg] <entry_log_id|entry_log_file_name>");
+        System.err.println("       readjournal [-msg] <journal_id|journal_file_name>");
+        System.err.println("       lastmark");
+        System.err.println("       help");
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+        if (args.length <= 0) {
+            printShellUsage();
+            return -1;
+        }
+        String cmdName = args[0];
+        Command cmd = commands.get(cmdName);
+        if (null == cmd) {
+            System.err.println("ERROR: Unknown command " + cmdName);
+            printShellUsage();
+            return -1;
+        }
+        // prepare new args
+        String[] newArgs = new String[args.length - 1];
+        System.arraycopy(args, 1, newArgs, 0, newArgs.length);
+        return cmd.runCmd(newArgs);
+    }
+
+    public static void main(String argv[]) throws Exception {
+        if (argv.length <= 0) {
+            printShellUsage();
+            System.exit(-1);
+        }
+
+        CompositeConfiguration conf = new CompositeConfiguration();
+        // load configuration
+        if ("-conf".equals(argv[0])) {
+            if (argv.length <= 1) {
+                printShellUsage();
+                System.exit(-1);
+            }
+            conf.addConfiguration(new PropertiesConfiguration(
+                                  new File(argv[1]).toURI().toURL()));
+
+            String[] newArgv = new String[argv.length - 2];
+            System.arraycopy(argv, 2, newArgv, 0, newArgv.length);
+            argv = newArgv;
+        }
+
+        BookieShell shell = new BookieShell();
+        shell.setConf(conf);
+        int res = shell.run(argv);
+        System.exit(res);
+    }
+
+    ///
+    /// Bookie File Operations
+    ///
+
+    /**
+     * Get the ledger file of a specified ledger.
+     *
+     * @param ledgerId
+     *          Ledger Id
+     *
+     * @return file object.
+     */
+    private File getLedgerFile(long ledgerId) {
+        String ledgerName = LedgerCacheImpl.getLedgerName(ledgerId);
+        File lf = null;
+        for (File d : ledgerDirectories) {
+            lf = new File(d, ledgerName);
+            if (lf.exists()) {
+                break;
+            }
+            lf = null;
+        }
+        return lf;
+    }
+
+    /**
+     * Get FileInfo for a specified ledger.
+     *
+     * @param ledgerId
+     *          Ledger Id
+     * @return read only file info instance
+     */
+    ReadOnlyFileInfo getFileInfo(long ledgerId) throws IOException {
+        File ledgerFile = getLedgerFile(ledgerId);
+        if (null == ledgerFile) {
+            throw new FileNotFoundException("No index file found for ledger " + ledgerId + ". It may be not flushed yet.");
+        }
+        ReadOnlyFileInfo fi = new ReadOnlyFileInfo(ledgerFile, null);
+        fi.readHeader();
+        return fi;
+    }
+
+    private synchronized void initEntryLogger() throws IOException {
+        if (null == entryLogger) {
+            // provide read only entry logger
+            entryLogger = new ReadOnlyEntryLogger(bkConf);
+        }
+    }
+
+    /**
+     * scan over entry log
+     *
+     * @param logId
+     *          Entry Log Id
+     * @param scanner
+     *          Entry Log Scanner
+     */
+    protected void scanEntryLog(long logId, EntryLogScanner scanner) throws IOException {
+        initEntryLogger();
+        entryLogger.scanEntryLog(logId, scanner);
+    }
+
+    private synchronized void initJournal() throws IOException {
+        if (null == journal) {
+            journal = new Journal(bkConf);
+        }
+    }
+
+    /**
+     * Scan journal file
+     *
+     * @param journalId
+     *          Journal File Id
+     * @param scanner
+     *          Journal File Scanner
+     */
+    protected void scanJournal(long journalId, JournalScanner scanner) throws IOException {
+        initJournal();
+        journal.scanJournal(journalId, 0L, scanner);
+    }
+
+    ///
+    /// Bookie Shell Commands
+    ///
+
+    /**
+     * Read ledger meta
+     *
+     * @param ledgerId
+     *          Ledger Id
+     */
+    protected void readLedgerMeta(long ledgerId) throws Exception {
+        System.out.println("===== LEDGER: " + ledgerId + " =====");
+        FileInfo fi = getFileInfo(ledgerId);
+        byte[] masterKey = fi.getMasterKey();
+        if (null == masterKey) {
+            System.out.println("master key  : NULL");
+        } else {
+            System.out.println("master key  : " + bytes2Hex(fi.getMasterKey()));
+        }
+        long size = fi.size();
+        if (size % 8 == 0) {
+            System.out.println("size        : " + size);
+        } else {
+            System.out.println("size : " + size + " (not aligned with 8, may be corrupted or under flushing now)");
+        }
+        System.out.println("entries     : " + (size / 8));
+    }
+
+    /**
+     * Read ledger index entires
+     *
+     * @param ledgerId
+     *          Ledger Id
+     * @throws IOException
+     */
+    protected void readLedgerIndexEntries(long ledgerId) throws IOException {
+        System.out.println("===== LEDGER: " + ledgerId + " =====");
+        FileInfo fi = getFileInfo(ledgerId);
+        long size = fi.size();
+        System.out.println("size        : " + size);
+        long curSize = 0;
+        long curEntry = 0;
+        LedgerEntryPage lep = new LedgerEntryPage(pageSize, entriesPerPage);
+        lep.usePage();
+        try {
+            while (curSize < size) {
+                lep.setLedger(ledgerId);
+                lep.setFirstEntry(curEntry);
+                lep.readPage(fi);
+
+                // process a page
+                for (int i=0; i<entriesPerPage; i++) {
+                    long offset = lep.getOffset(i * 8);
+                    if (0 == offset) {
+                        System.out.println("entry " + curEntry + "\t:\tN/A");
+                    } else {
+                        long entryLogId = offset >> 32L;
+                        long pos = offset & 0xffffffffL;
+                        System.out.println("entry " + curEntry + "\t:\t(log:" + entryLogId + ", pos: " + pos + ")");
+                    }
+                    ++curEntry;
+                }
+
+                curSize += pageSize;
+            }
+        } catch (IOException ie) {
+            LOG.error("Failed to read index page : ", ie);
+            if (curSize + pageSize < size) {
+                System.out.println("Failed to read index page @ " + curSize + ", the index file may be corrupted : " + ie.getMessage());
+            } else {
+                System.out.println("Failed to read last index page @ " + curSize
+                                 + ", the index file may be corrupted or last index page is not fully flushed yet : " + ie.getMessage());
+            }
+        }
+    }
+
+    /**
+     * Scan over an entry log file.
+     *
+     * @param logId
+     *          Entry Log File id.
+     * @param printMsg
+     *          Whether printing the entry data.
+     */
+    protected void scanEntryLog(long logId, final boolean printMsg) throws Exception {
+        System.out.println("Scan entry log " + logId + " (" + Long.toHexString(logId) + ".log)");
+        scanEntryLog(logId, new EntryLogScanner() {
+            @Override
+            public boolean accept(long ledgerId) {
+                return true;
+            }
+            @Override
+            public void process(long ledgerId, long startPos, ByteBuffer entry) {
+                formatEntry(startPos, entry, printMsg);
+            }
+        });
+    }
+
+    /**
+     * Scan a journal file
+     *
+     * @param journalId
+     *          Journal File Id
+     * @param printMsg
+     *          Whether printing the entry data.
+     */
+    protected void scanJournal(long journalId, final boolean printMsg) throws Exception {
+        System.out.println("Scan journal " + journalId + " (" + Long.toHexString(journalId) + ".txn)");
+        scanJournal(journalId, new JournalScanner() {
+            boolean printJournalVersion = false;
+            @Override
+            public void process(int journalVersion, long offset, ByteBuffer entry) throws IOException {
+                if (!printJournalVersion) {
+                    System.out.println("Journal Version : " + journalVersion);
+                    printJournalVersion = true;
+                }
+                formatEntry(offset, entry, printMsg);
+            }
+        });
+    }
+
+    /**
+     * Print last log mark
+     */
+    protected void printLastLogMark() throws IOException {
+        initJournal();
+        LastLogMark lastLogMark = journal.getLastLogMark();
+        System.out.println("LastLogMark: Journal Id - " + lastLogMark.txnLogId + "("
+                         + Long.toHexString(lastLogMark.txnLogId) + ".txn), Pos - "
+                         + lastLogMark.txnLogPosition);
+    }
+
+    /**
+     * Format the message into a readable format.
+     *
+     * @param pos
+     *          File offset of the message stored in entry log file
+     * @param recBuff
+     *          Entry Data
+     * @param printMsg
+     *          Whether printing the message body
+     */
+    private void formatEntry(long pos, ByteBuffer recBuff, boolean printMsg) {
+        long ledgerId = recBuff.getLong();
+        long entryId = recBuff.getLong();
+        int entrySize = recBuff.limit();
+
+        System.out.println("--------- Lid=" + ledgerId + ", Eid=" + entryId
+                         + ", ByteOffset=" + pos + ", EntrySize=" + entrySize + " ---------");
+        if (entryId == Bookie.METAENTRY_ID_LEDGER_KEY) {
+            int masterKeyLen = recBuff.getInt();
+            byte[] masterKey = new byte[masterKeyLen];
+            recBuff.get(masterKey);
+            System.out.println("Type:           META");
+            System.out.println("MasterKey:      " + bytes2Hex(masterKey));
+            System.out.println();
+            return;
+        }
+        // process a data entry
+        long lastAddConfirmed = recBuff.getLong();
+        long length = recBuff.getLong();
+        System.out.println("Type:           DATA");
+        System.out.println("LastConfirmed:  " + lastAddConfirmed);
+        if (!printMsg) {
+            System.out.println();
+            return;
+        }
+        // skip digest checking
+        recBuff.position(32 + 8);
+        System.out.println("Data:");
+        System.out.println();
+        try {
+            byte[] ret = new byte[recBuff.remaining()];
+            recBuff.get(ret);
+            formatter.formatEntry(ret);
+        } catch (Exception e) {
+            System.out.println("N/A. Corrupted.");
+        }
+        System.out.println();
+    }
+
+    static String bytes2Hex(byte[] data) {
+        StringBuilder sb = new StringBuilder(data.length * 2);
+        Formatter formatter = new Formatter(sb);
+        for (byte b : data) {
+            formatter.format("%02x", b);
+        }
+        return sb.toString();
+    }
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java?rev=1349896&r1=1349895&r2=1349896&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java Wed Jun 13 14:57:54 2012
@@ -91,11 +91,13 @@ public class EntryLogger {
          *
          * @param ledgerId
          *          Ledger ID.
+         * @param offset
+         *          File offset of this entry.
          * @param entry
          *          Entry ByteBuffer
          * @throws IOException
          */
-        public void process(long ledgerId, ByteBuffer entry) throws IOException;
+        public void process(long ledgerId, long offset, ByteBuffer entry) throws IOException;
     }
 
     /**
@@ -125,7 +127,8 @@ public class EntryLogger {
                 logId = lastLogId;
             }
         }
-        createNewLog();
+
+        initialize();
     }
 
     /**
@@ -137,10 +140,15 @@ public class EntryLogger {
         return logId;
     }
 
+    protected void initialize() throws IOException {
+        // create a new log to write
+        createNewLog();
+    }
+
     /**
      * Creates a new log file
      */
-    private void createNewLog() throws IOException {
+    void createNewLog() throws IOException {
         List<File> list = Arrays.asList(dirs);
         Collections.shuffle(list);
         if (logChannel != null) {
@@ -416,6 +424,7 @@ public class EntryLogger {
             if (bc.read(sizeBuff, pos) != sizeBuff.capacity()) {
                 throw new IOException("Short read for entry size from entrylog " + entryLogId);
             }
+            long offset = pos;
             pos += 4;
             sizeBuff.flip();
             int entrySize = sizeBuff.getInt();
@@ -446,7 +455,7 @@ public class EntryLogger {
             }
             buff.flip();
             // process the entry
-            scanner.process(lid, buff);
+            scanner.process(lid, offset, buff);
             // Advance position to the next entry
             pos += entrySize;
         }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java?rev=1349896&r1=1349895&r2=1349896&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java Wed Jun 13 14:57:54 2012
@@ -68,10 +68,14 @@ class FileInfo {
     private int useCount;
     private boolean isClosed;
 
+    // file access mode
+    protected String mode;
+
     public FileInfo(File lf, byte[] masterKey) throws IOException {
         this.lf = lf;
 
         this.masterKey = masterKey;
+        mode = "rw";
     }
 
     synchronized public void readHeader() throws IOException {
@@ -80,7 +84,7 @@ class FileInfo {
                 return;
             }
 
-            fc = new RandomAccessFile(lf, "rw").getChannel();
+            fc = new RandomAccessFile(lf, mode).getChannel();
             size = fc.size();
 
             // avoid hang on reading partial index
@@ -122,7 +126,7 @@ class FileInfo {
             if (create) {
                 // delayed the creation of parents directories
                 checkParents(lf);
-                fc = new RandomAccessFile(lf, "rw").getChannel();
+                fc = new RandomAccessFile(lf, mode).getChannel();
                 size = fc.size();
                 if (size == 0) {
                     writeHeader();

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java?rev=1349896&r1=1349895&r2=1349896&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java Wed Jun 13 14:57:54 2012
@@ -100,8 +100,9 @@ public class GarbageCollectorThread exte
         }
 
         @Override
-        public void process(long ledgerId, ByteBuffer entry) throws IOException {
-            scanner.process(ledgerId, entry);
+        public void process(long ledgerId, long offset, ByteBuffer entry)
+            throws IOException {
+            scanner.process(ledgerId, offset, entry);
         }
     }
 
@@ -442,7 +443,7 @@ public class GarbageCollectorThread exte
             return true;
         }
         @Override
-        public void process(long ledgerId, ByteBuffer entry) {
+        public void process(long ledgerId, long offset, ByteBuffer entry) {
             // add new entry size of a ledger to entry log meta
             meta.addLedgerSize(ledgerId, entry.limit() + 4);
         }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java?rev=1349896&r1=1349895&r2=1349896&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java Wed Jun 13 14:57:54 2012
@@ -178,10 +178,10 @@ class InterleavedLedgerStorage implement
         }
 
         @Override
-        public void process(long ledgerId, ByteBuffer buffer)
+        public void process(long ledgerId, long offset, ByteBuffer buffer)
             throws IOException {
             addEntry(buffer);
         }
     }
 
-}
\ No newline at end of file
+}

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java?rev=1349896&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java Wed Jun 13 14:57:54 2012
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+
+/**
+ * Read Only Entry Logger
+ */
+public class ReadOnlyEntryLogger extends EntryLogger {
+
+    public ReadOnlyEntryLogger(ServerConfiguration conf) throws IOException {
+        super(conf);
+    }
+
+    @Override
+    protected void initialize() throws IOException {
+        // do nothing for read only entry logger
+    }
+
+    @Override
+    void createNewLog() throws IOException {
+        throw new IOException("Can't create new entry log using a readonly entry logger.");
+    }
+
+    @Override
+    protected boolean removeEntryLog(long entryLogId) {
+        // can't remove entry log in readonly mode
+        return false;
+    }
+
+    @Override
+    synchronized long addEntry(long ledger, ByteBuffer entry) throws IOException {
+        throw new IOException("Can't add entry to a readonly entry logger.");
+    }
+}

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyFileInfo.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyFileInfo.java?rev=1349896&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyFileInfo.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyFileInfo.java Wed Jun 13 14:57:54 2012
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.BufferUnderflowException;
+import java.nio.channels.FileChannel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provide a readonly file info.
+ */
+class ReadOnlyFileInfo extends FileInfo {
+
+    public ReadOnlyFileInfo(File lf, byte[] masterKey) throws IOException {
+        super(lf, masterKey);
+        mode = "r";
+    }
+
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java?rev=1349896&r1=1349895&r2=1349896&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java Wed Jun 13 14:57:54 2012
@@ -65,6 +65,16 @@ public abstract class AbstractConfigurat
     }
 
     /**
+     * Load configuration from other configuration object
+     *
+     * @param otherConf
+     *          Other configuration object
+     */
+    public void loadConf(Configuration otherConf) {
+        addConfiguration(otherConf);
+    }
+
+    /**
      * Set Ledger Manager Type.
      *
      * @param lmType

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/EntryFormatter.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/EntryFormatter.java?rev=1349896&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/EntryFormatter.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/EntryFormatter.java Wed Jun 13 14:57:54 2012
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.util;
+
+import java.io.IOException;
+
+import org.apache.commons.configuration.Configuration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Formatter to format an entry
+ */
+public abstract class EntryFormatter {
+
+    static Logger LOG = LoggerFactory.getLogger(EntryFormatter.class);
+
+    protected Configuration conf;
+
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+    }
+
+    /**
+     * Format an entry into a readable format
+     *
+     * @param data
+     *          Data Payload
+     */
+    public abstract void formatEntry(byte[] data);
+
+    /**
+     * Format an entry from a string into a readable format
+     *
+     * @param input
+     *          Input Stream
+     */
+    public abstract void formatEntry(java.io.InputStream input);
+
+    public final static EntryFormatter STRING_FORMATTER = new StringEntryFormatter();
+
+    public static EntryFormatter newEntryFormatter(Configuration conf, String clsProperty) {
+        String cls = conf.getString(clsProperty, StringEntryFormatter.class.getName());
+        ClassLoader classLoader = EntryFormatter.class.getClassLoader();
+        EntryFormatter formatter;
+        try {
+            Class aCls = classLoader.loadClass(cls);
+            formatter = (EntryFormatter) aCls.newInstance();
+            formatter.setConf(conf);
+        } catch (Exception e) {
+            LOG.warn("No formatter class found : " + cls, e);
+            LOG.warn("Using Default String Formatter.");
+            formatter = STRING_FORMATTER;
+        }
+        return formatter;
+    }
+}

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringEntryFormatter.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringEntryFormatter.java?rev=1349896&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringEntryFormatter.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringEntryFormatter.java Wed Jun 13 14:57:54 2012
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.util;
+
+import java.io.IOException;
+
+import org.apache.commons.configuration.Configuration;
+
+import com.google.protobuf.ByteString;
+
+public class StringEntryFormatter extends EntryFormatter {
+    @Override
+    public void formatEntry(byte[] data) {
+        System.out.println(ByteString.copyFrom(data).toStringUtf8());
+    }
+
+    @Override
+    public void formatEntry(java.io.InputStream input) {
+        try {
+            byte[] data = new byte[input.available()];
+            input.read(data, 0, data.length);
+            formatEntry(data);
+        } catch (IOException ie) {
+            System.out.println("Warn: Unreadable entry : " + ie.getMessage());
+        }
+    }
+
+};

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Tool.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Tool.java?rev=1349896&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Tool.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Tool.java Wed Jun 13 14:57:54 2012
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.util;
+
+import org.apache.commons.configuration.Configuration;
+
+/**
+ * A tool interface that supports handling of generic command-line options.
+ */
+public interface Tool {
+    /**
+     * Exectue the command with given arguments
+     *
+     * @param args command specific arguments
+     * @return exit code.
+     * @throws Exception
+     */
+    public int run(String[] args) throws Exception;
+
+    /**
+     * Passe a configuration object to the tool.
+     *
+     * @param conf configuration object
+     * @throws Exception
+     */
+    public void setConf(Configuration conf) throws Exception;
+}

Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/data/MessageFormatter.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/data/MessageFormatter.java?rev=1349896&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/data/MessageFormatter.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/data/MessageFormatter.java Wed Jun 13 14:57:54 2012
@@ -0,0 +1,123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.hedwig.data;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.bookkeeper.util.EntryFormatter;
+import org.apache.commons.configuration.Configuration;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.RegionSpecificSeqId;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Format a pub sub message into a readable format.
+ */
+public class MessageFormatter extends EntryFormatter {
+    static Logger logger = LoggerFactory.getLogger(MessageFormatter.class);
+
+    static final String MESSAGE_PAYLOAD_FORMATTER_CLASS = "message_payload_formatter_class";
+
+    EntryFormatter dataFormatter = EntryFormatter.STRING_FORMATTER;
+
+    @Override
+    public void setConf(Configuration conf) {
+        super.setConf(conf);
+        dataFormatter = EntryFormatter.newEntryFormatter(conf, MESSAGE_PAYLOAD_FORMATTER_CLASS);
+    }
+
+    @Override
+    public void formatEntry(java.io.InputStream input) {
+        Message message;
+        try {
+            message = Message.parseFrom(input);
+        } catch (IOException e) {
+            System.out.println("WARN: Unreadable message found\n");
+            EntryFormatter.STRING_FORMATTER.formatEntry(input);
+            return;
+        }
+        formatMessage(message);
+    }
+
+    @Override
+    public void formatEntry(byte[] data) {
+        Message message;
+        try {
+            message = Message.parseFrom(data);
+        } catch (IOException e) {
+            System.out.println("WARN: Unreadable message found\n");
+            EntryFormatter.STRING_FORMATTER.formatEntry(data);
+            return;
+        }
+        formatMessage(message);
+    }
+
+    void formatMessage(Message message) {
+        // print msg id
+        String msgId;
+        if (!message.hasMsgId()) {
+            msgId = "N/A";
+        } else {
+            MessageSeqId seqId = message.getMsgId();
+            StringBuilder idBuilder = new StringBuilder();
+            if (seqId.hasLocalComponent()) {
+                idBuilder.append("LOCAL(").append(seqId.getLocalComponent()).append(")");
+            } else {
+                List<RegionSpecificSeqId> remoteIds = seqId.getRemoteComponentsList();
+                int i = 0, numRegions = remoteIds.size();
+                idBuilder.append("REMOTE(");
+                for (RegionSpecificSeqId rssid : remoteIds) {
+                    idBuilder.append(rssid.getRegion().toStringUtf8());
+                    idBuilder.append("[");
+                    idBuilder.append(rssid.getSeqId());
+                    idBuilder.append("]");
+                    ++i;
+                    if (i < numRegions) {
+                        idBuilder.append(",");
+                    }
+                }
+                idBuilder.append(")");
+            }
+            msgId = idBuilder.toString();
+        }
+        System.out.println("****** MSGID=" + msgId + " ******");
+        System.out.println("MessageId:      " + msgId);
+        // print source region
+        if (message.hasSrcRegion()) {
+            System.out.println("SrcRegion:      " + message.getSrcRegion().toStringUtf8());
+        } else {
+            System.out.println("SrcRegion:      N/A");
+        }
+        // print message body
+        if (message.hasBody()) {
+            System.out.println("Body:");
+            dataFormatter.formatEntry(message.getBody().toByteArray());
+        } else {
+            System.out.println("Body:           N/A");
+        }
+        System.out.println();
+    }
+}