You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ji...@apache.org on 2011/12/14 00:27:14 UTC

svn commit: r1213983 - in /hadoop/common/trunk/hadoop-hdfs-project: ./ hadoop-hdfs/ hadoop-hdfs/src/contrib/bkjournal/ hadoop-hdfs/src/contrib/bkjournal/src/ hadoop-hdfs/src/contrib/bkjournal/src/main/ hadoop-hdfs/src/contrib/bkjournal/src/main/java/ h...

Author: jitendra
Date: Tue Dec 13 23:27:13 2011
New Revision: 1213983

URL: http://svn.apache.org/viewvc?rev=1213983&view=rev
Log:
HDFS-234. Integration with BookKeeper logging system. Contributed by Ivan Kelly.

Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/README.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/MaxTxId.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/WriteLock.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/resources/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/resources/log4j.properties
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/pom.xml

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1213983&r1=1213982&r2=1213983&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Dec 13 23:27:13 2011
@@ -34,6 +34,9 @@ Trunk (unreleased changes)
 
     HDFS-2666. Fix TestBackupNode failure. (suresh)
 
+    HDFS-234. Integration with BookKeeper logging system. (Ivan Kelly 
+    via jitendra)
+
     HDFS-2663. Optional protobuf parameters are not handled correctly. (suresh)
 
   IMPROVEMENTS

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/README.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/README.txt?rev=1213983&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/README.txt (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/README.txt Tue Dec 13 23:27:13 2011
@@ -0,0 +1,60 @@
+This module provides a BookKeeper backend for HDFS Namenode write
+ahead logging.  
+
+BookKeeper is a highly available distributed write ahead logging
+system. For more details, see
+   
+    http://zookeeper.apache.org/bookkeeper
+
+-------------------------------------------------------------------------------
+How do I build?
+
+ To generate the distribution packages for BK journal, do the
+ following.
+
+   $ mvn clean install -Pdist -Dtar
+
+ This will generate a tarball, 
+ target/hadoop-hdfs-bkjournal-<VERSION>.tar.gz 
+
+-------------------------------------------------------------------------------
+How do I use the BookKeeper Journal?
+
+ To run an HDFS namenode using BookKeeper as a backend, extract the
+ distribution package on top of hdfs
+
+   cd hadoop-hdfs-<VERSION>/
+   tar --strip-components 1 -zxvf path/to/hadoop-hdfs-bkjournal-<VERSION>.tar.gz
+
+ Then, in hdfs-site.xml, set the following properties.
+
+   <property>
+     <name>dfs.namenode.edits.dir</name>
+     <value>bookkeeper://localhost:2181/bkjournal,file:///path/for/edits</value>
+   </property>
+
+   <property>
+     <name>dfs.namenode.edits.journal-plugin.bookkeeper</name>
+     <value>org.apache.hadoop.contrib.bkjournal.BookKeeperJournalManager</value>
+   </property>
+
+ In this example, the namenode is configured to use 2 write ahead
+ logging devices. One writes to BookKeeper and the other to a local
+ file system. At the moment it is not possible to write only to
+ BookKeeper, as the resource checker currently checks explicitly for
+ local disks.
+
+ The given example configures the namenode to look for the journal
+ metadata at the path /bkjournal on a standalone zookeeper ensemble
+ at localhost:2181. To configure a multiple host zookeeper ensemble,
+ separate the hosts with semicolons. For example, if you have 3
+ zookeeper servers, zk1, zk2 & zk3, each listening on port 2181, you
+ would specify this with 
+  
+   bookkeeper://zk1:2181;zk2:2181;zk3:2181/bkjournal
+
+ The final part /bkjournal specifies the znode in zookeeper where
+ ledger metadata will be stored. Administrators can set this to anything
+ they wish.
+
+

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml?rev=1213983&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml Tue Dec 13 23:27:13 2011
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<project>
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-project-dist</artifactId>
+    <version>0.24.0-SNAPSHOT</version>
+    <relativePath>../../../../hadoop-project-dist</relativePath>
+  </parent>
+
+  <groupId>org.apache.hadoop.contrib</groupId>
+  <artifactId>hadoop-hdfs-bkjournal</artifactId>
+  <version>0.24.0-SNAPSHOT</version>
+  <description>Apache Hadoop HDFS BookKeeper Journal</description>
+  <name>Apache Hadoop HDFS BookKeeper Journal</name>
+  <packaging>jar</packaging>
+
+  <properties>
+    <hadoop.component>hdfs</hadoop.component>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-annotations</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency> 
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>0.24.0-SNAPSHOT</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency> 
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <version>0.24.0-SNAPSHOT</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency> 
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <version>0.24.0-SNAPSHOT</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>bookkeeper-server</artifactId>
+      <version>4.0.0</version>
+      <scope>compile</scope>
+    </dependency>
+  </dependencies>
+</project>

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java?rev=1213983&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java Tue Dec 13 23:27:13 2011
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.contrib.bkjournal;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Enumeration;
+
+import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.LedgerEntry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Input stream which reads from a BookKeeper ledger.
+ */
+class BookKeeperEditLogInputStream extends EditLogInputStream {
+  static final Log LOG = LogFactory.getLog(BookKeeperEditLogInputStream.class);
+
+  private final long firstTxId;
+  private final long lastTxId;
+  private final int logVersion;
+  private final LedgerHandle lh;
+
+  private final FSEditLogOp.Reader reader;
+  private final FSEditLogLoader.PositionTrackingInputStream tracker;
+
+  /**
+   * Construct BookKeeper edit log input stream.
+   * Starts reading from the first entry of the ledger.
+   */
+  BookKeeperEditLogInputStream(final LedgerHandle lh, 
+                               final EditLogLedgerMetadata metadata)
+      throws IOException {
+    this(lh, metadata, 0);
+  }
+
+  /**
+   * Construct BookKeeper edit log input stream. 
+   * Starts reading from firstBookKeeperEntry. This allows the stream
+   * to take a shortcut during recovery, as it doesn't have to read
+   * every edit log transaction to find out what the last one is.
+   */
+  BookKeeperEditLogInputStream(LedgerHandle lh, EditLogLedgerMetadata metadata,
+                               long firstBookKeeperEntry) 
+      throws IOException {
+    this.lh = lh;
+    this.firstTxId = metadata.getFirstTxId();
+    this.lastTxId = metadata.getLastTxId();
+    this.logVersion = metadata.getVersion();
+
+    BufferedInputStream bin = new BufferedInputStream(
+        new LedgerInputStream(lh, firstBookKeeperEntry));
+    tracker = new FSEditLogLoader.PositionTrackingInputStream(bin);
+    DataInputStream in = new DataInputStream(tracker);
+
+    reader = new FSEditLogOp.Reader(in, logVersion);
+  }
+
+  @Override
+  public long getFirstTxId() throws IOException {
+    return firstTxId;
+  }
+
+  @Override
+  public long getLastTxId() throws IOException {
+    return lastTxId;
+  }
+  
+  @Override
+  public int getVersion() throws IOException {
+    return logVersion;
+  }
+
+  @Override
+  public FSEditLogOp readOp() throws IOException {
+    return reader.readOp();
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      lh.close();
+    } catch (Exception e) {
+      throw new IOException("Exception closing ledger", e);
+    }
+  }
+
+  @Override
+  public long getPosition() {
+    return tracker.getPos();
+  }
+
+  @Override
+  public long length() throws IOException {
+    return lh.getLength();
+  }
+  
+  @Override
+  public String getName() {
+    return String.format("BookKeeper[%s,first=%d,last=%d]", 
+        lh.toString(), firstTxId, lastTxId);
+  }
+
+  @Override
+  public JournalType getType() {
+    assert (false);
+    return null;
+  }
+
+  /**
+   * Input stream implementation which can be used by 
+   * FSEditLogOp.Reader
+   */
+  private static class LedgerInputStream extends InputStream {
+    private long readEntries;
+    private InputStream entryStream = null;
+    private final LedgerHandle lh;
+    private final long maxEntry;
+
+    /**
+     * Construct ledger input stream
+     * @param lh the ledger handle to read from
+     * @param firstBookKeeperEntry ledger entry to start reading from
+     */
+    LedgerInputStream(LedgerHandle lh, long firstBookKeeperEntry) 
+        throws IOException {
+      this.lh = lh;
+      readEntries = firstBookKeeperEntry;
+      try {
+        maxEntry = lh.getLastAddConfirmed();
+      } catch (Exception e) {
+        throw new IOException("Error reading last entry id", e);
+      }
+    }
+
+    /**
+     * Get input stream representing next entry in the
+     * ledger.
+     * @return input stream, or null if no more entries
+     */
+    private InputStream nextStream() throws IOException {
+      try {        
+        if (readEntries > maxEntry) {
+          return null;
+        }
+        Enumeration<LedgerEntry> entries 
+          = lh.readEntries(readEntries, readEntries);
+        readEntries++;
+        if (entries.hasMoreElements()) {
+            LedgerEntry e = entries.nextElement();
+            assert !entries.hasMoreElements();
+            return e.getEntryInputStream();
+        }
+      } catch (Exception e) {
+        throw new IOException("Error reading entries from bookkeeper", e);
+      }
+      return null;
+    }
+
+    @Override
+    public int read() throws IOException {
+      byte[] b = new byte[1];
+      if (read(b, 0, 1) != 1) {
+        return -1;
+      } else {
+        return b[0];
+      }
+    }
+    
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+      try {
+        int read = 0;
+        if (entryStream == null) {
+          entryStream = nextStream();
+          if (entryStream == null) {
+            return read;
+          }
+        }
+
+        while (read < len) {
+          int thisread = entryStream.read(b, off+read, (len-read));
+          if (thisread == -1) {
+            entryStream = nextStream();
+            if (entryStream == null) {
+              return read;
+            }
+          } else {
+            read += thisread;
+          }
+        }
+        return read;
+      } catch (IOException e) {
+        throw e;
+      }
+
+    }
+  }
+}

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java?rev=1213983&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java Tue Dec 13 23:27:13 2011
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.contrib.bkjournal;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.CountDownLatch;
+
+import java.util.Arrays;
+
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.Writer;
+
+import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
+import org.apache.hadoop.io.DataOutputBuffer;
+import java.io.IOException;
+
+/**
+ * Output stream for BookKeeper Journal.
+ * Multiple complete edit log entries are packed into a single bookkeeper
+ * entry before sending it over the network. The fact that the edit log entries
+ * are complete in the bookkeeper entries means that each bookkeeper log entry
+ *can be read as a complete edit log. This is useful for recover, as we don't
+ * need to read through the entire edit log segment to get the last written
+ * entry.
+ */
+class BookKeeperEditLogOutputStream
+  extends EditLogOutputStream implements AddCallback {
+  private final DataOutputBuffer bufCurrent;
+  private final AtomicInteger outstandingRequests;
+  private final int transmissionThreshold;
+  private final LedgerHandle lh;
+  private CountDownLatch syncLatch;
+  private final WriteLock wl;
+  private final Writer writer;
+
+  /**
+   * Construct an edit log output stream which writes to a ledger.
+
+   */
+  protected BookKeeperEditLogOutputStream(Configuration conf,
+                                          LedgerHandle lh, WriteLock wl)
+      throws IOException {
+    super();
+
+    bufCurrent = new DataOutputBuffer();
+    outstandingRequests = new AtomicInteger(0);
+    syncLatch = null;
+    this.lh = lh;
+    this.wl = wl;
+    this.wl.acquire();
+    this.writer = new Writer(bufCurrent);
+    this.transmissionThreshold
+      = conf.getInt(BookKeeperJournalManager.BKJM_OUTPUT_BUFFER_SIZE,
+                    BookKeeperJournalManager.BKJM_OUTPUT_BUFFER_SIZE_DEFAULT);
+  }
+
+  @Override
+  public void create() throws IOException {
+    // noop
+  }
+
+  @Override
+  public void close() throws IOException {
+    setReadyToFlush();
+    flushAndSync();
+    try {
+      lh.close();
+    } catch (InterruptedException ie) {
+      throw new IOException("Interrupted waiting on close", ie);
+    } catch (BKException bke) {
+      throw new IOException("BookKeeper error during close", bke);
+    }
+  }
+
+  @Override
+  public void abort() throws IOException {
+    try {
+      lh.close();
+    } catch (InterruptedException ie) {
+      throw new IOException("Interrupted waiting on close", ie);
+    } catch (BKException bke) {
+      throw new IOException("BookKeeper error during abort", bke);
+    }
+
+    wl.release();
+  }
+
+  @Override
+  public void writeRaw(final byte[] data, int off, int len) throws IOException {
+    throw new IOException("Not supported for BK");
+  }
+
+  @Override
+  public void write(FSEditLogOp op) throws IOException {
+    wl.checkWriteLock();
+
+    writer.writeOp(op);
+
+    if (bufCurrent.getLength() > transmissionThreshold) {
+      transmit();
+    }
+  }
+
+  @Override
+  public void setReadyToFlush() throws IOException {
+    wl.checkWriteLock();
+
+    transmit();
+
+    synchronized(this) {
+      syncLatch = new CountDownLatch(outstandingRequests.get());
+    }
+  }
+
+  @Override
+  public void flushAndSync() throws IOException {
+    wl.checkWriteLock();
+
+    assert(syncLatch != null);
+    try {
+      syncLatch.await();
+    } catch (InterruptedException ie) {
+      throw new IOException("Interrupted waiting on latch", ie);
+    }
+
+    syncLatch = null;
+    // wait for whatever we wait on
+  }
+
+  /**
+   * Transmit the current buffer to bookkeeper.
+   * Synchronised at the FSEditLog level. #write() and #setReadyToFlush()
+   * are never called at the same time.
+   */
+  private void transmit() throws IOException {
+    wl.checkWriteLock();
+
+    if (bufCurrent.getLength() > 0) {
+      byte[] entry = Arrays.copyOf(bufCurrent.getData(),
+                                   bufCurrent.getLength());
+      lh.asyncAddEntry(entry, this, null);
+      bufCurrent.reset();
+      outstandingRequests.incrementAndGet();
+    }
+  }
+
+  @Override
+  public void addComplete(int rc, LedgerHandle handle,
+                          long entryId, Object ctx) {
+    synchronized(this) {
+      outstandingRequests.decrementAndGet();
+      CountDownLatch l = syncLatch;
+      if (l != null) {
+        l.countDown();
+      }
+    }
+  }
+}

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java?rev=1213983&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java Tue Dec 13 23:27:13 2011
@@ -0,0 +1,508 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.contrib.bkjournal;
+
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.namenode.JournalManager;
+import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+
+import java.util.Collections;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.io.IOException;
+
+import java.net.URI;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * BookKeeper Journal Manager
+ *
+ * To use, add the following to hdfs-site.xml.
+ * <pre>
+ * {@code
+ * <property>
+ *   <name>dfs.namenode.edits.dir</name>
+ *   <value>bookkeeper://zk1:2181;zk2:2181;zk3:2181/hdfsjournal</value>
+ * </property>
+ *
+ * <property>
+ *   <name>dfs.namenode.edits.journalPlugin.bookkeeper</name>
+ *   <value>org.apache.hadoop.contrib.bkjournal.BookKeeperJournalManager</value>
+ * </property>
+ * }
+ * </pre>
+ * The URI format for bookkeeper is bookkeeper://[zkEnsemble]/[rootZnode]
+ * [zkEnsemble] is a list of semicolon-separated zookeeper host:port
+ * pairs. In the example above there are 3 servers in the ensemble,
+ * zk1, zk2 &amp; zk3, each one listening on port 2181.
+ *
+ * [rootZnode] is the path of the zookeeper znode under which the editlog
+ * information will be stored.
+ *
+ * Other configuration options are:
+ * <ul>
+ *   <li><b>dfs.namenode.bookkeeperjournal.output-buffer-size</b>
+ *       Number of bytes a bookkeeper journal stream will buffer before
+ *       forcing a flush. Default is 1024.</li>
+ *   <li><b>dfs.namenode.bookkeeperjournal.ensemble-size</b>
+ *       Number of bookkeeper servers in edit log ledger ensembles. This
+ *       is the number of bookkeeper servers which need to be available
+ *       for the ledger to be writable. Default is 3.</li>
+ *   <li><b>dfs.namenode.bookkeeperjournal.quorum-size</b>
+ *       Number of bookkeeper servers in the write quorum. This is the
+ *       number of bookkeeper servers which must have acknowledged the
+ *       write of an entry before it is considered written.
+ *       Default is 2.</li>
+ *   <li><b>dfs.namenode.bookkeeperjournal.digestPw</b>
+ *       Password to use when creating ledgers. </li>
+ * </ul>
+ */
+public class BookKeeperJournalManager implements JournalManager {
+  static final Log LOG = LogFactory.getLog(BookKeeperJournalManager.class);
+
+  // Configuration key and default for the output stream buffer size (bytes).
+  public static final String BKJM_OUTPUT_BUFFER_SIZE
+    = "dfs.namenode.bookkeeperjournal.output-buffer-size";
+  public static final int BKJM_OUTPUT_BUFFER_SIZE_DEFAULT = 1024;
+
+  // Configuration key and default for the ledger ensemble size.
+  public static final String BKJM_BOOKKEEPER_ENSEMBLE_SIZE
+    = "dfs.namenode.bookkeeperjournal.ensemble-size";
+  public static final int BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT = 3;
+
+  // Configuration key and default for the write quorum size.
+ public static final String BKJM_BOOKKEEPER_QUORUM_SIZE
+    = "dfs.namenode.bookkeeperjournal.quorum-size";
+  public static final int BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT = 2;
+
+  // Configuration key and default for the password used with ledgers.
+  public static final String BKJM_BOOKKEEPER_DIGEST_PW
+    = "dfs.namenode.bookkeeperjournal.digestPw";
+  public static final String BKJM_BOOKKEEPER_DIGEST_PW_DEFAULT = "";
+
+  // Value stored in the version znode under the journal root.
+  private static final int BKJM_LAYOUT_VERSION = -1;
+
+  private final ZooKeeper zkc;
+  private final Configuration conf;
+  private final BookKeeper bkc;
+  private final WriteLock wl;
+  // Znode under which the segment metadata znodes are created.
+  private final String ledgerPath;
+  private final MaxTxId maxTxId;
+  private final int ensembleSize;
+  private final int quorumSize;
+  private final String digestpw;
+  // Counted down by ZkConnectionWatcher once zookeeper reports SyncConnected.
+  private final CountDownLatch zkConnectLatch;
+
+  // Ledger currently being written to; null when no segment is open.
+  private LedgerHandle currentLedger = null;
+
+  /**
+   * Decode a big-endian 32 bit integer from the first four bytes of b.
+   * This is the inverse of {@link #intToBytes(int)}.
+   *
+   * Each byte is masked with 0xff before being shifted. Without the mask a
+   * negative byte is sign-extended to an int, and its high bits overwrite
+   * the more significant bytes when the parts are OR-ed together.
+   *
+   * @param b array holding at least four bytes
+   * @return the decoded integer
+   */
+  private int bytesToInt(byte[] b) {
+    assert b.length >= 4;
+    return (b[0] & 0xff) << 24
+        | (b[1] & 0xff) << 16
+        | (b[2] & 0xff) << 8
+        | (b[3] & 0xff);
+  }
+
+  private byte[] intToBytes(int i) {
+    return new byte[] {
+      (byte)(i >> 24),
+      (byte)(i >> 16),
+      (byte)(i >> 8),
+      (byte)(i) };
+  }
+
+  /**
+   * Construct a Bookkeeper journal manager.
+   *
+   * Connects to zookeeper, creates the root, version and ledgers znodes if
+   * they do not exist yet, and then creates the bookkeeper client, the
+   * write lock and the max txid tracker.
+   *
+   * @param conf configuration from which the ensemble size, quorum size
+   *             and digest password are read
+   * @param uri journal URI; the authority is the semi-colon separated
+   *            zookeeper ensemble and the path is the root znode
+   * @throws IOException if zookeeper does not report a connection in time,
+   *                     or if any other part of the initialization fails
+   */
+  public BookKeeperJournalManager(Configuration conf, URI uri)
+      throws IOException {
+    this.conf = conf;
+    // The URI separates servers with ';', the zookeeper client wants ','.
+    String zkConnect = uri.getAuthority().replace(";", ",");
+    String zkPath = uri.getPath();
+    ensembleSize = conf.getInt(BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
+                               BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT);
+    quorumSize = conf.getInt(BKJM_BOOKKEEPER_QUORUM_SIZE,
+                             BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT);
+
+    ledgerPath = zkPath + "/ledgers";
+    String maxTxIdPath = zkPath + "/maxtxid";
+    String lockPath = zkPath + "/lock";
+    String versionPath = zkPath + "/version";
+    digestpw = conf.get(BKJM_BOOKKEEPER_DIGEST_PW,
+                        BKJM_BOOKKEEPER_DIGEST_PW_DEFAULT);
+
+    try {
+      zkConnectLatch = new CountDownLatch(1);
+      zkc = new ZooKeeper(zkConnect, 3000, new ZkConnectionWatcher());
+      // Wait up to 6 seconds for the watcher to see SyncConnected.
+      if (!zkConnectLatch.await(6000, TimeUnit.MILLISECONDS)) {
+        throw new IOException("Error connecting to zookeeper");
+      }
+      if (zkc.exists(zkPath, false) == null) {
+        zkc.create(zkPath, new byte[] {'0'},
+            Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+      }
+
+      Stat versionStat = zkc.exists(versionPath, false);
+      if (versionStat != null) {
+        byte[] d = zkc.getData(versionPath, false, versionStat);
+        // There's only one version at the moment
+        // NOTE(review): this is an assert, so the stored version is only
+        // checked when assertions are enabled.
+        assert bytesToInt(d) == BKJM_LAYOUT_VERSION;
+      } else {
+        zkc.create(versionPath, intToBytes(BKJM_LAYOUT_VERSION),
+                   Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+      }
+
+      if (zkc.exists(ledgerPath, false) == null) {
+        zkc.create(ledgerPath, new byte[] {'0'},
+            Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+      }
+
+      bkc = new BookKeeper(new ClientConfiguration(),
+                           zkc);
+    } catch (Exception e) {
+      // This also wraps the connection timeout IOException thrown above.
+      throw new IOException("Error initializing zk", e);
+    }
+
+    wl = new WriteLock(zkc, lockPath);
+    maxTxId = new MaxTxId(zkc, maxTxIdPath);
+  }
+
+  /**
+   * Start a new log segment in a BookKeeper ledger.
+   * First ensure that we have the write lock for this journal.
+   * Then create a ledger and stream based on that ledger.
+   * The ledger id is written to the inprogress znode, so that in the
+   * case of a crash, a recovery process can find the ledger we were writing
+   * to when we crashed.
+   *
+   * If creating the ledger or writing its metadata fails, the ledger is
+   * closed and forgotten, so that a later call can start a fresh segment.
+   *
+   * @param txId First transaction id to be written to the stream
+   * @return an output stream writing to the newly created ledger
+   * @throws IOException if txId is not greater than the stored max txid,
+   *         if a ledger is already being written to, or if the ledger or
+   *         its inprogress znode cannot be created
+   */
+  @Override
+  public EditLogOutputStream startLogSegment(long txId) throws IOException {
+    wl.acquire();
+
+    if (txId <= maxTxId.get()) {
+      throw new IOException("We've already seen " + txId
+          + ". A new stream cannot be created with it");
+    }
+    if (currentLedger != null) {
+      throw new IOException("Already writing to a ledger, id="
+                            + currentLedger.getId());
+    }
+    try {
+      currentLedger = bkc.createLedger(ensembleSize, quorumSize,
+                                       BookKeeper.DigestType.MAC,
+                                       digestpw.getBytes());
+      String znodePath = inprogressZNode();
+      EditLogLedgerMetadata l = new EditLogLedgerMetadata(znodePath,
+          HdfsConstants.LAYOUT_VERSION,  currentLedger.getId(), txId);
+      /* Write the ledger metadata out to the inprogress ledger znode
+       * This can fail if for some reason our write lock has
+       * expired (@see WriteLock) and another process has managed to
+       * create the inprogress znode.
+       * In this case, throw an exception. We don't want to continue
+       * as this would lead to a split brain situation.
+       */
+      l.write(zkc, znodePath);
+
+      return new BookKeeperEditLogOutputStream(conf, currentLedger, wl);
+    } catch (Exception e) {
+      if (currentLedger != null) {
+        try {
+          currentLedger.close();
+        } catch (Exception e2) {
+          //log & ignore, an IOException will be thrown soon
+          LOG.error("Error closing ledger", e2);
+        }
+        // Drop the reference to the abandoned ledger. Keeping it would
+        // make every later call fail with "Already writing to a ledger".
+        currentLedger = null;
+      }
+      throw new IOException("Error creating ledger", e);
+    }
+  }
+
+  /**
+   * Finalize a log segment. If the journal manager is currently
+   * writing to a ledger, ensure that this is the ledger of the log segment
+   * being finalized.
+   *
+   * Otherwise this is the recovery case. In the recovery case, ensure that
+   * the firstTxId of the ledger matches firstTxId for the segment we are
+   * trying to finalize.
+   *
+   * The write lock is released when this method exits, whether it
+   * succeeded or failed.
+   *
+   * @param firstTxId first transaction id of the segment to finalize
+   * @param lastTxId last transaction id of the segment to finalize
+   * @throws IOException if the inprogress znode is missing, the ledger or
+   *         first txid do not match, or a zookeeper operation fails
+   */
+  @Override
+  public void finalizeLogSegment(long firstTxId, long lastTxId)
+      throws IOException {
+    String inprogressPath = inprogressZNode();
+    try {
+      Stat inprogressStat = zkc.exists(inprogressPath, false);
+      if (inprogressStat == null) {
+        throw new IOException("Inprogress znode " + inprogressPath
+                              + " doesn't exist");
+      }
+
+      wl.checkWriteLock();
+      EditLogLedgerMetadata l
+        =  EditLogLedgerMetadata.read(zkc, inprogressPath);
+
+      if (currentLedger != null) { // normal, non-recovery case
+        if (l.getLedgerId() == currentLedger.getId()) {
+          try {
+            currentLedger.close();
+          } catch (BKException bke) {
+            // A failed close is only logged; finalization continues.
+            LOG.error("Error closing current ledger", bke);
+          }
+          currentLedger = null;
+        } else {
+          throw new IOException(
+              "Active ledger has different ID to inprogress. "
+              + l.getLedgerId() + " found, "
+              + currentLedger.getId() + " expected");
+        }
+      }
+
+      if (l.getFirstTxId() != firstTxId) {
+        throw new IOException("Transaction id not as expected, "
+            + l.getFirstTxId() + " found, " + firstTxId + " expected");
+      }
+
+      l.finalizeLedger(lastTxId);
+      String finalisedPath = finalizedLedgerZNode(firstTxId, lastTxId);
+      try {
+        l.write(zkc, finalisedPath);
+      } catch (KeeperException.NodeExistsException nee) {
+        // The finalized znode already exists; accept it only if it holds
+        // the same metadata we were about to write.
+        if (!l.verify(zkc, finalisedPath)) {
+          throw new IOException("Node " + finalisedPath + " already exists"
+                                + " but data doesn't match");
+        }
+      }
+      maxTxId.store(lastTxId);
+      // Versioned delete of the inprogress znode, using the version read
+      // at the start of this method.
+      zkc.delete(inprogressPath, inprogressStat.getVersion());
+    } catch (KeeperException e) {
+      throw new IOException("Error finalising ledger", e);
+    } catch (InterruptedException ie) {
+      throw new IOException("Error finalising ledger", ie);
+    } finally {
+      wl.release();
+    }
+  }
+
+  /**
+   * Open an input stream on the segment whose first transaction id is
+   * exactly fromTxnId.
+   *
+   * @param fromTxnId first transaction id of the wanted segment
+   * @return a stream reading from the matching ledger
+   * @throws IOException if no segment starts at fromTxnId, or the ledger
+   *         cannot be opened
+   */
+  @Override
+  public EditLogInputStream getInputStream(long fromTxnId) throws IOException {
+    // Locate the first segment starting at the requested transaction.
+    EditLogLedgerMetadata match = null;
+    for (EditLogLedgerMetadata candidate : getLedgerList()) {
+      if (candidate.getFirstTxId() == fromTxnId) {
+        match = candidate;
+        break;
+      }
+    }
+    if (match == null) {
+      throw new IOException("No ledger for fromTxnId " + fromTxnId
+                            + " found.");
+    }
+
+    try {
+      LedgerHandle handle = bkc.openLedger(match.getLedgerId(),
+                                           BookKeeper.DigestType.MAC,
+                                           digestpw.getBytes());
+      return new BookKeeperEditLogInputStream(handle, match);
+    } catch (Exception e) {
+      throw new IOException("Could not open ledger for " + fromTxnId, e);
+    }
+  }
+
+  /**
+   * Count the transactions available from fromTxnId onwards, walking the
+   * segments in the order returned by getLedgerList().
+   *
+   * Counting stops at the first in-progress segment, or at the first gap
+   * found after at least one transaction has been counted.
+   *
+   * NOTE(review): the in-progress branch adds the recovered length of the
+   * segment without comparing its first txid to fromTxnId or to the
+   * expected start. Confirm that this is intended.
+   *
+   * @param fromTxnId transaction id to start counting from
+   * @return number of transactions counted
+   * @throws IOException if the ledger list cannot be read, or if a later
+   *         segment is reached before any segment starting at fromTxnId
+   */
+  @Override
+  public long getNumberOfTransactions(long fromTxnId) throws IOException {
+    long count = 0;
+    long expectedStart = 0;
+    for (EditLogLedgerMetadata l : getLedgerList()) {
+      if (l.isInProgress()) {
+        // The last txid is not in the metadata yet; read it from the ledger.
+        long endTxId = recoverLastTxId(l);
+        if (endTxId == HdfsConstants.INVALID_TXID) {
+          break;
+        }
+        count += (endTxId - l.getFirstTxId()) + 1;
+        break;
+      }
+
+      if (l.getFirstTxId() < fromTxnId) {
+        // Segment starts before the requested txid; skip it.
+        continue;
+      } else if (l.getFirstTxId() == fromTxnId) {
+        count = (l.getLastTxId() - l.getFirstTxId()) + 1;
+        expectedStart = l.getLastTxId() + 1;
+      } else {
+        if (expectedStart != l.getFirstTxId()) {
+          if (count == 0) {
+            // Nothing counted yet, and this segment starts elsewhere.
+            throw new CorruptionException("StartTxId " + l.getFirstTxId()
+                + " is not as expected " + expectedStart
+                + ". Gap in transaction log?");
+          } else {
+            // Gap after a contiguous run; report what was counted so far.
+            break;
+          }
+        }
+        count += (l.getLastTxId() - l.getFirstTxId()) + 1;
+        expectedStart = l.getLastTxId() + 1;
+      }
+    }
+    return count;
+  }
+
+  /**
+   * Finalize the in-progress segment, if there is one.
+   *
+   * Acquires the write lock, reads the inprogress znode, determines the
+   * last transaction id in its ledger and finalizes the segment with it.
+   * A missing inprogress znode means there is nothing to recover.
+   *
+   * @throws IOException if the last transaction id cannot be determined,
+   *         or if finalization fails
+   */
+  @Override
+  public void recoverUnfinalizedSegments() throws IOException {
+    wl.acquire();
+
+    synchronized (this) {
+      try {
+        EditLogLedgerMetadata l
+          = EditLogLedgerMetadata.read(zkc, inprogressZNode());
+        long endTxId = recoverLastTxId(l);
+        if (endTxId == HdfsConstants.INVALID_TXID) {
+          LOG.error("Unrecoverable corruption has occurred in segment "
+                    + l.toString() + " at path " + inprogressZNode()
+                    + ". Unable to continue recovery.");
+          throw new IOException("Unrecoverable corruption, please check logs.");
+        }
+        finalizeLogSegment(l.getFirstTxId(), endTxId);
+      } catch (KeeperException.NoNodeException nne) {
+          // nothing to recover, ignore
+      } finally {
+        // finalizeLogSegment() releases the lock itself, so only release
+        // here if the lock is still held.
+        if (wl.haveLock()) {
+          wl.release();
+        }
+      }
+    }
+  }
+
+  /**
+   * Delete every finalized segment whose last transaction id is lower
+   * than minTxIdToKeep. For each such segment the metadata znode is
+   * deleted first, followed by the ledger itself.
+   *
+   * Zookeeper and bookkeeper errors for a segment are logged and purging
+   * continues with the next one. If the thread is interrupted, the
+   * interrupt status is restored and purging stops.
+   *
+   * @param minTxIdToKeep segments ending before this txid are removed
+   * @throws IOException if the list of segments cannot be read
+   */
+  @Override
+  public void purgeLogsOlderThan(long minTxIdToKeep)
+      throws IOException {
+    for (EditLogLedgerMetadata l : getLedgerList()) {
+      if (!l.isInProgress()
+          && l.getLastTxId() < minTxIdToKeep) {
+        try {
+          Stat stat = zkc.exists(l.getZkPath(), false);
+          if (stat == null) {
+            // The znode disappeared after the segments were listed, so
+            // there is no version to delete against. Skip this segment.
+            LOG.warn("Znode " + l.getZkPath()
+                     + " no longer exists, skipping purge of " + l);
+            continue;
+          }
+          zkc.delete(l.getZkPath(), stat.getVersion());
+          bkc.deleteLedger(l.getLedgerId());
+        } catch (InterruptedException ie) {
+          LOG.error("Interrupted while purging " + l, ie);
+          // Keep the interrupt visible to the caller and stop. Further
+          // blocking calls would just be interrupted again.
+          Thread.currentThread().interrupt();
+          return;
+        } catch (BKException bke) {
+          LOG.error("Couldn't delete ledger from bookkeeper", bke);
+        } catch (KeeperException ke) {
+          LOG.error("Error deleting ledger entry in zookeeper", ke);
+        }
+      }
+    }
+  }
+
+  /**
+   * Close the bookkeeper client and then the zookeeper client. The
+   * zookeeper client is closed even if closing bookkeeper fails.
+   *
+   * @throws IOException if either client fails to close
+   */
+  @Override
+  public void close() throws IOException {
+    try {
+      try {
+        bkc.close();
+      } finally {
+        // Always release the zookeeper session, whatever happened above.
+        zkc.close();
+      }
+    } catch (Exception e) {
+      throw new IOException("Couldn't close zookeeper client", e);
+    }
+  }
+
+  /**
+   * Set the amount of memory that output streams should use to buffer
+   * edits. The value is stored in the configuration under
+   * {@link #BKJM_OUTPUT_BUFFER_SIZE}, the configuration that is passed to
+   * output streams created by this journal manager.
+   * Setting this will only affect future output streams. Streams
+   * which have already been created won't be affected.
+   *
+   * @param size buffer size in bytes
+   */
+  @Override
+  public void setOutputBufferCapacity(int size) {
+    // Store the value. A getInt() here would only read the key and
+    // throw the result away.
+    conf.setInt(BKJM_OUTPUT_BUFFER_SIZE, size);
+  }
+
+  /**
+   * Find the id of the last edit log transaction writen to a edit log
+   * ledger.
+   */
+  private long recoverLastTxId(EditLogLedgerMetadata l) throws IOException {
+    try {
+      LedgerHandle lh = bkc.openLedger(l.getLedgerId(),
+                                       BookKeeper.DigestType.MAC,
+                                       digestpw.getBytes());
+      long lastAddConfirmed = lh.getLastAddConfirmed();
+      BookKeeperEditLogInputStream in
+        = new BookKeeperEditLogInputStream(lh, l, lastAddConfirmed);
+
+      long endTxId = HdfsConstants.INVALID_TXID;
+      FSEditLogOp op = in.readOp();
+      while (op != null) {
+        if (endTxId == HdfsConstants.INVALID_TXID
+            || op.getTransactionId() == endTxId+1) {
+          endTxId = op.getTransactionId();
+        }
+        op = in.readOp();
+      }
+      return endTxId;
+    } catch (Exception e) {
+      throw new IOException("Exception retreiving last tx id for ledger " + l,
+                            e);
+    }
+  }
+
+  /**
+   * Get a list of all segments in the journal.
+   */
+  private List<EditLogLedgerMetadata> getLedgerList() throws IOException {
+    List<EditLogLedgerMetadata> ledgers
+      = new ArrayList<EditLogLedgerMetadata>();
+    try {
+      List<String> ledgerNames = zkc.getChildren(ledgerPath, false);
+      for (String n : ledgerNames) {
+        ledgers.add(EditLogLedgerMetadata.read(zkc, ledgerPath + "/" + n));
+      }
+    } catch (Exception e) {
+      throw new IOException("Exception reading ledger list from zk", e);
+    }
+
+    Collections.sort(ledgers, EditLogLedgerMetadata.COMPARATOR);
+    return ledgers;
+  }
+
+  /**
+   * Build the znode path of a finalized segment. The name holds the first
+   * and last transaction ids, each zero-padded to 18 digits.
+   */
+  String finalizedLedgerZNode(long startTxId, long endTxId) {
+    final String nodeName
+      = String.format("edits_%018d_%018d", startTxId, endTxId);
+    return ledgerPath + "/" + nodeName;
+  }
+
+  /**
+   * Get the znode path for the inprogressZNode
+   */
+  String inprogressZNode() {
+    return ledgerPath + "/inprogress";
+  }
+
+  /**
+   * Simple watcher to notify when zookeeper has connected
+   */
+  private class ZkConnectionWatcher implements Watcher {
+    public void process(WatchedEvent event) {
+      if (Event.KeeperState.SyncConnected.equals(event.getState())) {
+        zkConnectLatch.countDown();
+      }
+    }
+  }
+}

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java?rev=1213983&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java Tue Dec 13 23:27:13 2011
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.contrib.bkjournal;
+
+import java.io.IOException;
+import java.util.Comparator;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.KeeperException;
+
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Utility class for storing the metadata associated
+ * with a single edit log segment, stored in a single ledger.
+ *
+ * The metadata is kept in a znode as a semi-colon separated string,
+ * "version;ledgerId;firstTxId" for a segment which is still in progress
+ * and "version;ledgerId;firstTxId;lastTxId" for a finalized one.
+ */
+public class EditLogLedgerMetadata {
+  static final Log LOG = LogFactory.getLog(EditLogLedgerMetadata.class);
+
+  private String zkPath;
+  private final long ledgerId;
+  private final int version;
+  private final long firstTxId;
+  private long lastTxId;
+  private boolean inprogress;
+
+  /**
+   * Orders segments by their first transaction id.
+   */
+  public static final Comparator<EditLogLedgerMetadata> COMPARATOR
+    = new Comparator<EditLogLedgerMetadata>() {
+    public int compare(EditLogLedgerMetadata o1,
+        EditLogLedgerMetadata o2) {
+      if (o1.firstTxId < o2.firstTxId) {
+        return -1;
+      } else if (o1.firstTxId == o2.firstTxId) {
+        return 0;
+      } else {
+        return 1;
+      }
+    }
+  };
+
+  /**
+   * Create the metadata of a segment which is still in progress.
+   */
+  EditLogLedgerMetadata(String zkPath, int version,
+                        long ledgerId, long firstTxId) {
+    this.zkPath = zkPath;
+    this.ledgerId = ledgerId;
+    this.version = version;
+    this.firstTxId = firstTxId;
+    this.lastTxId = HdfsConstants.INVALID_TXID;
+    this.inprogress = true;
+  }
+
+  /**
+   * Create the metadata of a finalized segment.
+   */
+  EditLogLedgerMetadata(String zkPath, int version, long ledgerId,
+                        long firstTxId, long lastTxId) {
+    this.zkPath = zkPath;
+    this.ledgerId = ledgerId;
+    this.version = version;
+    this.firstTxId = firstTxId;
+    this.lastTxId = lastTxId;
+    this.inprogress = false;
+  }
+
+  String getZkPath() {
+    return zkPath;
+  }
+
+  long getFirstTxId() {
+    return firstTxId;
+  }
+
+  long getLastTxId() {
+    return lastTxId;
+  }
+
+  long getLedgerId() {
+    return ledgerId;
+  }
+
+  int getVersion() {
+    return version;
+  }
+
+  boolean isInProgress() {
+    return this.inprogress;
+  }
+
+  /**
+   * Mark this segment as finalized, ending at newLastTxId.
+   */
+  void finalizeLedger(long newLastTxId) {
+    assert this.lastTxId == HdfsConstants.INVALID_TXID;
+    this.lastTxId = newLastTxId;
+    this.inprogress = false;
+  }
+
+  /**
+   * Read segment metadata from a znode.
+   *
+   * @param zkc zookeeper client to read with
+   * @param path path of the znode holding the metadata
+   * @return the parsed metadata; in progress if the znode holds three
+   *         fields, finalized if it holds four
+   * @throws KeeperException.NoNodeException if the znode does not exist
+   * @throws IOException if the data is malformed or cannot be read
+   */
+  static EditLogLedgerMetadata read(ZooKeeper zkc, String path)
+      throws IOException, KeeperException.NoNodeException  {
+    try {
+      byte[] data = zkc.getData(path, false, null);
+      String[] parts = new String(data).split(";");
+      if (parts.length == 3) {
+        int version = Integer.parseInt(parts[0]);
+        long ledgerId = Long.parseLong(parts[1]);
+        long txId = Long.parseLong(parts[2]);
+        return new EditLogLedgerMetadata(path, version, ledgerId, txId);
+      } else if (parts.length == 4) {
+        int version = Integer.parseInt(parts[0]);
+        long ledgerId = Long.parseLong(parts[1]);
+        long firstTxId = Long.parseLong(parts[2]);
+        long lastTxId = Long.parseLong(parts[3]);
+        return new EditLogLedgerMetadata(path, version, ledgerId,
+                                         firstTxId, lastTxId);
+      } else {
+        throw new IOException("Invalid ledger entry, "
+                              + new String(data));
+      }
+    } catch(KeeperException.NoNodeException nne) {
+      throw nne;
+    } catch(Exception e) {
+      throw new IOException("Error reading from zookeeper", e);
+    }
+  }
+
+  /**
+   * Create a znode at path holding this metadata, and remember path as
+   * the location of this metadata.
+   *
+   * @param zkc zookeeper client to write with
+   * @param path path of the znode to create
+   * @throws KeeperException.NodeExistsException if the znode already exists
+   * @throws IOException if the znode cannot be created for another reason
+   */
+  void write(ZooKeeper zkc, String path)
+      throws IOException, KeeperException.NodeExistsException {
+    this.zkPath = path;
+    String finalisedData;
+    if (inprogress) {
+      finalisedData = String.format("%d;%d;%d",
+          version, ledgerId, firstTxId);
+    } else {
+      finalisedData = String.format("%d;%d;%d;%d",
+          version, ledgerId, firstTxId, lastTxId);
+    }
+    try {
+      zkc.create(path, finalisedData.getBytes(),
+                 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+    } catch (KeeperException.NodeExistsException nee) {
+      throw nee;
+    } catch (Exception e) {
+      // Keep the cause, so the underlying failure is not lost.
+      throw new IOException("Error creating ledger znode", e);
+    }
+  }
+
+  /**
+   * Check whether the znode at path holds the same metadata as this
+   * object, as defined by {@link #equals(Object)}.
+   *
+   * @return true if the stored metadata equals this one, false if it
+   *         differs or cannot be read
+   */
+  boolean verify(ZooKeeper zkc, String path) {
+    try {
+      EditLogLedgerMetadata other = read(zkc, path);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Verifying " + this.toString()
+                  + " against " + other);
+      }
+      // Compare by value. read() always returns a new object, so a
+      // reference comparison could never succeed.
+      return this.equals(other);
+    } catch (Exception e) {
+      LOG.error("Couldn't verify data in " + path, e);
+      return false;
+    }
+  }
+
+  /**
+   * Two metadata objects are equal if ledger id, first and last txid and
+   * version all match. The znode path is not part of the comparison.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof EditLogLedgerMetadata)) {
+      return false;
+    }
+    EditLogLedgerMetadata ol = (EditLogLedgerMetadata)o;
+    return ledgerId == ol.ledgerId
+      && firstTxId == ol.firstTxId
+      && lastTxId == ol.lastTxId
+      && version == ol.version;
+  }
+
+  @Override
+  public int hashCode() {
+    int hash = 1;
+    hash = hash * 31 + (int)ledgerId;
+    hash = hash * 31 + (int)firstTxId;
+    hash = hash * 31 + (int)lastTxId;
+    hash = hash * 31 + (int)version;
+    return hash;
+  }
+
+  @Override
+  public String toString() {
+    return "[LedgerId:"+ledgerId +
+      ", firstTxId:" + firstTxId +
+      ", lastTxId:" + lastTxId +
+      ", version:" + version + "]";
+  }
+
+}

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/MaxTxId.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/MaxTxId.java?rev=1213983&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/MaxTxId.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/MaxTxId.java Tue Dec 13 23:27:13 2011
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.contrib.bkjournal;
+
+import java.io.IOException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Helper which keeps the highest transaction id seen so far in a
+ * zookeeper znode, stored as a decimal string in UTF-8.
+ */
+class MaxTxId {
+  static final Log LOG = LogFactory.getLog(MaxTxId.class);
+
+  private final ZooKeeper zkc;
+  private final String path;
+
+  // Stat of the znode as seen by the most recent get(); null if absent.
+  private Stat currentStat;
+
+  MaxTxId(ZooKeeper zkc, String path) {
+    this.zkc = zkc;
+    this.path = path;
+  }
+
+  /**
+   * Store maxTxId if it is greater than the value currently stored.
+   * The znode is created if it does not exist yet, otherwise it is
+   * updated against the version seen by the preceding read.
+   */
+  synchronized void store(long maxTxId) throws IOException {
+    if (get() >= maxTxId) {
+      // the stored value is already at least as large
+      return;
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Setting maxTxId to " + maxTxId);
+    }
+    final String encoded = Long.toString(maxTxId);
+    try {
+      if (currentStat == null) {
+        zkc.create(path, encoded.getBytes("UTF-8"),
+                   Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+      } else {
+        currentStat = zkc.setData(path, encoded.getBytes("UTF-8"),
+                                  currentStat.getVersion());
+      }
+    } catch (Exception e) {
+      throw new IOException("Error writing max tx id", e);
+    }
+  }
+
+  /**
+   * Read the stored maximum transaction id, refreshing the cached stat.
+   *
+   * @return the stored value, or 0 if the znode does not exist
+   */
+  synchronized long get() throws IOException {
+    try {
+      currentStat = zkc.exists(path, false);
+      if (currentStat != null) {
+        final byte[] raw = zkc.getData(path, false, currentStat);
+        return Long.parseLong(new String(raw, "UTF-8"));
+      }
+      return 0;
+    } catch (Exception e) {
+      throw new IOException("Error reading the max tx id from zk", e);
+    }
+  }
+}

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/WriteLock.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/WriteLock.java?rev=1213983&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/WriteLock.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/WriteLock.java Tue Dec 13 23:27:13 2011
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.contrib.bkjournal;
+
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.List;
+import java.util.Collections;
+import java.util.Comparator;
+
+import java.net.InetAddress;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Distributed lock, using ZooKeeper.
+ *
+ * The lock is vulnerable to timing issues. For example, the process could
+ * encounter a really long GC cycle between acquiring the lock, and writing to
+ * a ledger. This could have timed out the lock, and another process could have
+ * acquired the lock and started writing to bookkeeper. Therefore other
+ * mechanisms are required to ensure correctness (i.e. Fencing).
+ *
+ * The lock is counted: once this instance holds it, further acquire() calls
+ * only bump a local counter, and the znode is removed when release() brings
+ * the counter back to zero.
+ */
+class WriteLock implements Watcher {
+  static final Log LOG = LogFactory.getLog(WriteLock.class);
+
+  /** Name prefix of the sequential znodes created under the lock path. */
+  private static final String LOCK_PREFIX = "lock-";
+
+  /** Orders lock znode names by their numeric sequence suffix. */
+  private static final Comparator<String> BY_SEQUENCE =
+    new Comparator<String>() {
+      @Override
+      public int compare(String o1, String o2) {
+        Integer l1 = Integer.valueOf(o1.replace(LOCK_PREFIX, ""));
+        Integer l2 = Integer.valueOf(o2.replace(LOCK_PREFIX, ""));
+        // compareTo instead of l1 - l2: subtraction can overflow once
+        // sequence numbers wrap to negative values
+        return l1.compareTo(l2);
+      }
+    };
+
+  private final ZooKeeper zkc;
+  private final String lockpath;
+
+  /** Number of outstanding acquire() calls; 0 means the lock is not held. */
+  private final AtomicInteger lockCount = new AtomicInteger(0);
+  /** Full path of our lock znode, or null when none exists. */
+  private String myznode = null;
+
+  /**
+   * Creates a lock rooted at lockpath, creating that znode if it is missing.
+   *
+   * @param zkc ZooKeeper client used for all lock operations
+   * @param lockpath parent znode under which lock znodes are created
+   * @throws IOException if ZooKeeper cannot be accessed
+   */
+  WriteLock(ZooKeeper zkc, String lockpath) throws IOException {
+    this.lockpath = lockpath;
+
+    this.zkc = zkc;
+    try {
+      if (zkc.exists(lockpath, false) == null) {
+        String localString = InetAddress.getLocalHost().toString();
+        // explicit charset so the stored bytes do not depend on the platform
+        zkc.create(lockpath, localString.getBytes("UTF-8"),
+                   Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+      }
+    } catch (KeeperException.NodeExistsException nee) {
+      // lost the exists/create race with another process; the root is there
+      LOG.debug("Lock root " + lockpath + " was created concurrently");
+    } catch (Exception e) {
+      throw new IOException("Exception accessing Zookeeper", e);
+    }
+  }
+
+  /**
+   * Takes the lock, or bumps the hold count if this instance already has it.
+   * Does not wait: if another znode is ahead of ours the call fails at once.
+   *
+   * @throws IOException if the lock is held elsewhere or ZooKeeper fails
+   */
+  void acquire() throws IOException {
+    while (true) {
+      // <= 0 rather than == 0 so that a count driven negative by an
+      // unbalanced release() cannot be mistaken for a held lock
+      if (lockCount.get() <= 0) {
+        try {
+          synchronized(this) {
+            if (lockCount.get() > 0) {
+              lockCount.incrementAndGet();
+              return;
+            }
+            acquireFromZooKeeper();
+            return;
+          }
+        } catch (KeeperException e) {
+          throw new IOException("Exception accessing Zookeeper", e);
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+          throw new IOException("Exception accessing Zookeeper", ie);
+        }
+      } else {
+        int ret = lockCount.getAndIncrement();
+        if (ret <= 0) {
+          // released between the check and the increment; undo and retry
+          lockCount.decrementAndGet();
+          continue; // try again;
+        } else {
+          return;
+        }
+      }
+    }
+  }
+
+  /**
+   * Creates our lock znode and checks that it sorts first among its siblings.
+   * Must be called with this object's monitor held. If the lock cannot be
+   * taken, the znode is removed again so that it does not linger at the head
+   * of the queue once the current holder releases.
+   */
+  private void acquireFromZooKeeper()
+      throws IOException, KeeperException, InterruptedException {
+    myznode = zkc.create(lockpath + "/" + LOCK_PREFIX, new byte[] {'0'},
+                         Ids.OPEN_ACL_UNSAFE,
+                         CreateMode.EPHEMERAL_SEQUENTIAL);
+    boolean acquired = false;
+    try {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Acquiring lock, trying " + myznode);
+      }
+
+      List<String> nodes = zkc.getChildren(lockpath, false);
+      Collections.sort(nodes, BY_SEQUENCE);
+      // an empty list means even our own znode is already gone
+      String first = nodes.isEmpty() ? null : nodes.get(0);
+      if (first == null || !(lockpath + "/" + first).equals(myznode)) {
+        LOG.error("Failed to acquire lock with " + myznode
+                  + ", " + first + " already has it");
+        throw new IOException("Could not acquire lock");
+      }
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Lock acquired - " + myznode);
+      }
+      lockCount.set(1);
+      zkc.exists(myznode, this);
+      acquired = true;
+    } finally {
+      if (!acquired) {
+        lockCount.set(0);
+        discardLockNode();
+      }
+    }
+  }
+
+  /** Deletes our lock znode on a best-effort basis and forgets its path. */
+  private void discardLockNode() {
+    String znode = myznode;
+    myznode = null;
+    try {
+      zkc.delete(znode, -1);
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+      LOG.warn("Interrupted while removing lock znode " + znode, ie);
+    } catch (KeeperException ke) {
+      LOG.warn("Could not remove lock znode " + znode, ke);
+    }
+  }
+
+  /**
+   * Drops one hold on the lock; the last release deletes the lock znode.
+   *
+   * @throws IOException if the znode cannot be deleted
+   */
+  void release() throws IOException {
+    try {
+      if (lockCount.decrementAndGet() <= 0) {
+        synchronized(this) {
+          int count = lockCount.get();
+          if (count < 0) {
+            LOG.warn("Unbalanced lock handling somewhere, lockCount down to "
+                     + count);
+            // clamp to zero so that the count stays meaningful for later
+            // acquire() and haveLock() calls
+            lockCount.compareAndSet(count, 0);
+          }
+          if (lockCount.get() <= 0) {
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("releasing lock " + myznode);
+            }
+            if (myznode != null) {
+              zkc.delete(myznode, -1);
+              myznode = null;
+            }
+          }
+        }
+      }
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+      throw new IOException("Exception accessing Zookeeper", ie);
+    } catch (Exception e) {
+      throw new IOException("Exception accessing Zookeeper", e);
+    }
+  }
+
+  /**
+   * Verifies that this instance still believes it holds the lock.
+   *
+   * @throws IOException if the lock is not held
+   */
+  public void checkWriteLock() throws IOException {
+    if (!haveLock()) {
+      throw new IOException("Lost writer lock");
+    }
+  }
+
+  /** Returns true while the local hold count is positive. */
+  boolean haveLock() throws IOException {
+    return lockCount.get() > 0;
+  }
+
+  /**
+   * Watcher callback. A Disconnected or Expired state is treated as loss of
+   * the lock; any other event re-registers the watch on our lock znode.
+   */
+  @Override
+  public void process(WatchedEvent event) {
+    if (event.getState() == KeeperState.Disconnected
+        || event.getState() == KeeperState.Expired) {
+      LOG.warn("Lost zookeeper session, lost lock ");
+      lockCount.set(0);
+    } else {
+      // reapply the watch
+      synchronized (this) {
+        LOG.info("Zookeeper event " + event
+                 + " received, reapplying watch to " + myznode);
+        if (myznode != null) {
+          try {
+            zkc.exists(myznode, this);
+          } catch (Exception e) {
+            LOG.warn("Could not set watch on lock, releasing", e);
+            try {
+              release();
+            } catch (IOException ioe) {
+              LOG.error("Could not release Zk lock", ioe);
+            }
+          }
+        }
+      }
+    }
+  }
+}

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java?rev=1213983&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java Tue Dec 13 23:27:13 2011
@@ -0,0 +1,406 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.contrib.bkjournal;
+
+import static org.junit.Assert.*;
+
+import java.net.URI;
+import java.util.Collections;
+import java.util.Arrays;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.util.LocalBookKeeper;
+
+import java.io.RandomAccessFile;
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.security.SecurityUtil;
+import org.junit.Test;
+import org.junit.Before;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.setupEdits;
+import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
+import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil;
+import org.apache.hadoop.hdfs.server.namenode.JournalManager;
+
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.KeeperException;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.zip.CheckedInputStream;
+import java.util.zip.Checksum;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class TestBookKeeperJournalManager {
+  static final Log LOG = LogFactory.getLog(TestBookKeeperJournalManager.class);
+
+  private static final long DEFAULT_SEGMENT_SIZE = 1000;
+  private static final String zkEnsemble = "localhost:2181";
+
+  private static Thread bkthread;
+  protected static Configuration conf = new Configuration();
+  private ZooKeeper zkc;
+
+  /**
+   * Opens a ZooKeeper client and waits up to 3 seconds for it to connect.
+   *
+   * @param ensemble ZooKeeper connect string
+   * @return a client that has reported SyncConnected
+   * @throws IOException if the connection is not established in time
+   */
+  private static ZooKeeper connectZooKeeper(String ensemble)
+      throws IOException, KeeperException, InterruptedException {
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    ZooKeeper zkc = new ZooKeeper(ensemble, 3600, new Watcher() {
+        public void process(WatchedEvent event) {
+          if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
+            latch.countDown();
+          }
+        }
+      });
+    if (!latch.await(3, TimeUnit.SECONDS)) {
+      zkc.close(); // do not leak the client when giving up
+      throw new IOException("Zookeeper took too long to connect");
+    }
+    return zkc;
+  }
+
+  /**
+   * Appends count no-op transactions to out, numbered from firstTxId.
+   *
+   * @return the transaction id following the last one written
+   */
+  private static long writeNoOps(EditLogOutputStream out, long firstTxId,
+                                 long count) throws IOException {
+    long txid = firstTxId;
+    for (long i = 0; i < count; i++) {
+      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+      op.setTransactionId(txid++);
+      out.write(op);
+    }
+    return txid;
+  }
+
+  /**
+   * Starts a local BookKeeper cluster in a background thread and waits until
+   * all bookies are listed under /ledgers/available.
+   */
+  @BeforeClass
+  public static void setupBookkeeper() throws Exception {
+    final int numBookies = 5;
+    bkthread = new Thread() {
+        public void run() {
+          try {
+            String[] args = new String[1];
+            args[0] = String.valueOf(numBookies);
+            LOG.info("Starting bk");
+            LocalBookKeeper.main(args);
+          } catch (InterruptedException e) {
+            // go away quietly
+          } catch (Exception e) {
+            LOG.error("Error starting local bk", e);
+          }
+        }
+      };
+    bkthread.start();
+
+    if (!LocalBookKeeper.waitForServerUp(zkEnsemble, 10000)) {
+      throw new Exception("Error starting zookeeper/bookkeeper");
+    }
+
+    ZooKeeper zkc = connectZooKeeper(zkEnsemble);
+    try {
+      boolean up = false;
+      for (int i = 0; i < 10; i++) {
+        try {
+          List<String> children = zkc.getChildren("/ledgers/available",
+                                                  false);
+          if (children.size() == numBookies) {
+            up = true;
+            break;
+          }
+        } catch (KeeperException e) {
+          // ignore and poll again after the sleep below
+        }
+        Thread.sleep(1000);
+      }
+      if (!up) {
+        throw new IOException("Not enough bookies started");
+      }
+    } finally {
+      zkc.close();
+    }
+  }
+
+  /** Opens a fresh ZooKeeper client for each test. */
+  @Before
+  public void setup() throws Exception {
+    zkc = connectZooKeeper(zkEnsemble);
+  }
+
+  /** Closes the per-test ZooKeeper client. */
+  @After
+  public void teardown() throws Exception {
+    // setup() may have failed before the client was assigned
+    if (zkc != null) {
+      zkc.close();
+    }
+  }
+
+  /** Interrupts the BookKeeper thread and waits for it to exit. */
+  @AfterClass
+  public static void teardownBookkeeper() throws Exception {
+    if (bkthread != null) {
+      bkthread.interrupt();
+      bkthread.join();
+    }
+  }
+
+  /**
+   * Write and finalize a single segment; the finalized znode must exist and
+   * the inprogress znode must be gone.
+   */
+  @Test
+  public void testSimpleWrite() throws Exception {
+    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simplewrite"));
+    EditLogOutputStream out = bkjm.startLogSegment(1);
+    writeNoOps(out, 1, 100);
+    out.close();
+    bkjm.finalizeLogSegment(1, 100);
+
+    String zkpath = bkjm.finalizedLedgerZNode(1, 100);
+
+    assertNotNull(zkc.exists(zkpath, false));
+    assertNull(zkc.exists(bkjm.inprogressZNode(), false));
+  }
+
+  /** A finalized segment of 100 transactions must be counted as 100. */
+  @Test
+  public void testNumberOfTransactions() throws Exception {
+    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-txncount"));
+    EditLogOutputStream out = bkjm.startLogSegment(1);
+    writeNoOps(out, 1, 100);
+    out.close();
+    bkjm.finalizeLogSegment(1, 100);
+
+    long numTrans = bkjm.getNumberOfTransactions(1);
+    assertEquals(100, numTrans);
+  }
+
+  /**
+   * Write three segments, delete the znode of the middle one, and check the
+   * counts reported from before, inside and after the resulting gap.
+   */
+  @Test
+  public void testNumberOfTransactionsWithGaps() throws Exception {
+    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-gaps"));
+    long txid = 1;
+    for (long i = 0; i < 3; i++) {
+      long start = txid;
+      EditLogOutputStream out = bkjm.startLogSegment(start);
+      txid = writeNoOps(out, start, DEFAULT_SEGMENT_SIZE);
+      out.close();
+      bkjm.finalizeLogSegment(start, txid-1);
+      assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(start, txid-1), false));
+    }
+    zkc.delete(bkjm.finalizedLedgerZNode(DEFAULT_SEGMENT_SIZE+1, DEFAULT_SEGMENT_SIZE*2), -1);
+
+    long numTrans = bkjm.getNumberOfTransactions(1);
+    assertEquals(DEFAULT_SEGMENT_SIZE, numTrans);
+
+    try {
+      numTrans = bkjm.getNumberOfTransactions(DEFAULT_SEGMENT_SIZE+1);
+      fail("Should have thrown corruption exception by this point");
+    } catch (JournalManager.CorruptionException ce) {
+      // if we get here, everything is going good
+    }
+
+    numTrans = bkjm.getNumberOfTransactions((DEFAULT_SEGMENT_SIZE*2)+1);
+    assertEquals(DEFAULT_SEGMENT_SIZE, numTrans);
+  }
+
+  /**
+   * Three finalized segments followed by an aborted, half-sized one: every
+   * transaction written must be counted.
+   */
+  @Test
+  public void testNumberOfTransactionsWithInprogressAtEnd() throws Exception {
+    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-inprogressAtEnd"));
+    long txid = 1;
+    for (long i = 0; i < 3; i++) {
+      long start = txid;
+      EditLogOutputStream out = bkjm.startLogSegment(start);
+      txid = writeNoOps(out, start, DEFAULT_SEGMENT_SIZE);
+
+      out.close();
+      bkjm.finalizeLogSegment(start, (txid-1));
+      assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(start, (txid-1)), false));
+    }
+    long start = txid;
+    EditLogOutputStream out = bkjm.startLogSegment(start);
+    txid = writeNoOps(out, start, DEFAULT_SEGMENT_SIZE/2);
+    out.setReadyToFlush();
+    out.flush();
+    out.abort();
+    out.close();
+
+    long numTrans = bkjm.getNumberOfTransactions(1);
+    assertEquals((txid-1), numTrans);
+  }
+
+  /**
+   * Create a bkjm namespace, write a journal from txid 1, close stream.
+   * Try to create a new journal from txid 1. Should throw an exception.
+   */
+  @Test
+  public void testWriteRestartFrom1() throws Exception {
+    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-restartFrom1"));
+    long txid = 1;
+    long start = txid;
+    EditLogOutputStream out = bkjm.startLogSegment(txid);
+    txid = writeNoOps(out, txid, DEFAULT_SEGMENT_SIZE);
+    out.close();
+    bkjm.finalizeLogSegment(start, (txid-1));
+
+    txid = 1;
+    try {
+      out = bkjm.startLogSegment(txid);
+      fail("Shouldn't be able to start another journal from " + txid
+          + " when one already exists");
+    } catch (Exception ioe) {
+      LOG.info("Caught exception as expected", ioe);
+    }
+
+    // test border case
+    txid = DEFAULT_SEGMENT_SIZE;
+    try {
+      out = bkjm.startLogSegment(txid);
+      fail("Shouldn't be able to start another journal from " + txid
+          + " when one already exists");
+    } catch (IOException ioe) {
+      LOG.info("Caught exception as expected", ioe);
+    }
+
+    // open journal continuing from before
+    txid = DEFAULT_SEGMENT_SIZE + 1;
+    start = txid;
+    out = bkjm.startLogSegment(start);
+    assertNotNull(out);
+
+    txid = writeNoOps(out, txid, DEFAULT_SEGMENT_SIZE);
+    out.close();
+    bkjm.finalizeLogSegment(start, (txid-1));
+
+    // open journal arbitrarily far in the future
+    txid = DEFAULT_SEGMENT_SIZE * 4;
+    out = bkjm.startLogSegment(txid);
+    assertNotNull(out);
+  }
+
+  /** A second journal manager must not start a segment the first one holds. */
+  @Test
+  public void testTwoWriters() throws Exception {
+    long start = 1;
+    BookKeeperJournalManager bkjm1 = new BookKeeperJournalManager(conf,
+        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-dualWriter"));
+    BookKeeperJournalManager bkjm2 = new BookKeeperJournalManager(conf,
+        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-dualWriter"));
+
+    bkjm1.startLogSegment(start); // first writer keeps its segment open
+    try {
+      bkjm2.startLogSegment(start);
+      fail("Shouldn't have been able to open the second writer");
+    } catch (IOException ioe) {
+      LOG.info("Caught exception as expected", ioe);
+    }
+  }
+
+  /** Everything written to a finalized segment must be readable again. */
+  @Test
+  public void testSimpleRead() throws Exception {
+    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simpleread"));
+    final long numTransactions = 10000;
+    EditLogOutputStream out = bkjm.startLogSegment(1);
+    writeNoOps(out, 1, numTransactions);
+    out.close();
+    bkjm.finalizeLogSegment(1, numTransactions);
+
+    EditLogInputStream in = bkjm.getInputStream(1);
+    try {
+      assertEquals(numTransactions,
+                   FSEditLogTestUtil.countTransactionsInStream(in));
+    } finally {
+      in.close();
+    }
+  }
+
+  /**
+   * An aborted segment stays inprogress until recoverUnfinalizedSegments()
+   * finalizes it.
+   */
+  @Test
+  public void testSimpleRecovery() throws Exception {
+    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simplerecovery"));
+    EditLogOutputStream out = bkjm.startLogSegment(1);
+    writeNoOps(out, 1, 100);
+    out.setReadyToFlush();
+    out.flush();
+
+    out.abort();
+    out.close();
+
+    assertNull(zkc.exists(bkjm.finalizedLedgerZNode(1, 100), false));
+    assertNotNull(zkc.exists(bkjm.inprogressZNode(), false));
+
+    bkjm.recoverUnfinalizedSegments();
+
+    assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(1, 100), false));
+    assertNull(zkc.exists(bkjm.inprogressZNode(), false));
+  }
+}
\ No newline at end of file

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java?rev=1213983&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java Tue Dec 13 23:27:13 2011
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+
+/**
+ * Utilities for testing edit logs.
+ */
+public class FSEditLogTestUtil {
+  /**
+   * @return the op that LogSegmentOp.getInstance hands out for
+   *         OP_END_LOG_SEGMENT; tests use it as a filler transaction
+   */
+  public static FSEditLogOp getNoOpInstance() {
+    FSEditLogOp endSegmentOp =
+        FSEditLogOp.LogSegmentOp.getInstance(FSEditLogOpCodes.OP_END_LOG_SEGMENT);
+    return endSegmentOp;
+  }
+
+  /**
+   * Runs FSEditLogLoader.validateEditLog over the stream and reports the
+   * number of transactions it found.
+   *
+   * @param in edit log stream to validate
+   * @return transaction count reported by the validation
+   * @throws IOException if validateEditLog throws it
+   */
+  public static long countTransactionsInStream(EditLogInputStream in)
+      throws IOException {
+    return FSEditLogLoader.validateEditLog(in).getNumTransactions();
+  }
+}
\ No newline at end of file

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/resources/log4j.properties?rev=1213983&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/resources/log4j.properties (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/resources/log4j.properties Tue Dec 13 23:27:13 2011
@@ -0,0 +1,62 @@
+#
+# 
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# 
+#
+
+#
+# Bookkeeper Journal Logging Configuration
+#
+
+# Format is "<default threshold> (, <appender>)+"
+
+# DEFAULT: CONSOLE appender attached, but root level OFF disables all output
+log4j.rootLogger=OFF, CONSOLE
+
+# Example with rolling log file
+#log4j.rootLogger=DEBUG, CONSOLE, ROLLINGFILE
+
+# Example with rolling log file and tracing
+#log4j.rootLogger=TRACE, CONSOLE, ROLLINGFILE, TRACEFILE
+
+#
+# Log INFO level and above messages to the console
+#
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.Threshold=INFO
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
+
+#
+# Add ROLLINGFILE to rootLogger to get log file output
+#    Log DEBUG level and above messages to a log file
+log4j.appender.ROLLINGFILE=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.ROLLINGFILE.Threshold=DEBUG
+log4j.appender.ROLLINGFILE.File=hdfs-namenode.log
+log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
+log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
+
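+# Max log file size of 10MB (NOTE: a RollingFileAppender property; the DailyRollingFileAppender configured above does not have it)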
+log4j.appender.ROLLINGFILE.MaxFileSize=10MB
+# uncomment the next line to limit number of backup files
+#log4j.appender.ROLLINGFILE.MaxBackupIndex=10
+
+log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
+log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n
+
+

Modified: hadoop/common/trunk/hadoop-hdfs-project/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/pom.xml?rev=1213983&r1=1213982&r2=1213983&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/pom.xml (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/pom.xml Tue Dec 13 23:27:13 2011
@@ -30,6 +30,7 @@
   <modules>
     <module>hadoop-hdfs</module>
     <module>hadoop-hdfs-httpfs</module>
+    <module>hadoop-hdfs/src/contrib/bkjournal</module>
   </modules>
 
   <build>