Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2012/12/05 20:22:25 UTC

svn commit: r1417596 [4/6] - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/ src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/ src/main/bin/ src/main/java/org/apache/hadoop/hdfs/ src/main/java...

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java Wed Dec  5 19:22:17 2012
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.HttpURLConnection;
 import java.net.URL;
+import java.security.PrivilegedExceptionAction;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -368,30 +369,36 @@ public class EditLogFileInputStream exte
 
     @Override
     public InputStream getInputStream() throws IOException {
-      HttpURLConnection connection = (HttpURLConnection)
-          SecurityUtil.openSecureHttpConnection(url);
-      
-      if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
-        throw new HttpGetFailedException(
-            "Fetch of " + url +
-            " failed with status code " + connection.getResponseCode() +
-            "\nResponse message:\n" + connection.getResponseMessage(),
-            connection);
-      }
-
-      String contentLength = connection.getHeaderField(CONTENT_LENGTH);
-      if (contentLength != null) {
-        advertisedSize = Long.parseLong(contentLength);
-        if (advertisedSize <= 0) {
-          throw new IOException("Invalid " + CONTENT_LENGTH + " header: " +
-              contentLength);
-        }
-      } else {
-        throw new IOException(CONTENT_LENGTH + " header is not provided " +
-                              "by the server when trying to fetch " + url);
-      }
-
-      return connection.getInputStream();
+      return SecurityUtil.doAsCurrentUser(
+          new PrivilegedExceptionAction<InputStream>() {
+            @Override
+            public InputStream run() throws IOException {
+              HttpURLConnection connection = (HttpURLConnection)
+                  SecurityUtil.openSecureHttpConnection(url);
+              
+              if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
+                throw new HttpGetFailedException(
+                    "Fetch of " + url +
+                    " failed with status code " + connection.getResponseCode() +
+                    "\nResponse message:\n" + connection.getResponseMessage(),
+                    connection);
+              }
+        
+              String contentLength = connection.getHeaderField(CONTENT_LENGTH);
+              if (contentLength != null) {
+                advertisedSize = Long.parseLong(contentLength);
+                if (advertisedSize <= 0) {
+                  throw new IOException("Invalid " + CONTENT_LENGTH + " header: " +
+                      contentLength);
+                }
+              } else {
+                throw new IOException(CONTENT_LENGTH + " header is not provided " +
+                                      "by the server when trying to fetch " + url);
+              }
+        
+              return connection.getInputStream();
+            }
+          });
     }
 
     @Override

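The change above wraps the entire HTTP fetch in a PrivilegedExceptionAction so it runs with the caller's Kerberos credentials. A minimal sketch of the same wrap-in-doAs pattern, using the public UserGroupInformation.doAs API rather than the internal SecurityUtil helper; the class name and URL handling here are illustrative, not the actual EditLogFileInputStream code:

    import java.io.IOException;
    import java.io.InputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.security.PrivilegedExceptionAction;

    import org.apache.hadoop.security.UserGroupInformation;

    public class DoAsFetchSketch {
      /** Open a URL as the current logged-in user so Kerberos/SPNEGO credentials apply. */
      public static InputStream fetchAsCurrentUser(final URL url) throws Exception {
        return UserGroupInformation.getCurrentUser().doAs(
            new PrivilegedExceptionAction<InputStream>() {
              @Override
              public InputStream run() throws IOException {
                HttpURLConnection conn = (HttpURLConnection) url.openConnection();
                if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
                  throw new IOException("Fetch of " + url + " failed with status "
                      + conn.getResponseCode());
                }
                return conn.getInputStream();
              }
            });
      }
    }
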
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java Wed Dec  5 19:22:17 2012
@@ -176,7 +176,7 @@ public class EditLogFileOutputStream ext
    * accumulates new log records while readyBuffer will be flushed and synced.
    */
   @Override
-  public void flushAndSync() throws IOException {
+  public void flushAndSync(boolean durable) throws IOException {
     if (fp == null) {
       throw new IOException("Trying to use aborted output stream");
     }
@@ -186,7 +186,7 @@ public class EditLogFileOutputStream ext
     }
     preallocate(); // preallocate file if necessary
     doubleBuf.flushTo(fp);
-    if (!shouldSkipFsyncForTests) {
+    if (durable && !shouldSkipFsyncForTests) {
       fc.force(false); // metadata updates not needed
     }
   }

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java Wed Dec  5 19:22:17 2012
@@ -24,6 +24,7 @@ import static org.apache.hadoop.util.Tim
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.jasper.compiler.JspUtil;
 
 /**
  * A generic abstract class to support journaling of edits logs into 
@@ -92,18 +93,24 @@ public abstract class EditLogOutputStrea
   /**
   * Flush and sync all data that is ready to be flushed
    * {@link #setReadyToFlush()} into underlying persistent store.
+   * @param durable if true, the edits should be made truly durable before
+   * returning
    * @throws IOException
    */
-  abstract protected void flushAndSync() throws IOException;
+  abstract protected void flushAndSync(boolean durable) throws IOException;
 
   /**
    * Flush data to persistent store.
    * Collect sync metrics.
    */
   public void flush() throws IOException {
+    flush(true);
+  }
+  
+  public void flush(boolean durable) throws IOException {
     numSync++;
     long start = now();
-    flushAndSync();
+    flushAndSync(durable);
     long end = now();
     totalTimeSync += (end - start);
   }
@@ -132,4 +139,12 @@ public abstract class EditLogOutputStrea
   protected long getNumSync() {
     return numSync;
   }
+
+  /**
+   * @return a short HTML snippet suitable for describing the current
+   * status of the stream
+   */
+  public String generateHtmlReport() {
+    return JspUtil.escapeXml(this.toString());
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java Wed Dec  5 19:22:17 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.na
 import java.io.IOException;
 import java.io.OutputStream;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.Writer;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -34,7 +35,8 @@ import com.google.common.base.Preconditi
  * to progress concurrently to flushes without allocating new buffers each
  * time.
  */
-class EditsDoubleBuffer {
+@InterfaceAudience.Private
+public class EditsDoubleBuffer {
 
   private TxnBuffer bufCurrent; // current buffer for writing
   private TxnBuffer bufReady; // buffer ready for flushing
@@ -51,11 +53,11 @@ class EditsDoubleBuffer {
     bufCurrent.writeOp(op);
   }
 
-  void writeRaw(byte[] bytes, int offset, int length) throws IOException {
+  public void writeRaw(byte[] bytes, int offset, int length) throws IOException {
     bufCurrent.write(bytes, offset, length);
   }
   
-  void close() throws IOException {
+  public void close() throws IOException {
     Preconditions.checkNotNull(bufCurrent);
     Preconditions.checkNotNull(bufReady);
 
@@ -69,7 +71,7 @@ class EditsDoubleBuffer {
     bufCurrent = bufReady = null;
   }
   
-  void setReadyToFlush() {
+  public void setReadyToFlush() {
     assert isFlushed() : "previous data not flushed yet";
     TxnBuffer tmp = bufReady;
     bufReady = bufCurrent;
@@ -80,12 +82,12 @@ class EditsDoubleBuffer {
    * Writes the content of the "ready" buffer to the given output stream,
    * and resets it. Does not swap any buffers.
    */
-  void flushTo(OutputStream out) throws IOException {
+  public void flushTo(OutputStream out) throws IOException {
     bufReady.writeTo(out); // write data to file
     bufReady.reset(); // erase all data in the buffer
   }
   
-  boolean shouldForceSync() {
+  public boolean shouldForceSync() {
     return bufCurrent.size() >= initBufferSize;
   }
 
@@ -120,6 +122,12 @@ class EditsDoubleBuffer {
     return bufReady.numTxns;
   }
 
+  /**
+   * @return the number of bytes that are ready to be flushed
+   */
+  public int countReadyBytes() {
+    return bufReady.size();
+  }
   
   private static class TxnBuffer extends DataOutputBuffer {
     long firstTxId;

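EditsDoubleBuffer is made public here so that journal implementations outside this package (such as the quorum journal) can reuse it. The core idea is two buffers that are swapped: writers append to one while the other is flushed to the underlying stream. A stripped-down sketch of that swap-and-flush pattern, not the real class (which also tracks per-buffer transaction IDs and counts):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    class DoubleBufferSketch {
      private ByteArrayOutputStream bufCurrent = new ByteArrayOutputStream(); // being written
      private ByteArrayOutputStream bufReady   = new ByteArrayOutputStream(); // being flushed

      void write(byte[] b, int off, int len) {
        bufCurrent.write(b, off, len);
      }

      // Swap the buffers: new writes go to the empty buffer while the data
      // written so far waits in bufReady to be flushed.
      void setReadyToFlush() {
        assert bufReady.size() == 0 : "previous data not flushed yet";
        ByteArrayOutputStream tmp = bufReady;
        bufReady = bufCurrent;
        bufCurrent = tmp;
      }

      // Write the ready buffer to the given stream and reset it; no swap here.
      void flushTo(OutputStream out) throws IOException {
        bufReady.writeTo(out);
        bufReady.reset();
      }
    }
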
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Wed Dec  5 19:22:17 2012
@@ -1107,6 +1107,7 @@ public class FSEditLog implements LogsPu
       journalSet.recoverUnfinalizedSegments();
     } catch (IOException ex) {
       // All journals have failed, it is handled in logSync.
+      // TODO: are we sure this is OK?
     }
   }
   

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java Wed Dec  5 19:22:17 2012
@@ -29,6 +29,7 @@ import java.util.Collections;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
@@ -40,6 +41,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.ComparisonChain;
@@ -51,7 +53,8 @@ import com.google.common.collect.Compari
  * Note: this class is not thread-safe and should be externally
  * synchronized.
  */
-class FileJournalManager implements JournalManager {
+@InterfaceAudience.Private
+public class FileJournalManager implements JournalManager {
   private static final Log LOG = LogFactory.getLog(FileJournalManager.class);
 
   private final StorageDirectory sd;
@@ -164,7 +167,7 @@ class FileJournalManager implements Jour
    * @return a list of remote edit logs
    * @throws IOException if edit logs cannot be listed.
    */
-  List<RemoteEditLog> getRemoteEditLogs(long firstTxId) throws IOException {
+  public List<RemoteEditLog> getRemoteEditLogs(long firstTxId) throws IOException {
     File currentDir = sd.getCurrentDir();
     List<EditLogFile> allLogFiles = matchEditLogs(currentDir);
     List<RemoteEditLog> ret = Lists.newArrayListWithCapacity(
@@ -182,6 +185,8 @@ class FileJournalManager implements Jour
       }
     }
     
+    Collections.sort(ret);
+    
     return ret;
   }
 
@@ -195,7 +200,7 @@ class FileJournalManager implements Jour
    * @throws IOException
    *           IOException thrown for invalid logDir
    */
-  static List<EditLogFile> matchEditLogs(File logDir) throws IOException {
+  public static List<EditLogFile> matchEditLogs(File logDir) throws IOException {
     return matchEditLogs(FileUtil.listFiles(logDir));
   }
   
@@ -223,7 +228,7 @@ class FileJournalManager implements Jour
         try {
           long startTxId = Long.valueOf(inProgressEditsMatch.group(1));
           ret.add(
-              new EditLogFile(f, startTxId, startTxId, true));
+              new EditLogFile(f, startTxId, HdfsConstants.INVALID_TXID, true));
         } catch (NumberFormatException nfe) {
           LOG.error("In-progress edits file " + f + " has improperly " +
                     "formatted transaction ID");
@@ -237,15 +242,8 @@ class FileJournalManager implements Jour
   @Override
   synchronized public void selectInputStreams(
       Collection<EditLogInputStream> streams, long fromTxId,
-      boolean inProgressOk) {
-    List<EditLogFile> elfs;
-    try {
-      elfs = matchEditLogs(sd.getCurrentDir());
-    } catch (IOException e) {
-      LOG.error("error listing files in " + this + ". " +
-          "Skipping all edit logs in this directory.", e);
-      return;
-    }
+      boolean inProgressOk) throws IOException {
+    List<EditLogFile> elfs = matchEditLogs(sd.getCurrentDir());
     LOG.debug(this + ": selecting input streams starting at " + fromTxId + 
         (inProgressOk ? " (inProgress ok) " : " (excluding inProgress) ") +
         "from among " + elfs.size() + " candidate file(s)");
@@ -326,7 +324,7 @@ class FileJournalManager implements Jour
     }
   }
 
-  List<EditLogFile> getLogFiles(long fromTxId) throws IOException {
+  public List<EditLogFile> getLogFiles(long fromTxId) throws IOException {
     File currentDir = sd.getCurrentDir();
     List<EditLogFile> allLogFiles = matchEditLogs(currentDir);
     List<EditLogFile> logFiles = Lists.newArrayList();
@@ -342,6 +340,32 @@ class FileJournalManager implements Jour
 
     return logFiles;
   }
+  
+  public EditLogFile getLogFile(long startTxId) throws IOException {
+    return getLogFile(sd.getCurrentDir(), startTxId);
+  }
+  
+  public static EditLogFile getLogFile(File dir, long startTxId)
+      throws IOException {
+    List<EditLogFile> files = matchEditLogs(dir);
+    List<EditLogFile> ret = Lists.newLinkedList();
+    for (EditLogFile elf : files) {
+      if (elf.getFirstTxId() == startTxId) {
+        ret.add(elf);
+      }
+    }
+    
+    if (ret.isEmpty()) {
+      // no matches
+      return null;
+    } else if (ret.size() == 1) {
+      return ret.get(0);
+    } else {
+      throw new IllegalStateException("More than one log segment in " + 
+          dir + " starting at txid " + startTxId + ": " +
+          Joiner.on(", ").join(ret));
+    }
+  }
 
   @Override
   public String toString() {
@@ -351,7 +375,8 @@ class FileJournalManager implements Jour
   /**
    * Record of an edit log that has been located and had its filename parsed.
    */
-  static class EditLogFile {
+  @InterfaceAudience.Private
+  public static class EditLogFile {
     private File file;
     private final long firstTxId;
     private long lastTxId;
@@ -384,17 +409,20 @@ class FileJournalManager implements Jour
       assert (firstTxId > 0) || (firstTxId == HdfsConstants.INVALID_TXID);
       assert file != null;
       
+      Preconditions.checkArgument(!isInProgress ||
+          lastTxId == HdfsConstants.INVALID_TXID);
+      
       this.firstTxId = firstTxId;
       this.lastTxId = lastTxId;
       this.file = file;
       this.isInProgress = isInProgress;
     }
     
-    long getFirstTxId() {
+    public long getFirstTxId() {
       return firstTxId;
     }
     
-    long getLastTxId() {
+    public long getLastTxId() {
       return lastTxId;
     }
     
@@ -407,17 +435,17 @@ class FileJournalManager implements Jour
      * This will update the lastTxId of the EditLogFile or
      * mark it as corrupt if it is.
      */
-    void validateLog() throws IOException {
+    public void validateLog() throws IOException {
       EditLogValidation val = EditLogFileInputStream.validateEditLog(file);
       this.lastTxId = val.getEndTxId();
       this.hasCorruptHeader = val.hasCorruptHeader();
     }
 
-    boolean isInProgress() {
+    public boolean isInProgress() {
       return isInProgress;
     }
 
-    File getFile() {
+    public File getFile() {
       return file;
     }
     
@@ -430,7 +458,7 @@ class FileJournalManager implements Jour
       renameSelf(".corrupt");
     }
 
-    void moveAsideEmptyFile() throws IOException {
+    public void moveAsideEmptyFile() throws IOException {
       assert lastTxId == HdfsConstants.INVALID_TXID;
       renameSelf(".empty");
     }

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java Wed Dec  5 19:22:17 2012
@@ -26,7 +26,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.SortedSet;
-import java.util.TreeSet;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -146,7 +146,7 @@ public class JournalSet implements Journ
       return journal;
     }
 
-    private boolean isDisabled() {
+    boolean isDisabled() {
       return disabled;
     }
 
@@ -164,8 +164,12 @@ public class JournalSet implements Journ
       return required;
     }
   }
-  
-  private List<JournalAndStream> journals = Lists.newArrayList();
+ 
+  // COW implementation is necessary since some users (eg the web ui) call
+  // getAllJournalStreams() and then iterate. Since this is rarely
+  // mutated, there is no performance concern.
+  private List<JournalAndStream> journals =
+      new CopyOnWriteArrayList<JournalSet.JournalAndStream>();
   final int minimumRedundantJournals;
   
   JournalSet(int minimumRedundantResources) {
@@ -241,8 +245,20 @@ public class JournalSet implements Journ
         LOG.info("Skipping jas " + jas + " since it's disabled");
         continue;
       }
-      jas.getManager().selectInputStreams(allStreams, fromTxId, inProgressOk);
+      try {
+        jas.getManager().selectInputStreams(allStreams, fromTxId, inProgressOk);
+      } catch (IOException ioe) {
+        LOG.warn("Unable to determine input streams from " + jas.getManager() +
+            ". Skipping.", ioe);
+      }
     }
+    chainAndMakeRedundantStreams(streams, allStreams, fromTxId, inProgressOk);
+  }
+  
+  public static void chainAndMakeRedundantStreams(
+      Collection<EditLogInputStream> outStreams,
+      PriorityQueue<EditLogInputStream> allStreams,
+      long fromTxId, boolean inProgressOk) {
     // We want to group together all the streams that start on the same start
     // transaction ID.  To do this, we maintain an accumulator (acc) of all
     // the streams we've seen at a given start transaction ID.  When we see a
@@ -260,7 +276,7 @@ public class JournalSet implements Journ
         if (accFirstTxId == elis.getFirstTxId()) {
           acc.add(elis);
         } else if (accFirstTxId < elis.getFirstTxId()) {
-          streams.add(new RedundantEditLogInputStream(acc, fromTxId));
+          outStreams.add(new RedundantEditLogInputStream(acc, fromTxId));
           acc.clear();
           acc.add(elis);
         } else if (accFirstTxId > elis.getFirstTxId()) {
@@ -271,7 +287,7 @@ public class JournalSet implements Journ
       }
     }
     if (!acc.isEmpty()) {
-      streams.add(new RedundantEditLogInputStream(acc, fromTxId));
+      outStreams.add(new RedundantEditLogInputStream(acc, fromTxId));
       acc.clear();
     }
   }
@@ -453,12 +469,12 @@ public class JournalSet implements Journ
     }
 
     @Override
-    protected void flushAndSync() throws IOException {
+    protected void flushAndSync(final boolean durable) throws IOException {
       mapJournalsAndReportErrors(new JournalClosure() {
         @Override
         public void apply(JournalAndStream jas) throws IOException {
           if (jas.isActive()) {
-            jas.getCurrentStream().flushAndSync();
+            jas.getCurrentStream().flushAndSync(durable);
           }
         }
       }, "flushAndSync");
@@ -511,7 +527,6 @@ public class JournalSet implements Journ
     }
   }
   
-  @VisibleForTesting
   List<JournalAndStream> getAllJournalStreams() {
     return journals;
   }

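Switching the journals field to a CopyOnWriteArrayList lets callers such as the new web UI report iterate the list while journals are (rarely) added or removed, without external locking. A small, self-contained illustration of why copy-on-write iteration is safe where a plain ArrayList would throw ConcurrentModificationException; the journal names are made up:

    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;

    public class CowIterationSketch {
      public static void main(String[] args) {
        List<String> journals = new CopyOnWriteArrayList<String>();
        journals.add("file:///data/1/dfs/name");
        journals.add("file:///data/2/dfs/name");

        for (String j : journals) {
          // Mutation during iteration is allowed: the iterator reads the
          // snapshot that existed when iteration started.
          journals.add("extra-journal");
          System.out.println(j);
        }
        System.out.println("size after iteration = " + journals.size()); // 4
      }
    }
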
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Wed Dec  5 19:22:17 2012
@@ -725,6 +725,12 @@ public class NameNode {
     String namenodeId = HAUtil.getNameNodeId(conf, nsId);
     initializeGenericKeys(conf, nsId, namenodeId);
     checkAllowFormat(conf);
+
+    if (UserGroupInformation.isSecurityEnabled()) {
+      InetSocketAddress socAddr = getAddress(conf);
+      SecurityUtil.login(conf, DFS_NAMENODE_KEYTAB_FILE_KEY,
+          DFS_NAMENODE_USER_NAME_KEY, socAddr.getHostName());
+    }
     
     Collection<URI> nameDirsToFormat = FSNamesystem.getNamespaceDirs(conf);
     List<URI> sharedDirs = FSNamesystem.getSharedEditsDirs(conf);
@@ -766,13 +772,13 @@ public class NameNode {
   }
   
   @VisibleForTesting
-  public static boolean initializeSharedEdits(Configuration conf) {
+  public static boolean initializeSharedEdits(Configuration conf) throws IOException {
     return initializeSharedEdits(conf, true);
   }
   
   @VisibleForTesting
   public static boolean initializeSharedEdits(Configuration conf,
-      boolean force) {
+      boolean force) throws IOException {
     return initializeSharedEdits(conf, force, false);
   }
 
@@ -786,7 +792,7 @@ public class NameNode {
    * @return true if the command aborts, false otherwise
    */
   private static boolean initializeSharedEdits(Configuration conf,
-      boolean force, boolean interactive) {
+      boolean force, boolean interactive) throws IOException {
     String nsId = DFSUtil.getNamenodeNameServiceId(conf);
     String namenodeId = HAUtil.getNameNodeId(conf, nsId);
     initializeGenericKeys(conf, nsId, namenodeId);
@@ -797,6 +803,12 @@ public class NameNode {
       return false;
     }
 
+    if (UserGroupInformation.isSecurityEnabled()) {
+      InetSocketAddress socAddr = getAddress(conf);
+      SecurityUtil.login(conf, DFS_NAMENODE_KEYTAB_FILE_KEY,
+          DFS_NAMENODE_USER_NAME_KEY, socAddr.getHostName());
+    }
+
     NNStorage existingStorage = null;
     try {
       Configuration confWithoutShared = new Configuration(conf);

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourcePolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourcePolicy.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourcePolicy.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourcePolicy.java Wed Dec  5 19:22:17 2012
@@ -41,6 +41,14 @@ final class NameNodeResourcePolicy {
   static boolean areResourcesAvailable(
       Collection<? extends CheckableNameNodeResource> resources,
       int minimumRedundantResources) {
+
+    // TODO: workaround:
+    // - during startup, if there are no edits dirs on disk, then there is
+    // a call to areResourcesAvailable() with no dirs at all, which was
+    // previously causing the NN to enter safemode
+    if (resources.isEmpty()) {
+      return true;
+    }
     
     int requiredResourceCount = 0;
     int redundantResourceCount = 0;

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java Wed Dec  5 19:22:17 2012
@@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.server.blo
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.io.Text;
@@ -60,6 +61,8 @@ import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.VersionInfo;
 import org.znerd.xmlenc.XMLOutputter;
 
+import com.google.common.base.Preconditions;
+
 class NamenodeJspHelper {
   static String getSafeModeText(FSNamesystem fsn) {
     if (!fsn.isInSafeMode())
@@ -212,6 +215,52 @@ class NamenodeJspHelper {
 
       out.print("</table></div>\n");
     }
+    
+    /**
+     * Generate an HTML report containing the current status of the HDFS
+     * journals.
+     */
+    void generateJournalReport(JspWriter out, NameNode nn,
+        HttpServletRequest request) throws IOException {
+      FSEditLog log = nn.getFSImage().getEditLog();
+      Preconditions.checkArgument(log != null, "no edit log set in %s", nn);
+      
+      out.println("<h3> " + nn.getRole() + " Journal Status: </h3>");
+
+      out.println("<b>Current transaction ID:</b> " +
+          nn.getFSImage().getLastAppliedOrWrittenTxId() + "<br/>");
+      
+      
+      boolean openForWrite = log.isOpenForWrite();
+      
+      out.println("<div class=\"dfstable\">");
+      out.println("<table class=\"storage\" title=\"NameNode Journals\">\n"
+              + "<thead><tr><td><b>Journal Manager</b></td><td><b>State</b></td></tr></thead>");
+      for (JournalAndStream jas : log.getJournals()) {
+        out.print("<tr>");
+        out.print("<td>" + jas.getManager());
+        if (jas.isRequired()) {
+          out.print(" [required]");
+        }
+        out.print("</td><td>");
+        
+        if (jas.isDisabled()) {
+          out.print("<span class=\"failed\">Failed</span>");
+        } else if (openForWrite) {
+          EditLogOutputStream elos = jas.getCurrentStream();
+          if (elos != null) {
+            out.println(elos.generateHtmlReport());
+          } else {
+            out.println("not currently writing");
+          }
+        } else {
+          out.println("open for read");
+        }
+        out.println("</td></tr>");
+      }
+      
+      out.println("</table></div>");
+    }
 
     void generateHealthReport(JspWriter out, NameNode nn,
         HttpServletRequest request) throws IOException {

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java Wed Dec  5 19:22:17 2012
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.na
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
 import java.util.Collection;
 
 import org.apache.commons.logging.Log;
@@ -172,11 +173,20 @@ public class EditLogTailer {
     Preconditions.checkState(tailerThread == null ||
         !tailerThread.isAlive(),
         "Tailer thread should not be running once failover starts");
-    try {
-      doTailEdits();
-    } catch (InterruptedException e) {
-      throw new IOException(e);
-    }
+    // Important to do tailing as the login user, in case the shared
+    // edits storage is implemented by a JournalManager that depends
+    // on security credentials to access the logs (eg QuorumJournalManager).
+    SecurityUtil.doAsLoginUser(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        try {
+          doTailEdits();
+        } catch (InterruptedException e) {
+          throw new IOException(e);
+        }
+        return null;
+      }
+    });
   }
   
   @VisibleForTesting

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLog.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLog.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLog.java Wed Dec  5 19:22:17 2012
@@ -17,18 +17,15 @@
  */
 package org.apache.hadoop.hdfs.server.protocol;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.io.Writable;
 
 import com.google.common.base.Function;
 import com.google.common.collect.ComparisonChain;
 
-public class RemoteEditLog implements Writable, Comparable<RemoteEditLog> {
+public class RemoteEditLog implements Comparable<RemoteEditLog> {
   private long startTxId = HdfsConstants.INVALID_TXID;
   private long endTxId = HdfsConstants.INVALID_TXID;
+  private boolean isInProgress = false;
   
   public RemoteEditLog() {
   }
@@ -36,6 +33,13 @@ public class RemoteEditLog implements Wr
   public RemoteEditLog(long startTxId, long endTxId) {
     this.startTxId = startTxId;
     this.endTxId = endTxId;
+    this.isInProgress = (endTxId == HdfsConstants.INVALID_TXID);
+  }
+  
+  public RemoteEditLog(long startTxId, long endTxId, boolean inProgress) {
+    this.startTxId = startTxId;
+    this.endTxId = endTxId;
+    this.isInProgress = inProgress;
   }
 
   public long getStartTxId() {
@@ -45,22 +49,18 @@ public class RemoteEditLog implements Wr
   public long getEndTxId() {
     return endTxId;
   }
-    
-  @Override
-  public String toString() {
-    return "[" + startTxId + "," + endTxId + "]";
-  }
 
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeLong(startTxId);
-    out.writeLong(endTxId);
+  public boolean isInProgress() {
+    return isInProgress;
   }
 
   @Override
-  public void readFields(DataInput in) throws IOException {
-    startTxId = in.readLong();
-    endTxId = in.readLong();
+  public String toString() {
+    if (!isInProgress) {
+      return "[" + startTxId + "," + endTxId + "]";
+    } else {
+      return "[" + startTxId + "-? (in-progress)]";
+    }
   }
   
   @Override

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLogManifest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLogManifest.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLogManifest.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLogManifest.java Wed Dec  5 19:22:17 2012
@@ -40,8 +40,8 @@ public class RemoteEditLogManifest {
   
   
   /**
-   * Check that the logs are contiguous and non-overlapping
-   * sequences of transactions, in sorted order
+   * Check that the logs are non-overlapping sequences of transactions,
+   * in sorted order. They do not need to be contiguous.
    * @throws IllegalStateException if incorrect
    */
   private void checkState()  {
@@ -50,8 +50,10 @@ public class RemoteEditLogManifest {
     RemoteEditLog prev = null;
     for (RemoteEditLog log : logs) {
       if (prev != null) {
-        if (log.getStartTxId() != prev.getEndTxId() + 1) {
-          throw new IllegalStateException("Invalid log manifest:" + this);
+        if (log.getStartTxId() <= prev.getEndTxId()) {
+          throw new IllegalStateException(
+              "Invalid log manifest (log " + log + " overlaps " + prev + ")\n"
+              + this);
         }
       }
       

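checkState() above now only rejects overlapping segments; a gap between finalized segments is tolerated, since with multiple journals one of them may simply be missing a segment that another still provides. A small worked illustration of the relaxed rule; the transaction IDs are made up:

    public class ManifestCheckSketch {
      public static void main(String[] args) {
        // Sorted, non-overlapping, with a gap (201-249): valid under the new rule.
        // A segment like {90, 200} following {1, 100} would overlap and be rejected.
        long[][] segments = { {1, 100}, {101, 200}, {250, 300} };
        long prevEnd = 0;
        for (long[] seg : segments) {
          if (seg[0] <= prevEnd) {
            throw new IllegalStateException("segment [" + seg[0] + "," + seg[1]
                + "] overlaps previous segment ending at " + prevEnd);
          }
          prevEnd = seg[1];
        }
        System.out.println("manifest is valid");
      }
    }
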
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/BinaryEditsVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/BinaryEditsVisitor.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/BinaryEditsVisitor.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/BinaryEditsVisitor.java Wed Dec  5 19:22:17 2012
@@ -56,7 +56,7 @@ public class BinaryEditsVisitor implemen
   @Override
   public void close(Throwable error) throws IOException {
     elfos.setReadyToFlush();
-    elfos.flushAndSync();
+    elfos.flushAndSync(true);
     elfos.close();
   }
 

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/BestEffortLongFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/BestEffortLongFile.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/BestEffortLongFile.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/BestEffortLongFile.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.IOUtils;
+
+import com.google.common.io.Files;
+import com.google.common.primitives.Longs;
+
+/**
+ * Class that represents a file on disk which stores a single <code>long</code>
+ * value, but does not make any effort to make it truly durable. This is in
+ * contrast to {@link PersistentLongFile} which fsync()s the value on every
+ * change.
+ * 
+ * This should be used for values which are updated frequently (such that
+ * performance is important) and not required to be up-to-date for correctness.
+ * 
+ * This class also differs in that it stores the value as binary data instead
+ * of a textual string.
+ */
+@InterfaceAudience.Private
+public class BestEffortLongFile implements Closeable {
+
+  private final File file;
+  private final long defaultVal;
+
+  private long value;
+  
+  private FileChannel ch = null;
+  
+  private ByteBuffer buf = ByteBuffer.allocate(Long.SIZE/8);
+  
+  public BestEffortLongFile(File file, long defaultVal) {
+    this.file = file;
+    this.defaultVal = defaultVal;
+  }
+  
+  public long get() throws IOException {
+    lazyOpen();
+    return value;
+  }
+
+  public void set(long newVal) throws IOException {
+    lazyOpen();
+    buf.clear();
+    buf.putLong(newVal);
+    buf.flip();
+    IOUtils.writeFully(ch, buf, 0);
+    value = newVal;
+  }
+  
+  private void lazyOpen() throws IOException {
+    if (ch != null) {
+      return;
+    }
+
+    // Load current value.
+    byte[] data = null;
+    try {
+      data = Files.toByteArray(file);
+    } catch (FileNotFoundException fnfe) {
+      // Expected - this will use default value.
+    }
+
+    if (data != null && data.length != 0) {
+      if (data.length != Longs.BYTES) {
+        throw new IOException("File " + file + " had invalid length: " +
+            data.length);
+      }
+      value = Longs.fromByteArray(data);
+    } else {
+      value = defaultVal;
+    }
+    
+    // Now open file for future writes.
+    RandomAccessFile raf = new RandomAccessFile(file, "rw");
+    try {
+      ch = raf.getChannel();
+    } finally {
+      if (ch == null) {
+        IOUtils.closeStream(raf);
+      }
+    }
+  }
+  
+  @Override
+  public void close() throws IOException {
+    if (ch != null) {
+      ch.close();
+    }
+  }
+}

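A short usage sketch for the new BestEffortLongFile (the file path and values are illustrative). Unlike PersistentLongFile it never forces the value to disk, so after a crash get() may return a value older than the last set():

    import java.io.File;
    import java.io.IOException;

    import org.apache.hadoop.hdfs.util.BestEffortLongFile;

    public class BestEffortLongFileSketch {
      public static void main(String[] args) throws IOException {
        BestEffortLongFile committedTxId =
            new BestEffortLongFile(new File("/tmp/committed-txid"), 0L);
        try {
          long current = committedTxId.get(); // lazily opens the file; default if absent
          committedTxId.set(current + 1);     // overwrites the value, but no fsync
        } finally {
          committedTxId.close();
        }
      }
    }
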
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/PersistentLongFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/PersistentLongFile.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/PersistentLongFile.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/PersistentLongFile.java Wed Dec  5 19:22:17 2012
@@ -57,7 +57,9 @@ public class PersistentLongFile {
   }
   
   public void set(long newVal) throws IOException {
-    writeFile(file, newVal);
+    if (value != newVal || !loaded) {
+      writeFile(file, newVal);
+    }
     value = newVal;
     loaded = true;
   }

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto Wed Dec  5 19:22:17 2012
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.hdfs.qjournal.protocol";
+option java_outer_classname = "QJournalProtocolProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.hdfs;
+
+import "hdfs.proto";
+
+message JournalIdProto {
+  required string identifier = 1;
+}
+
+message RequestInfoProto {
+  required JournalIdProto journalId = 1;
+  required uint64 epoch = 2;
+  required uint64 ipcSerialNumber = 3;
+
+  // Whenever a writer makes a request, it informs
+  // the node of the latest committed txid. This may
+  // be higher than the transaction data included in the
+  // request itself, eg in the case that the node has
+  // fallen behind.
+  optional uint64 committedTxId = 4;
+}
+
+message SegmentStateProto {
+  required uint64 startTxId = 1;
+  required uint64 endTxId = 2;
+  required bool isInProgress = 3;
+}
+
+/**
+ * The storage format used on local disk for previously
+ * accepted decisions.
+ */
+message PersistedRecoveryPaxosData {
+  required SegmentStateProto segmentState = 1;
+  required uint64 acceptedInEpoch = 2;
+}
+
+/**
+ * journal()
+ */
+
+message JournalRequestProto {
+  required RequestInfoProto reqInfo = 1;
+  required uint64 firstTxnId = 2;
+  required uint32 numTxns = 3;
+  required bytes records = 4;
+  required uint64 segmentTxnId = 5;
+}
+
+message JournalResponseProto { 
+}
+
+/**
+ * heartbeat()
+ */
+
+message HeartbeatRequestProto {
+  required RequestInfoProto reqInfo = 1;
+}
+
+message HeartbeatResponseProto { // void response
+}
+
+/**
+ * startLogSegment()
+ */
+message StartLogSegmentRequestProto {
+  required RequestInfoProto reqInfo = 1;
+  required uint64 txid = 2; // Transaction ID
+}
+
+message StartLogSegmentResponseProto { 
+}
+
+/**
+ * finalizeLogSegment()
+ */
+message FinalizeLogSegmentRequestProto {
+  required RequestInfoProto reqInfo = 1;
+  required uint64 startTxId = 2;
+  required uint64 endTxId = 3;
+}
+
+message FinalizeLogSegmentResponseProto { 
+}
+
+/**
+ * purgeLogs()
+ */
+message PurgeLogsRequestProto {
+  required RequestInfoProto reqInfo = 1;
+  required uint64 minTxIdToKeep = 2;
+}
+
+message PurgeLogsResponseProto {
+}
+
+/**
+ * isFormatted()
+ */
+message IsFormattedRequestProto {
+  required JournalIdProto jid = 1;
+}
+
+message IsFormattedResponseProto {
+  required bool isFormatted = 1;
+}
+
+/**
+ * getJournalState()
+ */
+message GetJournalStateRequestProto {
+  required JournalIdProto jid = 1;
+}
+
+message GetJournalStateResponseProto {
+  required uint64 lastPromisedEpoch = 1;
+  required uint32 httpPort = 2;
+}
+
+/**
+ * format()
+ */
+message FormatRequestProto {
+  required JournalIdProto jid = 1;
+  required NamespaceInfoProto nsInfo = 2;
+}
+
+message FormatResponseProto {
+}
+
+/**
+ * newEpoch()
+ */
+message NewEpochRequestProto {
+  required JournalIdProto jid = 1;
+  required NamespaceInfoProto nsInfo = 2;
+  required uint64 epoch = 3;
+}
+
+message NewEpochResponseProto {
+  optional uint64 lastSegmentTxId = 1;
+}
+
+/**
+ * getEditLogManifest()
+ */
+message GetEditLogManifestRequestProto {
+  required JournalIdProto jid = 1;
+  required uint64 sinceTxId = 2;  // Transaction ID
+}
+
+message GetEditLogManifestResponseProto {
+  required RemoteEditLogManifestProto manifest = 1; 
+  required uint32 httpPort = 2;
+
+  // TODO: we should add nsinfo somewhere
+  // to verify that it matches up with our expectation
+  // required NamespaceInfoProto nsInfo = 2;
+}
+
+/**
+ * prepareRecovery()
+ */
+message PrepareRecoveryRequestProto {
+  required RequestInfoProto reqInfo = 1;
+  required uint64 segmentTxId = 2;
+}
+
+message PrepareRecoveryResponseProto {
+  optional SegmentStateProto segmentState = 1;
+  optional uint64 acceptedInEpoch = 2;
+  required uint64 lastWriterEpoch = 3;
+
+  // The highest committed txid that this logger has ever seen.
+  // This may be higher than the data it actually has, in the case
+  // that it was lagging before the old writer crashed.
+  optional uint64 lastCommittedTxId = 4;
+}
+
+/**
+ * acceptRecovery()
+ */
+message AcceptRecoveryRequestProto {
+  required RequestInfoProto reqInfo = 1;
+
+  /** Details on the segment to recover */
+  required SegmentStateProto stateToAccept = 2;
+  
+  /** The URL from which the log may be copied */
+  required string fromURL = 3;
+}
+
+message AcceptRecoveryResponseProto {
+}
+
+
+/**
+ * Protocol used to journal edits to a JournalNode.
+ * See the request and response for details of rpc call.
+ */
+service QJournalProtocolService {
+  rpc isFormatted(IsFormattedRequestProto) returns (IsFormattedResponseProto);
+
+  rpc getJournalState(GetJournalStateRequestProto) returns (GetJournalStateResponseProto);
+
+  rpc newEpoch(NewEpochRequestProto) returns (NewEpochResponseProto);
+
+  rpc format(FormatRequestProto) returns (FormatResponseProto);
+
+  rpc journal(JournalRequestProto) returns (JournalResponseProto);
+
+  rpc heartbeat(HeartbeatRequestProto) returns (HeartbeatResponseProto);
+
+  rpc startLogSegment(StartLogSegmentRequestProto) 
+      returns (StartLogSegmentResponseProto);
+
+  rpc finalizeLogSegment(FinalizeLogSegmentRequestProto)
+      returns (FinalizeLogSegmentResponseProto);
+
+  rpc purgeLogs(PurgeLogsRequestProto)
+      returns (PurgeLogsResponseProto);
+
+  rpc getEditLogManifest(GetEditLogManifestRequestProto)
+      returns (GetEditLogManifestResponseProto);
+
+  rpc prepareRecovery(PrepareRecoveryRequestProto)
+      returns (PrepareRecoveryResponseProto);
+
+  rpc acceptRecovery(AcceptRecoveryRequestProto)
+      returns (AcceptRecoveryResponseProto);
+}

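The generated proto2 Java classes (outer class QJournalProtocolProtos in org.apache.hadoop.hdfs.qjournal.protocol, per the options above) follow the standard builder pattern. A hedged sketch of building a RequestInfoProto; the message and field names come from the definitions above, but how callers actually construct these (directly or through a translator layer) is not shown in this patch:

    import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalIdProto;
    import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.RequestInfoProto;

    public class RequestInfoSketch {
      public static RequestInfoProto build(String journalId, long epoch,
          long ipcSerial, long committedTxId) {
        return RequestInfoProto.newBuilder()
            .setJournalId(JournalIdProto.newBuilder()
                .setIdentifier(journalId)
                .build())
            .setEpoch(epoch)
            .setIpcSerialNumber(ipcSerial)
            // Optional field: the highest txid the writer knows to be committed.
            .setCommittedTxId(committedTxId)
            .build();
      }
    }
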
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto Wed Dec  5 19:22:17 2012
@@ -305,6 +305,7 @@ message BlocksWithLocationsProto {
 message RemoteEditLogProto {
   required uint64 startTxId = 1;  // Starting available edit log transaction
   required uint64 endTxId = 2;    // Ending available edit log transaction
+  optional bool isInProgress = 3 [default = false];
 }
 
 /**

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Wed Dec  5 19:22:17 2012
@@ -260,6 +260,11 @@
 </property>
 
 <property>
+  <name>dfs.namenode.edits.journal-plugin.qjournal</name>
+  <value>org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager</value>
+</property>
+
+<property>
   <name>dfs.permissions.enabled</name>
   <value>true</value>
   <description>
@@ -1162,4 +1167,21 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.journalnode.rpc-address</name>
+  <value>0.0.0.0:8485</value>
+  <description>
+    The JournalNode RPC server address and port.
+  </description>
+</property>
+
+<property>
+  <name>dfs.journalnode.http-address</name>
+  <value>0.0.0.0:8480</value>
+  <description>
+    The address and port the JournalNode web UI listens on.
+    If the port is 0 then the server will start on a free port.
+  </description>
+</property>
+
 </configuration>

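The new dfs.namenode.edits.journal-plugin.qjournal property maps the qjournal:// URI scheme to QuorumJournalManager, and the two dfs.journalnode.* addresses configure where each JournalNode listens. A hedged sketch of pointing a NameNode's shared edits directory at a quorum of JournalNodes; the dfs.namenode.shared.edits.dir key, hostnames, and journal ID are assumptions for illustration, not part of this patch:

    import org.apache.hadoop.conf.Configuration;

    public class QjmConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // The qjournal scheme is resolved via
        // dfs.namenode.edits.journal-plugin.qjournal -> QuorumJournalManager.
        conf.set("dfs.namenode.shared.edits.dir",
            "qjournal://jn1.example.com:8485;jn2.example.com:8485;jn3.example.com:8485/mycluster");
        // Each JournalNode serves RPC on dfs.journalnode.rpc-address (default 0.0.0.0:8485)
        // and its web UI on dfs.journalnode.http-address (default 0.0.0.0:8480).
        System.out.println(conf.get("dfs.journalnode.rpc-address", "0.0.0.0:8485"));
      }
    }
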
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.jsp?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.jsp (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.jsp Wed Dec  5 19:22:17 2012
@@ -60,8 +60,10 @@
 <%= NamenodeJspHelper.getCorruptFilesWarning(fsn)%>
 
 <% healthjsp.generateHealthReport(out, nn, request); %>
-<hr>
+<% healthjsp.generateJournalReport(out, nn, request); %>
+<hr/>
 <% healthjsp.generateConfReport(out, nn, request); %>
+<hr>
 <%
 out.println(ServletUtil.htmlFooter());
 %>

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/index.html
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/index.html?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/index.html (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/index.html Wed Dec  5 19:22:17 2012
@@ -0,0 +1,29 @@
+<meta HTTP-EQUIV="REFRESH" content="0;url=journalstatus.jsp"/>
+<html>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<head><title>Hadoop Administration</title></head>
+
+<body>
+<h1>Hadoop Administration</h1>
+
+<ul> 
+  <li><a href="journalstatus.jsp">Status</a></li> 
+</ul>
+
+</body> 
+</html>

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/journalstatus.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/journalstatus.jsp?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/journalstatus.jsp (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/journalstatus.jsp Wed Dec  5 19:22:17 2012
@@ -0,0 +1,42 @@
+<%
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file 
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+%>
+<%@ page
+  contentType="text/html; charset=UTF-8"
+  import="org.apache.hadoop.hdfs.server.common.JspHelper"
+  import="org.apache.hadoop.util.ServletUtil"
+%>
+<%!
+  //for java.io.Serializable
+  private static final long serialVersionUID = 1L;
+%>
+
+<!DOCTYPE html>
+<html>
+<link rel="stylesheet" type="text/css" href="/static/hadoop.css">
+<title>Hadoop JournalNode</title>
+    
+<body>
+<h1>JournalNode</h1>
+<%= JspHelper.getVersionTable() %>
+<hr />
+
+<br />
+<b><a href="/logs/">Logs</a></b>
+<%= ServletUtil.htmlFooter() %>

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/proto-journal-web.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/proto-journal-web.xml?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/proto-journal-web.xml (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/proto-journal-web.xml Wed Dec  5 19:22:17 2012
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<web-app version="2.4" xmlns="http://java.sun.com/xml/ns/j2ee">
+@journal.servlet.definitions@
+</web-app>

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java Wed Dec  5 19:22:17 2012
@@ -85,6 +85,7 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.VersionInfo;
 
+import com.google.common.base.Charsets;
 import com.google.common.base.Joiner;
 
 /** Utilities for HDFS tests */
@@ -594,12 +595,21 @@ public class DFSTestUtil {
     IOUtils.copyBytes(is, os, s.length(), true);
   }
   
-  // Returns url content as string.
+  /**
+   * @return url content as string (UTF-8 encoding assumed)
+   */
   public static String urlGet(URL url) throws IOException {
+    return new String(urlGetBytes(url), Charsets.UTF_8);
+  }
+  
+  /**
+   * @return URL contents as a byte array
+   */
+  public static byte[] urlGetBytes(URL url) throws IOException {
     URLConnection conn = url.openConnection();
     ByteArrayOutputStream out = new ByteArrayOutputStream();
     IOUtils.copyBytes(conn.getInputStream(), out, 4096, true);
-    return out.toString();
+    return out.toByteArray();
   }
   
   /**
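
For reference, a minimal sketch of how a test might use the urlGet/urlGetBytes
helpers added above; the URL shown is only illustrative, not one this patch fetches.

    import java.net.URL;
    import org.apache.hadoop.hdfs.DFSTestUtil;

    public class UrlGetExample {
      public static void main(String[] args) throws Exception {
        // Hypothetical endpoint; any HTTP URL serving content works here.
        URL url = new URL("http://localhost:50070/dfshealth.jsp");
        byte[] raw = DFSTestUtil.urlGetBytes(url);  // raw bytes, e.g. for binary payloads
        String page = DFSTestUtil.urlGet(url);      // same contents decoded as UTF-8
        System.out.println("fetched " + raw.length + " bytes");
        System.out.println(page);
      }
    }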

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+
+public class MiniJournalCluster {
+  public static class Builder {
+    private String baseDir;
+    private int numJournalNodes = 3;
+    private boolean format = true;
+    private Configuration conf;
+    
+    public Builder(Configuration conf) {
+      this.conf = conf;
+    }
+    
+    public Builder baseDir(String d) {
+      this.baseDir = d;
+      return this;
+    }
+    
+    public Builder numJournalNodes(int n) {
+      this.numJournalNodes = n;
+      return this;
+    }
+
+    public Builder format(boolean f) {
+      this.format = f;
+      return this;
+    }
+
+    public MiniJournalCluster build() throws IOException {
+      return new MiniJournalCluster(this);
+    }
+  }
+
+  private static final Log LOG = LogFactory.getLog(MiniJournalCluster.class);
+  private File baseDir;
+  private JournalNode[] nodes;
+  private InetSocketAddress[] ipcAddrs;
+  private InetSocketAddress[] httpAddrs;
+  
+  private MiniJournalCluster(Builder b) throws IOException {
+    LOG.info("Starting MiniJournalCluster with " +
+        b.numJournalNodes + " journal nodes");
+    
+    if (b.baseDir != null) {
+      this.baseDir = new File(b.baseDir);
+    } else {
+      this.baseDir = new File(MiniDFSCluster.getBaseDirectory());
+    }
+    
+    nodes = new JournalNode[b.numJournalNodes];
+    ipcAddrs = new InetSocketAddress[b.numJournalNodes];
+    httpAddrs = new InetSocketAddress[b.numJournalNodes];
+    for (int i = 0; i < b.numJournalNodes; i++) {
+      if (b.format) {
+        File dir = getStorageDir(i);
+        LOG.debug("Fully deleting JN directory " + dir);
+        FileUtil.fullyDelete(dir);
+      }
+      nodes[i] = new JournalNode();
+      nodes[i].setConf(createConfForNode(b, i));
+      nodes[i].start();
+
+      ipcAddrs[i] = nodes[i].getBoundIpcAddress();
+      httpAddrs[i] = nodes[i].getBoundHttpAddress();
+    }
+  }
+
+  /**
+   * Return a URI on the "qjournal://" scheme that addresses the set of
+   * JournalNodes in this cluster under the given journal identifier.
+   */
+  public URI getQuorumJournalURI(String jid) {
+    List<String> addrs = Lists.newArrayList();
+    for (InetSocketAddress addr : ipcAddrs) {
+      addrs.add("127.0.0.1:" + addr.getPort());
+    }
+    String addrsVal = Joiner.on(";").join(addrs);
+    LOG.debug("Setting logger addresses to: " + addrsVal);
+    try {
+      return new URI("qjournal://" + addrsVal + "/" + jid);
+    } catch (URISyntaxException e) {
+      throw new AssertionError(e);
+    }
+  }
+
+  /**
+   * Start the JournalNodes in the cluster.
+   */
+  public void start() throws IOException {
+    for (JournalNode jn : nodes) {
+      jn.start();
+    }
+  }
+
+  /**
+   * Shutdown all of the JournalNodes in the cluster.
+   * @throws IOException if one or more nodes failed to stop
+   */
+  public void shutdown() throws IOException {
+    boolean failed = false;
+    for (JournalNode jn : nodes) {
+      try {
+        jn.stopAndJoin(0);
+      } catch (Exception e) {
+        failed = true;
+        LOG.warn("Unable to stop journal node " + jn, e);
+      }
+    }
+    if (failed) {
+      throw new IOException("Unable to shut down. Check log for details");
+    }
+  }
+
+  private Configuration createConfForNode(Builder b, int idx) {
+    Configuration conf = new Configuration(b.conf);
+    File logDir = getStorageDir(idx);
+    conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY, logDir.toString());
+    conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "0.0.0.0:0");
+    conf.set(DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
+    return conf;
+  }
+
+  public File getStorageDir(int idx) {
+    return new File(baseDir, "journalnode-" + idx).getAbsoluteFile();
+  }
+  
+  public File getCurrentDir(int idx, String jid) {
+    return new File(new File(getStorageDir(idx), jid), "current");
+  }
+
+  public JournalNode getJournalNode(int i) {
+    return nodes[i];
+  }
+  
+  public void restartJournalNode(int i) throws InterruptedException, IOException {
+    Configuration conf = new Configuration(nodes[i].getConf());
+    if (nodes[i].isStarted()) {
+      nodes[i].stopAndJoin(0);
+    }
+    
+    conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "127.0.0.1:" +
+        ipcAddrs[i].getPort());
+    conf.set(DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY, "127.0.0.1:" +
+        httpAddrs[i].getPort());
+    
+    JournalNode jn = new JournalNode();
+    jn.setConf(conf);
+    jn.start();
+    // Remember the restarted instance so later lookups and shutdown()
+    // operate on the live node rather than the stopped one.
+    nodes[i] = jn;
+  }
+
+  public int getQuorumSize() {
+    return nodes.length / 2 + 1;
+  }
+
+  public int getNumNodes() {
+    return nodes.length;
+  }
+
+}
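
For reference, a minimal sketch of how a test might stand up this cluster and obtain
the quorum URI to hand to a journal manager; the journal id is arbitrary.

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;

    public class MiniJournalClusterExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Three JournalNodes on ephemeral ports, with freshly formatted storage dirs.
        MiniJournalCluster cluster = new MiniJournalCluster.Builder(conf)
            .numJournalNodes(3)
            .format(true)
            .build();
        try {
          // qjournal://127.0.0.1:port1;...;127.0.0.1:portN/my-journal
          URI uri = cluster.getQuorumJournalURI("my-journal");
          System.out.println("Quorum journal URI: " + uri);
          System.out.println("Quorum size: " + cluster.getQuorumSize());
        } finally {
          cluster.shutdown();
        }
      }
    }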

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
+import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
+import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
+
+import com.google.common.collect.Lists;
+
+public abstract class QJMTestUtil {
+  public static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo(
+      12345, "mycluster", "my-bp", 0L);
+  public static final String JID = "test-journal";
+
+  public static byte[] createTxnData(int startTxn, int numTxns) throws Exception {
+    DataOutputBuffer buf = new DataOutputBuffer();
+    FSEditLogOp.Writer writer = new FSEditLogOp.Writer(buf);
+    
+    for (long txid = startTxn; txid < startTxn + numTxns; txid++) {
+      FSEditLogOp op = NameNodeAdapter.createMkdirOp("tx " + txid);
+      op.setTransactionId(txid);
+      writer.writeOp(op);
+    }
+    
+    return Arrays.copyOf(buf.getData(), buf.getLength());
+  }
+  
+  public static EditLogOutputStream writeSegment(MiniJournalCluster cluster,
+      QuorumJournalManager qjm, long startTxId, int numTxns,
+      boolean finalize) throws IOException {
+    EditLogOutputStream stm = qjm.startLogSegment(startTxId);
+    // Should create in-progress
+    assertExistsInQuorum(cluster,
+        NNStorage.getInProgressEditsFileName(startTxId));
+    
+    writeTxns(stm, startTxId, numTxns);
+    if (finalize) {
+      stm.close();
+      qjm.finalizeLogSegment(startTxId, startTxId + numTxns - 1);
+      return null;
+    } else {
+      return stm;
+    }
+  }
+
+  public static void writeOp(EditLogOutputStream stm, long txid) throws IOException {
+    FSEditLogOp op = NameNodeAdapter.createMkdirOp("tx " + txid);
+    op.setTransactionId(txid);
+    stm.write(op);
+  }
+
+  public static void writeTxns(EditLogOutputStream stm, long startTxId, int numTxns)
+      throws IOException {
+    for (long txid = startTxId; txid < startTxId + numTxns; txid++) {
+      writeOp(stm, txid);
+    }
+    stm.setReadyToFlush();
+    stm.flush();
+  }
+  
+  /**
+   * Verify that the given list of streams contains exactly the range of
+   * transactions specified, inclusive.
+   */
+  public static void verifyEdits(List<EditLogInputStream> streams,
+      int firstTxnId, int lastTxnId) throws IOException {
+    
+    Iterator<EditLogInputStream> iter = streams.iterator();
+    assertTrue(iter.hasNext());
+    EditLogInputStream stream = iter.next();
+    
+    for (int expected = firstTxnId;
+        expected <= lastTxnId;
+        expected++) {
+      
+      FSEditLogOp op = stream.readOp();
+      while (op == null) {
+        assertTrue("Expected to find txid " + expected + ", " +
+            "but no more streams available to read from",
+            iter.hasNext());
+        stream = iter.next();
+        op = stream.readOp();
+      }
+      
+      assertEquals(FSEditLogOpCodes.OP_MKDIR, op.opCode);
+      assertEquals(expected, op.getTransactionId());
+    }
+    
+    assertNull(stream.readOp());
+    assertFalse("Expected no more txns after " + lastTxnId +
+        " but more streams are available", iter.hasNext());
+  }
+  
+
+  public static void assertExistsInQuorum(MiniJournalCluster cluster,
+      String fname) {
+    int count = 0;
+    for (int i = 0; i < cluster.getNumNodes(); i++) {
+      File dir = cluster.getCurrentDir(i, JID);
+      if (new File(dir, fname).exists()) {
+        count++;
+      }
+    }
+    assertTrue("File " + fname + " should exist in a quorum of dirs",
+        count >= cluster.getQuorumSize());
+  }
+
+  public static long recoverAndReturnLastTxn(QuorumJournalManager qjm)
+      throws IOException {
+    qjm.recoverUnfinalizedSegments();
+    long lastRecoveredTxn = 0;
+
+    List<EditLogInputStream> streams = Lists.newArrayList();
+    try {
+      qjm.selectInputStreams(streams, 0, false);
+      
+      for (EditLogInputStream elis : streams) {
+        assertTrue(elis.getFirstTxId() > lastRecoveredTxn);
+        lastRecoveredTxn = elis.getLastTxId();
+      }
+    } finally {
+      IOUtils.cleanup(null, streams.toArray(new Closeable[0]));
+    }
+    return lastRecoveredTxn;
+  }
+}
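
For reference, a rough sketch of how these helpers combine with a QuorumJournalManager.
The QuorumJournalManager constructor and the format() call shown here are assumptions
based on the QJM client code elsewhere in this change, not something this file defines.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
    import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
    import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;

    public class QJMTestUtilExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        MiniJournalCluster cluster = new MiniJournalCluster.Builder(conf).build();
        QuorumJournalManager qjm = null;
        try {
          // Assumed constructor: (conf, quorum URI, namespace info).
          qjm = new QuorumJournalManager(conf,
              cluster.getQuorumJournalURI(QJMTestUtil.JID), QJMTestUtil.FAKE_NSINFO);
          qjm.format(QJMTestUtil.FAKE_NSINFO);  // assumed format(NamespaceInfo) call
          qjm.recoverUnfinalizedSegments();     // become the active writer
          // Write and finalize a five-transaction segment starting at txid 1.
          QJMTestUtil.writeSegment(cluster, qjm, 1, 5, true);
          // Recover again and report the highest txid visible to readers (expect 5).
          System.out.println(QJMTestUtil.recoverAndReturnLastTxn(qjm));
        } finally {
          if (qjm != null) {
            qjm.close();
          }
          cluster.shutdown();
        }
      }
    }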

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestMiniJournalCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestMiniJournalCluster.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestMiniJournalCluster.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestMiniJournalCluster.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
+import org.junit.Test;
+
+
+public class TestMiniJournalCluster {
+  @Test
+  public void testStartStop() throws IOException {
+    Configuration conf = new Configuration();
+    MiniJournalCluster c = new MiniJournalCluster.Builder(conf)
+      .build();
+    try {
+      URI uri = c.getQuorumJournalURI("myjournal");
+      String[] addrs = uri.getAuthority().split(";");
+      assertEquals(3, addrs.length);
+      
+      JournalNode node = c.getJournalNode(0);
+      String dir = node.getConf().get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY);
+      assertEquals(
+          new File(MiniDFSCluster.getBaseDirectory() + "journalnode-0")
+            .getAbsolutePath(),
+          dir);
+    } finally {
+      c.shutdown();
+    }
+  }
+}

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.ExitUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestNNWithQJM {
+  Configuration conf = new HdfsConfiguration();
+  private MiniJournalCluster mjc;
+  private Path TEST_PATH = new Path("/test-dir");
+  private Path TEST_PATH_2 = new Path("/test-dir-2");
+
+  @Before
+  public void resetSystemExit() {
+    ExitUtil.resetFirstExitException();
+  }
+  
+  @Before
+  public void startJNs() throws Exception {
+    mjc = new MiniJournalCluster.Builder(conf).build();
+  }
+  
+  @After
+  public void stopJNs() throws Exception {
+    if (mjc != null) {
+      mjc.shutdown();
+    }
+  }
+  
+  @Test
+  public void testLogAndRestart() throws IOException {
+    conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+        MiniDFSCluster.getBaseDirectory() + "/TestNNWithQJM/image");
+    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
+        mjc.getQuorumJournalURI("myjournal").toString());
+    
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(0)
+      .manageNameDfsDirs(false)
+      .build();
+    try {
+      cluster.getFileSystem().mkdirs(TEST_PATH);
+      
+      // Restart the NN and make sure the edit was persisted
+      // and loaded again
+      cluster.restartNameNode();
+      
+      assertTrue(cluster.getFileSystem().exists(TEST_PATH));
+      cluster.getFileSystem().mkdirs(TEST_PATH_2);
+      
+      // Restart the NN again and make sure both edits are persisted.
+      cluster.restartNameNode();
+      assertTrue(cluster.getFileSystem().exists(TEST_PATH));
+      assertTrue(cluster.getFileSystem().exists(TEST_PATH_2));
+    } finally {
+      cluster.shutdown();
+    }
+  }
+  
+  @Test
+  public void testNewNamenodeTakesOverWriter() throws Exception {
+    File nn1Dir = new File(
+        MiniDFSCluster.getBaseDirectory() + "/TestNNWithQJM/image-nn1");
+    File nn2Dir = new File(
+        MiniDFSCluster.getBaseDirectory() + "/TestNNWithQJM/image-nn2");
+    
+    conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+        nn1Dir.getAbsolutePath());
+    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
+        mjc.getQuorumJournalURI("myjournal").toString());
+    
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(0)
+      .manageNameDfsDirs(false)
+      .checkExitOnShutdown(false)
+      .build();
+
+    try {
+      cluster.getFileSystem().mkdirs(TEST_PATH);
+      
+      // Start a second NN pointed to the same quorum.
+      // We need to copy the image dir from the first NN -- or else
+      // the new NN will just be rejected because of Namespace mismatch.
+      FileUtil.fullyDelete(nn2Dir);
+      FileUtil.copy(nn1Dir, FileSystem.getLocal(conf).getRaw(),
+          new Path(nn2Dir.getAbsolutePath()), false, conf);
+      
+      Configuration conf2 = new Configuration();
+      conf2.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+          nn2Dir.getAbsolutePath());
+      conf2.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
+          mjc.getQuorumJournalURI("myjournal").toString());
+      MiniDFSCluster cluster2 = new MiniDFSCluster.Builder(conf2)
+        .numDataNodes(0)
+        .format(false)
+        .manageNameDfsDirs(false)
+        .build();
+      
+      // Check that the new cluster sees the edits made on the old cluster
+      try {
+        assertTrue(cluster2.getFileSystem().exists(TEST_PATH));
+      } finally {
+        cluster2.shutdown();
+      }
+      
+      // Check that, if we try to write to the old NN
+      // that it aborts.
+      try {
+        cluster.getFileSystem().mkdirs(new Path("/x"));
+        fail("Did not abort trying to write to a fenced NN");
+      } catch (RemoteException re) {
+        GenericTestUtils.assertExceptionContains(
+            "Could not sync enough journals to persistent storage", re);
+      }
+    } finally {
+      // Shutdown intentionally skipped: the first NN aborted above after
+      // being fenced by the second writer.
+      //cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testMismatchedNNIsRejected() throws Exception {
+    conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+        MiniDFSCluster.getBaseDirectory() + "/TestNNWithQJM/image");
+    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
+        mjc.getQuorumJournalURI("myjournal").toString());
+    
+    // Start a NN, so the storage is formatted -- both on-disk
+    // and QJM.
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(0)
+      .manageNameDfsDirs(false)
+      .build();
+    cluster.shutdown();
+    
+    // Reformat just the on-disk portion
+    Configuration onDiskOnly = new Configuration(conf);
+    onDiskOnly.unset(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY);
+    NameNode.format(onDiskOnly);
+
+    // Start the NN - should fail because the JNs are still formatted
+    // with the old namespace ID.
+    try {
+      cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(0)
+        .manageNameDfsDirs(false)
+        .format(false)
+        .build();
+      fail("New NN with different namespace should have been rejected");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains(
+          "Unable to start log segment 1: too few journals", ioe);
+    }
+  }
+  
+  @Test
+  public void testWebPageHasQjmInfo() throws Exception {
+    conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+        MiniDFSCluster.getBaseDirectory() + "/TestNNWithQJM/image");
+    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
+        mjc.getQuorumJournalURI("myjournal").toString());
+    // Speed up the test
+    conf.setInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
+    
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(0)
+      .manageNameDfsDirs(false)
+      .build();
+    try {
+      URL url = new URL("http://localhost:"
+          + NameNode.getHttpAddress(cluster.getConfiguration(0)).getPort()
+          + "/dfshealth.jsp");
+      
+      cluster.getFileSystem().mkdirs(TEST_PATH);
+      
+      String contents = DFSTestUtil.urlGet(url); 
+      assertTrue(contents.contains("QJM to ["));
+      assertTrue(contents.contains("Written txid 2"));
+
+      // Stop one JN, do another txn, and make sure it shows as
+      // stuck behind the others.
+      mjc.getJournalNode(0).stopAndJoin(0);
+      
+      cluster.getFileSystem().delete(TEST_PATH, true);
+      
+      contents = DFSTestUtil.urlGet(url); 
+      System.out.println(contents);
+      assertTrue(Pattern.compile("1 txns/\\d+ms behind").matcher(contents)
+          .find());
+
+      // Restart NN while JN0 is still down.
+      cluster.restartNameNode();
+
+      contents = DFSTestUtil.urlGet(url); 
+      System.out.println(contents);
+      assertTrue(Pattern.compile("never written").matcher(contents)
+          .find());
+      
+
+    } finally {
+      cluster.shutdown();
+    }
+
+  }
+}
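
Taken together, the tests above all rely on the same wiring: keep the fsimage on
local disk and point dfs.namenode.edits.dir at the cluster's qjournal:// URI. A
condensed sketch of that pattern (paths and journal id are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;

    public class NNWithQJMExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new HdfsConfiguration();
        MiniJournalCluster mjc = new MiniJournalCluster.Builder(conf).build();
        try {
          // Keep the fsimage locally, but send edits to the journal quorum.
          conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
              MiniDFSCluster.getBaseDirectory() + "/NNWithQJMExample/image");
          conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
              mjc.getQuorumJournalURI("myjournal").toString());
          MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
              .numDataNodes(0)
              .manageNameDfsDirs(false)
              .build();
          try {
            // Any namespace edit now goes through the JournalNodes.
            cluster.getFileSystem().mkdirs(new Path("/example"));
          } finally {
            cluster.shutdown();
          }
        } finally {
          mjc.shutdown();
        }
      }
    }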