You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2012/08/15 02:48:12 UTC

svn commit: r1373177 - in /hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/qjournal/client/ src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/ src/main/java/org/apache/hadoop/hdfs/qjournal/...

Author: todd
Date: Wed Aug 15 00:48:11 2012
New Revision: 1373177

URL: http://svn.apache.org/viewvc?rev=1373177&view=rev
Log:
HDFS-3793. Implement genericized format() in QJM. Contributed by Todd Lipcon.

Added:
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/JournalNotFormattedException.java
Modified:
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt?rev=1373177&r1=1373176&r2=1373177&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt Wed Aug 15 00:48:11 2012
@@ -14,3 +14,5 @@ HDFS-3725. Fix QJM startup when individu
 HDFS-3741. Exhaustive failure injection test for skipped RPCs (todd)
 
 HDFS-3773. TestNNWithQJM fails after HDFS-3741. (atm)
+
+HDFS-3793. Implement genericized format() in QJM (todd)

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java?rev=1373177&r1=1373176&r2=1373177&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java Wed Aug 15 00:48:11 2012
@@ -83,6 +83,12 @@ interface AsyncLogger {
   public ListenableFuture<Void> purgeLogsOlderThan(long minTxIdToKeep);
 
   /**
+   * Format the log directory.
+   * @param nsInfo the namespace info to format with
+   */
+  public ListenableFuture<Void> format(NamespaceInfo nsInfo);
+
+  /**
    * @return the state of the last epoch on the target node.
    */
   public ListenableFuture<GetJournalStateResponseProto> getJournalState();

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java?rev=1373177&r1=1373176&r2=1373177&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java Wed Aug 15 00:48:11 2012
@@ -25,18 +25,23 @@ import java.util.concurrent.TimeoutExcep
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.qjournal.protocol.JournalNotFormattedException;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.ipc.RemoteException;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 
 /**
  * Wrapper around a set of Loggers, taking care of fanning out
@@ -197,6 +202,36 @@ class AsyncLoggerSet {
     }
     return QuorumCall.create(calls);    
   }
+  
+  public QuorumCall<AsyncLogger, Boolean> isFormatted() {
+    Map<AsyncLogger, ListenableFuture<Boolean>> calls = Maps.newHashMap();
+    for (AsyncLogger logger : loggers) {
+      final SettableFuture<Boolean> ret = SettableFuture.create();
+      ListenableFuture<GetJournalStateResponseProto> jstate =
+          logger.getJournalState();
+      Futures.addCallback(jstate, new FutureCallback<GetJournalStateResponseProto>() {
+        @Override
+        public void onFailure(Throwable t) {
+          if (t instanceof RemoteException) {
+            t = ((RemoteException)t).unwrapRemoteException();
+          }
+          if (t instanceof JournalNotFormattedException) {
+            ret.set(false);
+          } else {
+            ret.setException(t);
+          }
+        }
+
+        @Override
+        public void onSuccess(GetJournalStateResponseProto jstate) {
+          ret.set(true);
+        }
+      });
+      
+      calls.put(logger, ret);
+    }
+    return QuorumCall.create(calls);
+  }
 
   private QuorumCall<AsyncLogger,NewEpochResponseProto> newEpoch(
       NamespaceInfo nsInfo,
@@ -275,4 +310,15 @@ class AsyncLoggerSet {
     }
     return QuorumCall.create(calls);
   }
+
+  QuorumCall<AsyncLogger,Void> format(NamespaceInfo nsInfo) {
+    Map<AsyncLogger, ListenableFuture<Void>> calls =
+        Maps.newHashMap();
+    for (AsyncLogger logger : loggers) {
+      ListenableFuture<Void> future =
+          logger.format(nsInfo);
+      calls.put(logger, future);
+    }
+    return QuorumCall.create(calls);
+  }
 }

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java?rev=1373177&r1=1373176&r2=1373177&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java Wed Aug 15 00:48:11 2012
@@ -281,6 +281,17 @@ public class IPCLoggerChannel implements
   }
 
   @Override
+  public ListenableFuture<Void> format(final NamespaceInfo nsInfo) {
+    return executor.submit(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        getProxy().format(journalId, nsInfo);
+        return null;
+      }
+    });
+  }
+  
+  @Override
   public ListenableFuture<Void> startLogSegment(final long txid) {
     return executor.submit(new Callable<Void>() {
       @Override

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java?rev=1373177&r1=1373176&r2=1373177&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java Wed Aug 15 00:48:11 2012
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.PriorityQueue;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -68,6 +69,12 @@ public class QuorumJournalManager implem
   private final int acceptRecoveryTimeoutMs;
   private final int finalizeSegmentTimeoutMs;
   private final int selectInputStreamsTimeoutMs;
+
+  // Since these don't occur during normal operation, we can
+  // use rather lengthy timeouts, and don't need to make them
+  // configurable.
+  private static final int FORMAT_TIMEOUT_MS = 60000;
+  private static final int HASDATA_TIMEOUT_MS = 60000;
   
   private final Configuration conf;
   private final URI uri;
@@ -133,6 +140,52 @@ public class QuorumJournalManager implem
         "bad journal id: " + jid);
   }
 
+  @Override
+  public void format(NamespaceInfo nsInfo) throws IOException {
+    QuorumCall<AsyncLogger,Void> call = loggers.format(nsInfo);
+    try {
+      call.waitFor(loggers.size(), loggers.size(), 0, FORMAT_TIMEOUT_MS);
+    } catch (InterruptedException e) {
+      throw new IOException("Interrupted waiting for format() response");
+    } catch (TimeoutException e) {
+      throw new IOException("Timed out waiting for format() response");
+    }
+    
+    if (call.countExceptions() > 0) {
+      call.rethrowException("Could not format one or more JournalNodes");
+    }
+  }
+
+  @Override
+  public boolean hasSomeData() throws IOException {
+    QuorumCall<AsyncLogger, Boolean> call =
+        loggers.isFormatted();
+
+    try {
+      call.waitFor(loggers.size(), 0, 0, HASDATA_TIMEOUT_MS);
+    } catch (InterruptedException e) {
+      throw new IOException("Interrupted while determining if JNs have data");
+    } catch (TimeoutException e) {
+      throw new IOException("Timed out waiting for response from loggers");
+    }
+    
+    if (call.countExceptions() > 0) {
+      call.rethrowException(
+          "Unable to check if JNs are ready for formatting");
+    }
+    
+    // If any of the loggers reported that it is already formatted, then
+    // we should prompt before formatting.
+    for (Boolean hasData : call.getResults().values()) {
+      if (hasData) {
+        return true;
+      }
+    }
+
+    // Otherwise, none were formatted, so we can safely format.
+    return false;
+  }
+
   /**
    * Run recovery/synchronization for a specific segment.
    * Postconditions:
@@ -278,7 +331,7 @@ public class QuorumJournalManager implem
     }
     return addrs;
   }
-
+  
   @Override
   public EditLogOutputStream startLogSegment(long txId) throws IOException {
     Preconditions.checkState(isActiveWriter,

Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/JournalNotFormattedException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/JournalNotFormattedException.java?rev=1373177&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/JournalNotFormattedException.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/JournalNotFormattedException.java Wed Aug 15 00:48:11 2012
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import java.io.IOException;
+
+/**
+ * Exception indicating that a call has been made to a JournalNode
+ * which is not yet formatted.
+ */
+@InterfaceAudience.Private
+public class JournalNotFormattedException extends IOException {
+  private static final long serialVersionUID = 1L;
+  
+  public JournalNotFormattedException(String msg) {
+    super(msg);
+  }
+
+}

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java?rev=1373177&r1=1373176&r2=1373177&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java Wed Aug 15 00:48:11 2012
@@ -55,6 +55,12 @@ public interface QJournalProtocol {
       throws IOException;
   
   /**
+   * Format the underlying storage for the given namespace.
+   */
+  public void format(String journalId,
+      NamespaceInfo nsInfo) throws IOException;
+
+  /**
    * Begin a new epoch. See the HDFS-3077 design doc for details.
    */
   public NewEpochResponseProto newEpoch(String journalId,

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java?rev=1373177&r1=1373176&r2=1373177&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java Wed Aug 15 00:48:11 2012
@@ -23,6 +23,8 @@ import org.apache.hadoop.hdfs.qjournal.p
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeLogSegmentRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeLogSegmentResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
@@ -90,6 +92,17 @@ public class QJournalProtocolServerSideT
     }
   }
 
+  public FormatResponseProto format(RpcController controller,
+      FormatRequestProto request) throws ServiceException {
+    try {
+      impl.format(request.getJid().getIdentifier(),
+          PBHelper.convert(request.getNsInfo()));
+      return FormatResponseProto.getDefaultInstance();
+    } catch (IOException ioe) {
+      throw new ServiceException(ioe);
+    }
+  }
+  
   /** @see JournalProtocol#journal */
   @Override
   public JournalResponseProto journal(RpcController unused,

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java?rev=1373177&r1=1373176&r2=1373177&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java Wed Aug 15 00:48:11 2012
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.protocolPB
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeLogSegmentRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
@@ -93,6 +94,19 @@ public class QJournalProtocolTranslatorP
         .setIdentifier(jid)
         .build();
   }
+  
+  @Override
+  public void format(String jid, NamespaceInfo nsInfo) throws IOException {
+    try {
+      FormatRequestProto req = FormatRequestProto.newBuilder()
+          .setJid(convertJournalId(jid))
+          .setNsInfo(PBHelper.convert(nsInfo))
+          .build();
+      rpcProxy.format(NULL_CONTROLLER, req);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
 
   @Override
   public NewEpochResponseProto newEpoch(String jid, NamespaceInfo nsInfo,

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java?rev=1373177&r1=1373176&r2=1373177&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java Wed Aug 15 00:48:11 2012
@@ -105,37 +105,28 @@ class JNStorage extends Storage {
     setStorageInfo(nsInfo);
     LOG.info("Formatting journal storage directory " + 
         sd + " with nsid: " + getNamespaceID());
+    // Unlock the directory before formatting, because we will
+    // re-analyze it after format(). The analyzeStorage() call
+    // below is responsible for re-locking it. This is a no-op
+    // if the storage is not currently locked.
+    unlockAll();
     sd.clearDirectory();
     writeProperties(sd);
     if (!getPaxosDir().mkdirs()) {
       throw new IOException("Could not create paxos dir: " + getPaxosDir());
     }
-  }
-  
-  public void formatIfNecessary(NamespaceInfo nsInfo) throws IOException {
-    if (state == StorageState.NOT_FORMATTED ||
-        state == StorageState.NON_EXISTENT) {
-      format(nsInfo);
-      analyzeStorage();
-      assert state == StorageState.NORMAL :
-        "Unexpected state after formatting: " + state;
-    } else {
-      Preconditions.checkState(state == StorageState.NORMAL,
-          "Unhandled storage state in %s: %s", this, state);
-      assert getNamespaceID() != 0;
-      
-      checkConsistentNamespace(nsInfo);
-    }
+    analyzeStorage();
   }
 
-  private void analyzeStorage() throws IOException {
+  
+  void analyzeStorage() throws IOException {
     this.state = sd.analyzeStorage(StartupOption.REGULAR, this);
     if (state == StorageState.NORMAL) {
       readProperties(sd);
     }
   }
 
-  private void checkConsistentNamespace(NamespaceInfo nsInfo)
+  void checkConsistentNamespace(NamespaceInfo nsInfo)
       throws IOException {
     if (nsInfo.getNamespaceID() != getNamespaceID()) {
       throw new IOException("Incompatible namespaceID for journal " +
@@ -155,4 +146,8 @@ class JNStorage extends Storage {
     LOG.info("Closing journal storage for " + sd);
     unlockAll();
   }
+
+  public boolean isFormatted() {
+    return state == StorageState.NORMAL;
+  }
 }

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java?rev=1373177&r1=1373176&r2=1373177&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java Wed Aug 15 00:48:11 2012
@@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.qjournal.protocol.JournalNotFormattedException;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PersistedRecoveryPaxosData;
@@ -114,6 +115,8 @@ class Journal implements Closeable {
     Preconditions.checkState(nsInfo.getNamespaceID() != 0,
         "can't format with uninitialized namespace info: %s",
         nsInfo);
+    LOG.info("Formatting " + this + " with namespace info: " +
+        nsInfo);
     storage.format(nsInfo);
   }
 
@@ -134,6 +137,7 @@ class Journal implements Closeable {
    * any lower epoch, or 0 if no promises have been made.
    */
   synchronized long getLastPromisedEpoch() throws IOException {
+    checkFormatted();
     return lastPromisedEpoch.get();
   }
 
@@ -150,9 +154,8 @@ class Journal implements Closeable {
   synchronized NewEpochResponseProto newEpoch(
       NamespaceInfo nsInfo, long epoch) throws IOException {
 
-    // If the storage is unformatted, format it with this NS.
-    // Otherwise, check that the NN's nsinfo matches the storage.
-    storage.formatIfNecessary(nsInfo);
+    checkFormatted();
+    storage.checkConsistentNamespace(nsInfo);
     
     if (epoch <= getLastPromisedEpoch()) {
       throw new IOException("Proposed epoch " + epoch + " <= last promise " +
@@ -185,6 +188,7 @@ class Journal implements Closeable {
   synchronized void journal(RequestInfo reqInfo, long firstTxnId,
       int numTxns, byte[] records) throws IOException {
     checkRequest(reqInfo);
+    checkFormatted();
     
     // TODO: if a JN goes down and comes back up, then it will throw
     // this exception on every edit. We should instead send back
@@ -226,6 +230,13 @@ class Journal implements Closeable {
     // TODO: some check on serial number that they only increase from a given
     // client
   }
+  
+  private void checkFormatted() throws JournalNotFormattedException {
+    if (!storage.isFormatted()) {
+      throw new JournalNotFormattedException("Journal " + storage +
+          " not formatted");
+    }
+  }
 
   /**
    * Start a new segment at the given txid. The previous segment
@@ -235,6 +246,7 @@ class Journal implements Closeable {
       throws IOException {
     assert fjm != null;
     checkRequest(reqInfo);
+    checkFormatted();
     
     Preconditions.checkState(curSegment == null,
         "Can't start a log segment, already writing " + curSegment);
@@ -251,6 +263,7 @@ class Journal implements Closeable {
   public synchronized void finalizeLogSegment(RequestInfo reqInfo, long startTxId,
       long endTxId) throws IOException {
     checkRequest(reqInfo);
+    checkFormatted();
 
     if (startTxId == curSegmentTxId) {
       if (curSegment != null) {
@@ -284,6 +297,7 @@ class Journal implements Closeable {
   public synchronized void purgeLogsOlderThan(RequestInfo reqInfo,
       long minTxIdToKeep) throws IOException {
     checkRequest(reqInfo);
+    checkFormatted();
     
     fjm.purgeLogsOlderThan(minTxIdToKeep);
     purgePaxosDecisionsOlderThan(minTxIdToKeep);
@@ -320,6 +334,8 @@ class Journal implements Closeable {
       throws IOException {
     // No need to checkRequest() here - anyone may ask for the list
     // of segments.
+    checkFormatted();
+    
     RemoteEditLogManifest manifest = new RemoteEditLogManifest(
         fjm.getRemoteEditLogs(sinceTxId));
     return manifest;
@@ -360,6 +376,7 @@ class Journal implements Closeable {
   public synchronized PrepareRecoveryResponseProto prepareRecovery(
       RequestInfo reqInfo, long segmentTxId) throws IOException {
     checkRequest(reqInfo);
+    checkFormatted();
     
     PrepareRecoveryResponseProto.Builder builder =
         PrepareRecoveryResponseProto.newBuilder();
@@ -388,6 +405,7 @@ class Journal implements Closeable {
       SegmentStateProto segment, URL fromUrl)
       throws IOException {
     checkRequest(reqInfo);
+    checkFormatted();
     long segmentTxId = segment.getStartTxId();
 
     // TODO: right now, a recovery of a segment when the log is

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java?rev=1373177&r1=1373176&r2=1373177&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java Wed Aug 15 00:48:11 2012
@@ -108,6 +108,11 @@ class JournalNodeRpcServer implements QJ
     return jn.getOrCreateJournal(journalId).newEpoch(nsInfo, epoch);
   }
 
+  @Override
+  public void format(String journalId, NamespaceInfo nsInfo)
+      throws IOException {
+    jn.getOrCreateJournal(journalId).format(nsInfo);
+  }
 
   @Override
   public void journal(RequestInfo reqInfo, long firstTxnId,

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto?rev=1373177&r1=1373176&r2=1373177&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto Wed Aug 15 00:48:11 2012
@@ -110,6 +110,17 @@ message GetJournalStateResponseProto {
 }
 
 /**
+ * format()
+ */
+message FormatRequestProto {
+  required JournalIdProto jid = 1;
+  required NamespaceInfoProto nsInfo = 2;
+}
+
+message FormatResponseProto {
+}
+
+/**
  * newEpoch()
  */
 message NewEpochRequestProto {
@@ -178,6 +189,8 @@ service QJournalProtocolService {
 
   rpc newEpoch(NewEpochRequestProto) returns (NewEpochResponseProto);
 
+  rpc format(FormatRequestProto) returns (FormatResponseProto);
+
   rpc journal(JournalRequestProto) returns (JournalResponseProto);
 
   rpc startLogSegment(StartLogSegmentRequestProto) 

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java?rev=1373177&r1=1373176&r2=1373177&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java Wed Aug 15 00:48:11 2012
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.ExitUtil;
@@ -157,19 +158,26 @@ public class TestNNWithQJM {
     conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
         mjc.getQuorumJournalURI("myjournal").toString());
     
-    // Start a NN, so the storage is formatted with its namespace info. 
+    // Start a NN, so the storage is formatted -- both on-disk
+    // and QJM.
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
       .numDataNodes(0)
       .manageNameDfsDirs(false)
       .build();
     cluster.shutdown();
     
-    // Create a new (freshly-formatted) NN, which should not be able to
-    // reuse the same journal, since its journal ID would not match.
+    // Reformat just the on-disk portion
+    Configuration onDiskOnly = new Configuration(conf);
+    onDiskOnly.unset(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY);
+    NameNode.format(onDiskOnly);
+
+    // Start the NN - should fail because the JNs are still formatted
+    // with the old namespace ID.
     try {
       cluster = new MiniDFSCluster.Builder(conf)
         .numDataNodes(0)
         .manageNameDfsDirs(false)
+        .format(false)
         .build();
       fail("New NN with different namespace should have been rejected");
     } catch (IOException ioe) {

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java?rev=1373177&r1=1373176&r2=1373177&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java Wed Aug 15 00:48:11 2012
@@ -54,6 +54,10 @@ public class TestEpochsAreUnique {
     Configuration conf = new Configuration();
     MiniJournalCluster cluster = new MiniJournalCluster.Builder(conf).build();
     URI uri = cluster.getQuorumJournalURI(JID);
+    QuorumJournalManager qjm = new QuorumJournalManager(
+        conf, uri, FAKE_NSINFO);
+    qjm.format(FAKE_NSINFO);
+    
     try {
       // With no failures or contention, epochs should increase one-by-one
       for (int i = 0; i < 5; i++) {

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java?rev=1373177&r1=1373176&r2=1373177&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java Wed Aug 15 00:48:11 2012
@@ -76,6 +76,7 @@ public class TestQJMWithFaults {
     MiniJournalCluster cluster = new MiniJournalCluster.Builder(conf).build();
     try {
       QuorumJournalManager qjm = createInjectableQJM(cluster);
+      qjm.format(FAKE_NSINFO);
       doWorkload(cluster, qjm);
       
       SortedSet<Integer> ipcCounts = Sets.newTreeSet();
@@ -118,6 +119,7 @@ public class TestQJMWithFaults {
         try {
           QuorumJournalManager qjm;
           qjm = createInjectableQJM(cluster);
+          qjm.format(FAKE_NSINFO);
           List<AsyncLogger> loggers = qjm.getLoggerSetForTests().getLoggersForTests();
           failIpcNumber(loggers.get(0), failA);
           failIpcNumber(loggers.get(1), failB);

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java?rev=1373177&r1=1373176&r2=1373177&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java Wed Aug 15 00:48:11 2012
@@ -84,6 +84,7 @@ public class TestQuorumJournalManager {
     qjm = createSpyingQJM();
     spies = qjm.getLoggerSetForTests().getLoggersForTests();
 
+    qjm.format(QJMTestUtil.FAKE_NSINFO);
     qjm.recoverUnfinalizedSegments();
     assertEquals(1, qjm.getLoggerSetForTests().getEpoch());
   }
@@ -110,6 +111,15 @@ public class TestQuorumJournalManager {
   }
   
   @Test
+  public void testFormat() throws Exception {
+    QuorumJournalManager qjm = new QuorumJournalManager(
+        conf, cluster.getQuorumJournalURI("testFormat-jid"), FAKE_NSINFO);
+    assertFalse(qjm.hasSomeData());
+    qjm.format(FAKE_NSINFO);
+    assertTrue(qjm.hasSomeData());
+  }
+  
+  @Test
   public void testReaderWhileAnotherWrites() throws Exception {
     
     QuorumJournalManager readerQjm = createSpyingQJM();

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java?rev=1373177&r1=1373176&r2=1373177&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java Wed Aug 15 00:48:11 2012
@@ -85,6 +85,8 @@ public class TestQuorumJournalManagerUni
       futureReturns(
           NewEpochResponseProto.newBuilder().build()
           ).when(logger).newEpoch(Mockito.anyLong());
+      
+      futureReturns(null).when(logger).format(Mockito.<NamespaceInfo>any());
     }
     
     qjm.recoverUnfinalizedSegments();

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java?rev=1373177&r1=1373176&r2=1373177&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java Wed Aug 15 00:48:11 2012
@@ -61,6 +61,7 @@ public class TestJournal {
   public void setup() throws Exception {
     FileUtil.fullyDelete(TEST_LOG_DIR);
     journal = new Journal(TEST_LOG_DIR, mockErrorReporter);
+    journal.format(FAKE_NSINFO);
   }
   
   @After
@@ -130,18 +131,14 @@ public class TestJournal {
   
   @Test
   public void testJournalLocking() throws Exception {
+    Assume.assumeTrue(journal.getStorage().getStorageDir(0).isLockSupported());
     StorageDirectory sd = journal.getStorage().getStorageDir(0);
     File lockFile = new File(sd.getRoot(), Storage.STORAGE_FILE_LOCK);
-
-    // Journal should not be locked, since we lazily initialize it.
-    assertFalse(lockFile.exists());
-
-    journal.newEpoch(FAKE_NSINFO,  1);
-    Assume.assumeTrue(journal.getStorage().getStorageDir(0).isLockSupported());
     
-    // Journal should be locked
+    // Journal should be locked, since the format() call locks it.
     GenericTestUtils.assertExists(lockFile);
-    
+
+    journal.newEpoch(FAKE_NSINFO,  1);
     try {
       new Journal(TEST_LOG_DIR, mockErrorReporter);
       fail("Did not fail to create another journal in same dir");
@@ -153,6 +150,7 @@ public class TestJournal {
     journal.close();
     
     // Journal should no longer be locked after the close() call.
+    // Hence, should be able to create a new Journal in the same dir.
     Journal journal2 = new Journal(TEST_LOG_DIR, mockErrorReporter);
     journal2.newEpoch(FAKE_NSINFO, 2);
   }