You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2012/04/07 02:52:13 UTC

svn commit: r1310649 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/protocol/ src/main/java/org/apache/hadoop/hdfs/protocolPB/ src/main/java/org/apache/hadoop/hdfs/server/journalservice/ src/main/java...

Author: szetszwo
Date: Sat Apr  7 00:52:12 2012
New Revision: 1310649

URL: http://svn.apache.org/viewvc?rev=1310649&view=rev
Log:
HDFS-3211. Add fence(..) and replace NamenodeRegistration with JournalInfo and epoch in JournalProtocol.  Contributed by suresh 

Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/FenceResponse.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/FencedException.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalInfo.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalProtocol.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/JournalProtocol.proto
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1310649&r1=1310648&r2=1310649&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Sat Apr  7 00:52:12 2012
@@ -62,9 +62,6 @@ Trunk (unreleased changes)
     HDFS-3178. Add states and state handler for journal synchronization in
     JournalService.  (szetszwo)
 
-    HDFS-3204. Minor modification to JournalProtocol.proto to make
-    it generic. (suresh)
-
   OPTIMIZATIONS
 
     HDFS-2834. Add a ByteBuffer-based read API to DFSInputStream.
@@ -335,6 +332,15 @@ Release 2.0.0 - UNRELEASED 
 
     HDFS-3226. Allow GetConf tool to print arbitrary keys (todd)
 
+    HDFS-3204. Minor modification to JournalProtocol.proto to make
+    it generic. (suresh)
+
+    HDFS-2505. Add a test to verify getFileChecksum(..) with ViewFS.  (Ravi
+    Prakash via szetszwo)
+
+    HDFS-3211. Add fence(..) and replace NamenodeRegistration with JournalInfo
+    and epoch in JournalProtocol. (suresh via szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-3024. Improve performance of stringification in addStoredBlock (todd)
@@ -771,9 +777,6 @@ Release 0.23.3 - UNRELEASED
 
   IMPROVEMENTS
 
-    HDFS-2505. Add a test to verify getFileChecksum(..) with ViewFS.  (Ravi
-    Prakash via szetszwo)
-
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java?rev=1310649&r1=1310648&r2=1310649&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java Sat Apr  7 00:52:12 2012
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
 import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
 
 /**
@@ -33,6 +34,10 @@ import org.apache.hadoop.hdfs.server.pro
 public class UnregisteredNodeException extends IOException {
   private static final long serialVersionUID = -5620209396945970810L;
 
+  public UnregisteredNodeException(JournalInfo info) {
+    super("Unregistered server: " + info.toString());
+  }
+  
   public UnregisteredNodeException(NodeRegistration nodeReg) {
     super("Unregistered server: " + nodeReg.toString());
   }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java?rev=1310649&r1=1310648&r2=1310649&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java Sat Apr  7 00:52:12 2012
@@ -20,10 +20,13 @@ package org.apache.hadoop.hdfs.protocolP
 import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.FenceRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.FenceResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentResponseProto;
+import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
 import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
 
 import com.google.protobuf.RpcController;
@@ -48,9 +51,8 @@ public class JournalProtocolServerSideTr
   public JournalResponseProto journal(RpcController unused,
       JournalRequestProto req) throws ServiceException {
     try {
-      impl.journal(PBHelper.convert(req.getJournalInfo()),
-          req.getFirstTxnId(), req.getNumTxns(), req.getRecords()
-              .toByteArray());
+      impl.journal(PBHelper.convert(req.getJournalInfo()), req.getEpoch(),
+          req.getFirstTxnId(), req.getNumTxns(), req.getRecords().toByteArray());
     } catch (IOException e) {
       throw new ServiceException(e);
     }
@@ -63,10 +65,24 @@ public class JournalProtocolServerSideTr
       StartLogSegmentRequestProto req) throws ServiceException {
     try {
       impl.startLogSegment(PBHelper.convert(req.getJournalInfo()),
-          req.getTxid());
+          req.getEpoch(), req.getTxid());
     } catch (IOException e) {
       throw new ServiceException(e);
     }
     return StartLogSegmentResponseProto.newBuilder().build();
   }
+
+  @Override
+  public FenceResponseProto fence(RpcController controller,
+      FenceRequestProto req) throws ServiceException {
+    try {
+      FenceResponse resp = impl.fence(PBHelper.convert(req.getJournalInfo()), req.getEpoch(),
+          req.getFencerInfo());
+      return FenceResponseProto.newBuilder().setInSync(resp.isInSync())
+          .setLastTransactionId(resp.getLastTransactionId())
+          .setPreviousEpoch(resp.getPreviousEpoch()).build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java?rev=1310649&r1=1310648&r2=1310649&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java Sat Apr  7 00:52:12 2012
@@ -22,10 +22,13 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.FenceRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.FenceResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentRequestProto;
+import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
+import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
 import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolMetaInterface;
 import org.apache.hadoop.ipc.RPC;
@@ -58,10 +61,11 @@ public class JournalProtocolTranslatorPB
   }
 
   @Override
-  public void journal(NamenodeRegistration reg, long firstTxnId,
+  public void journal(JournalInfo journalInfo, long epoch, long firstTxnId,
       int numTxns, byte[] records) throws IOException {
     JournalRequestProto req = JournalRequestProto.newBuilder()
-        .setJournalInfo(PBHelper.convertToJournalInfo(reg))
+        .setJournalInfo(PBHelper.convert(journalInfo))
+        .setEpoch(epoch)
         .setFirstTxnId(firstTxnId)
         .setNumTxns(numTxns)
         .setRecords(PBHelper.getByteString(records))
@@ -74,10 +78,11 @@ public class JournalProtocolTranslatorPB
   }
 
   @Override
-  public void startLogSegment(NamenodeRegistration registration, long txid)
+  public void startLogSegment(JournalInfo journalInfo, long epoch, long txid)
       throws IOException {
     StartLogSegmentRequestProto req = StartLogSegmentRequestProto.newBuilder()
-        .setJournalInfo(PBHelper.convertToJournalInfo(registration))
+        .setJournalInfo(PBHelper.convert(journalInfo))
+        .setEpoch(epoch)
         .setTxid(txid)
         .build();
     try {
@@ -86,6 +91,20 @@ public class JournalProtocolTranslatorPB
       throw ProtobufHelper.getRemoteException(e);
     }
   }
+  
+  @Override
+  public FenceResponse fence(JournalInfo journalInfo, long epoch,
+      String fencerInfo) throws IOException {
+    FenceRequestProto req = FenceRequestProto.newBuilder().setEpoch(epoch)
+        .setFencerInfo(fencerInfo == null ? "" : fencerInfo) // builder rejects null
+        .setJournalInfo(PBHelper.convert(journalInfo)).build();
+    try {
+      FenceResponseProto r = rpcProxy.fence(NULL_CONTROLLER, req);
+      return new FenceResponse(r.getPreviousEpoch(), r.getLastTransactionId(), r.getInSync());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
 
   @Override
   public boolean isMethodSupported(String methodName) throws IOException {

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1310649&r1=1310648&r2=1310649&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Sat Apr  7 00:52:12 2012
@@ -110,6 +110,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -117,6 +118,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
 import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
+import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
 import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@@ -127,7 +129,6 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
-import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
@@ -1347,25 +1348,19 @@ public class PBHelper {
         .setStorageID(r.getStorageID()).build();
   }
 
-  public static NamenodeRegistration convert(JournalInfoProto info) {
+  public static JournalInfo convert(JournalInfoProto info) {
     int lv = info.hasLayoutVersion() ? info.getLayoutVersion() : 0;
     int nsID = info.hasNamespaceID() ? info.getNamespaceID() : 0;
-    StorageInfo storage = new StorageInfo(lv, nsID, info.getClusterID(), 0);
-    
-    // Note that the role is always {@link NamenodeRole#NAMENODE} as this
-    // conversion happens for messages from Namenode to Journal receivers.
-    // Addresses in the registration are unused.
-    return new NamenodeRegistration("", "", storage, NamenodeRole.NAMENODE);
+    return new JournalInfo(lv, info.getClusterID(), nsID);
   }
 
   /**
    * Method used for converting {@link JournalInfoProto} sent from Namenode
    * to Journal receivers to {@link NamenodeRegistration}.
    */
-  public static JournalInfoProto convertToJournalInfo(
-      NamenodeRegistration reg) {
-    return JournalInfoProto.newBuilder().setClusterID(reg.getClusterID())
-        .setLayoutVersion(reg.getLayoutVersion())
-        .setNamespaceID(reg.getNamespaceID()).build();
+  public static JournalInfoProto convert(JournalInfo j) {
+    return JournalInfoProto.newBuilder().setClusterID(j.getClusterId())
+        .setLayoutVersion(j.getLayoutVersion())
+        .setNamespaceID(j.getNamespaceId()).build();
   }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java?rev=1310649&r1=1310648&r2=1310649&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java Sat Apr  7 00:52:12 2012
@@ -31,6 +31,9 @@ import org.apache.hadoop.hdfs.protocolPB
 import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
+import org.apache.hadoop.hdfs.server.protocol.FencedException;
+import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
 import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@@ -40,6 +43,7 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.BlockingService;
 
 /**
@@ -66,6 +70,8 @@ public class JournalService implements J
   private final NamenodeProtocol namenode;
   private final StateHandler stateHandler = new StateHandler();
   private final RPC.Server rpcServer;
+  private long epoch = 0;
+  private String fencerInfo;
   
   enum State {
     /** The service is initialized and ready to start. */
@@ -115,7 +121,7 @@ public class JournalService implements J
       current = State.WAITING_FOR_ROLL;
     }
 
-    synchronized void startLogSegment() throws IOException {
+    synchronized void startLogSegment() {
       if (current == State.WAITING_FOR_ROLL) {
         current = State.SYNCING;
       }
@@ -232,28 +238,42 @@ public class JournalService implements J
   }
 
   @Override
-  public void journal(NamenodeRegistration registration, long firstTxnId,
+  public void journal(JournalInfo journalInfo, long epoch, long firstTxnId,
       int numTxns, byte[] records) throws IOException {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Received journal " + firstTxnId + " " + numTxns);
     }
     stateHandler.isJournalAllowed();
-    verify(registration);
+    verify(epoch, journalInfo);
     listener.journal(this, firstTxnId, numTxns, records);
   }
 
   @Override
-  public void startLogSegment(NamenodeRegistration registration, long txid)
+  public void startLogSegment(JournalInfo journalInfo, long epoch, long txid)
       throws IOException {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Received startLogSegment " + txid);
     }
     stateHandler.isStartLogSegmentAllowed();
-    verify(registration);
+    verify(epoch, journalInfo);
     listener.rollLogs(this, txid);
     stateHandler.startLogSegment();
   }
 
+  @Override
+  public FenceResponse fence(JournalInfo journalInfo, long epoch,
+      String fencerInfo) throws IOException {
+    LOG.info("Fenced by " + fencerInfo + " with epoch " + epoch);
+    verifyFence(epoch, fencerInfo);
+    verify(journalInfo);
+    long previousEpoch = this.epoch; // field, not the shadowing parameter
+    this.epoch = epoch;
+    this.fencerInfo = fencerInfo;
+    
+    // TODO:HDFS-3092 set lastTransId and inSync
+    return new FenceResponse(previousEpoch, 0, false);
+  }
+
   /** Create an RPC server. */
   private static RPC.Server createRpcServer(Configuration conf,
       InetSocketAddress address, JournalProtocol impl) throws IOException {
@@ -267,15 +287,54 @@ public class JournalService implements J
         address.getHostName(), address.getPort(), 1, false, conf, null);
   }
   
-  private void verify(NamenodeRegistration reg) throws IOException {
-    if (!registration.getRegistrationID().equals(reg.getRegistrationID())) {
-      LOG.warn("Invalid registrationID - expected: "
-          + registration.getRegistrationID() + " received: "
-          + reg.getRegistrationID());
-      throw new UnregisteredNodeException(reg);
+  private void verifyEpoch(long e) throws FencedException {
+    if (epoch != e) {
+      String errorMsg = "Epoch " + e + " is not valid. "
+          + "Resource has already been fenced by " + fencerInfo
+          + " with epoch " + epoch;
+      LOG.warn(errorMsg);
+      throw new FencedException(errorMsg);
+    }
+  }
+  
+  private void verifyFence(long e, String fencer) throws FencedException {
+    if (e <= epoch) {
+      String errorMsg = "Epoch " + e + " from fencer " + fencer
+          + " is not valid. " + "Resource has already been fenced by "
+          + fencerInfo + " with epoch " + epoch;
+      LOG.warn(errorMsg);
+      throw new FencedException(errorMsg);
+    }
+  }
+  
+  /** 
+   * Verifies that a request's namespaceID and clusterId match the registration.
+   */
+  private void verify(JournalInfo journalInfo) throws IOException {
+    String errorMsg = null;
+    int expectedNamespaceID = registration.getNamespaceID();
+    if (journalInfo.getNamespaceId() != expectedNamespaceID) {
+      errorMsg = "Invalid namespaceID in journal request - expected " + expectedNamespaceID
+          + " actual " + journalInfo.getNamespaceId();
+      LOG.warn(errorMsg);
+      throw new UnregisteredNodeException(journalInfo);
+    } 
+    if (!journalInfo.getClusterId().equals(registration.getClusterID())) {
+      errorMsg = "Invalid clusterId in journal request - expected "
+          + registration.getClusterID() + " actual " + journalInfo.getClusterId();
+      LOG.warn(errorMsg);
+      throw new UnregisteredNodeException(journalInfo);
     }
   }
   
+  /** 
+   * Verifies the epoch and then the journal info of a journal request.
+   */
+  private void verify(long e, JournalInfo journalInfo) throws IOException {
+    verifyEpoch(e);
+    verify(journalInfo);
+  }
+  
   /**
    * Register this service with the active namenode.
    */
@@ -298,4 +357,9 @@ public class JournalService implements J
     listener.verifyVersion(this, nsInfo);
     registration.setStorageInfo(nsInfo);
   }
-}
\ No newline at end of file
+
+  @VisibleForTesting
+  long getEpoch() {
+    return epoch;
+  }
+}

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java?rev=1310649&r1=1310648&r2=1310649&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java Sat Apr  7 00:52:12 2012
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.na
 
 import java.io.IOException;
 
+import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 
 /**
@@ -26,19 +27,20 @@ import org.apache.hadoop.hdfs.server.pro
  * to a BackupNode.
  */
 class BackupJournalManager implements JournalManager {
-
-  private final NamenodeRegistration nnReg;
   private final NamenodeRegistration bnReg;
+  private final JournalInfo journalInfo;
   
   BackupJournalManager(NamenodeRegistration bnReg,
       NamenodeRegistration nnReg) {
+    journalInfo = new JournalInfo(nnReg.getLayoutVersion(),
+        nnReg.getClusterID(), nnReg.getNamespaceID());
     this.bnReg = bnReg;
-    this.nnReg = nnReg;
   }
 
   @Override
   public EditLogOutputStream startLogSegment(long txId) throws IOException {
-    EditLogBackupOutputStream stm = new EditLogBackupOutputStream(bnReg, nnReg);
+    EditLogBackupOutputStream stm = new EditLogBackupOutputStream(bnReg,
+        journalInfo);
     stm.startLogSegment(txId);
     return stm;
   }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java?rev=1310649&r1=1310648&r2=1310649&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java Sat Apr  7 00:52:12 2012
@@ -35,6 +35,8 @@ import org.apache.hadoop.hdfs.protocolPB
 import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
+import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
 import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@@ -217,7 +219,8 @@ public class BackupNode extends NameNode
   }
   
   /* @Override */// NameNode
-  public boolean setSafeMode(SafeModeAction action) throws IOException {
+  public boolean setSafeMode(@SuppressWarnings("unused") SafeModeAction action)
+      throws IOException {
     throw new UnsupportedActionException("setSafeMode");
   }
   
@@ -236,51 +239,56 @@ public class BackupNode extends NameNode
     
     /** 
      * Verifies a journal request
-     * @param nodeReg node registration
-     * @throws UnregisteredNodeException if the registration is invalid
      */
-    void verifyJournalRequest(NamenodeRegistration reg) throws IOException {
-      verifyVersion(reg.getLayoutVersion());
+    private void verifyJournalRequest(JournalInfo journalInfo)
+        throws IOException {
+      verifyVersion(journalInfo.getLayoutVersion());
       String errorMsg = null;
       int expectedNamespaceID = namesystem.getNamespaceInfo().getNamespaceID();
-      if (reg.getNamespaceID() != expectedNamespaceID) {
+      if (journalInfo.getNamespaceId() != expectedNamespaceID) {
         errorMsg = "Invalid namespaceID in journal request - expected " + expectedNamespaceID
-            + " actual " + reg.getNamespaceID();
+            + " actual " + journalInfo.getNamespaceId();
         LOG.warn(errorMsg);
-        throw new UnregisteredNodeException(reg);
+        throw new UnregisteredNodeException(journalInfo);
       } 
-      if (!reg.getClusterID().equals(namesystem.getClusterId())) {
+      if (!journalInfo.getClusterId().equals(namesystem.getClusterId())) {
         errorMsg = "Invalid clusterId in journal request - expected "
-            + reg.getClusterID() + " actual " + namesystem.getClusterId();
+            + namesystem.getClusterId() + " actual " + journalInfo.getClusterId();
         LOG.warn(errorMsg);
-        throw new UnregisteredNodeException(reg);
+        throw new UnregisteredNodeException(journalInfo);
       }
     }
 
-
     /////////////////////////////////////////////////////
     // BackupNodeProtocol implementation for backup node.
     /////////////////////////////////////////////////////
     @Override
-    public void startLogSegment(NamenodeRegistration registration, long txid)
-        throws IOException {
+    public void startLogSegment(JournalInfo journalInfo, long epoch,
+        long txid) throws IOException {
       namesystem.checkOperation(OperationCategory.JOURNAL);
-      verifyJournalRequest(registration);
+      verifyJournalRequest(journalInfo);
       getBNImage().namenodeStartedLogSegment(txid);
     }
     
     @Override
-    public void journal(NamenodeRegistration nnReg,
-        long firstTxId, int numTxns,
-        byte[] records) throws IOException {
+    public void journal(JournalInfo journalInfo, long epoch, long firstTxId,
+        int numTxns, byte[] records) throws IOException {
       namesystem.checkOperation(OperationCategory.JOURNAL);
-      verifyJournalRequest(nnReg);
+      verifyJournalRequest(journalInfo);
       getBNImage().journal(firstTxId, numTxns, records);
     }
 
     private BackupImage getBNImage() {
       return (BackupImage)nn.getFSImage();
     }
+
+    @Override
+    public FenceResponse fence(JournalInfo journalInfo, long epoch,
+        String fencerInfo) throws IOException {
+      LOG.info("Fenced by " + fencerInfo + " with epoch " + epoch);
+      throw new UnsupportedOperationException(
+          "BackupNode does not support fence");
+    }
   }
   
   //////////////////////////////////////////////////////

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java?rev=1310649&r1=1310648&r2=1310649&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java Sat Apr  7 00:52:12 2012
@@ -24,6 +24,7 @@ import java.util.Arrays;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
 import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -42,18 +43,18 @@ import org.apache.hadoop.security.UserGr
 class EditLogBackupOutputStream extends EditLogOutputStream {
   static int DEFAULT_BUFFER_SIZE = 256;
 
-  private JournalProtocol backupNode;  // RPC proxy to backup node
-  private NamenodeRegistration bnRegistration;  // backup node registration
-  private NamenodeRegistration nnRegistration;  // active node registration
+  private final JournalProtocol backupNode;  // RPC proxy to backup node
+  private final NamenodeRegistration bnRegistration;  // backup node registration
+  private final JournalInfo journalInfo;  // active node registration
+  private final DataOutputBuffer out;     // serialized output sent to backup node
   private EditsDoubleBuffer doubleBuf;
-  private DataOutputBuffer out;     // serialized output sent to backup node
 
   EditLogBackupOutputStream(NamenodeRegistration bnReg, // backup node
-                            NamenodeRegistration nnReg) // active name-node
+                            JournalInfo journalInfo) // active name-node
   throws IOException {
     super();
     this.bnRegistration = bnReg;
-    this.nnRegistration = nnReg;
+    this.journalInfo = journalInfo;
     InetSocketAddress bnAddress =
       NetUtils.createSocketAddr(bnRegistration.getAddress());
     try {
@@ -127,8 +128,7 @@ class EditLogBackupOutputStream extends 
       out.reset();
       assert out.getLength() == 0 : "Output buffer is not empty";
 
-      backupNode.journal(nnRegistration,
-          firstTxToFlush, numReadyTxns, data);
+      backupNode.journal(journalInfo, 0, firstTxToFlush, numReadyTxns, data);
     }
   }
 
@@ -140,6 +140,6 @@ class EditLogBackupOutputStream extends 
   }
 
   void startLogSegment(long txId) throws IOException {
-    backupNode.startLogSegment(nnRegistration, txId);
+    backupNode.startLogSegment(journalInfo, 0, txId);
   }
 }

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/FenceResponse.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/FenceResponse.java?rev=1310649&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/FenceResponse.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/FenceResponse.java Sat Apr  7 00:52:12 2012
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Response to a journal fence request. See {@link JournalProtocol#fence}
+ */
+@InterfaceAudience.Private
+public class FenceResponse {
+  private final long previousEpoch;
+  private final long lastTransactionId;
+  private final boolean isInSync;
+  
+  public FenceResponse(long previousEpoch, long lastTransId, boolean inSync) {
+    this.previousEpoch = previousEpoch;
+    this.lastTransactionId = lastTransId;
+    this.isInSync = inSync;
+  }
+
+  public boolean isInSync() {
+    return isInSync;
+  }
+
+  public long getLastTransactionId() {
+    return lastTransactionId;
+  }
+
+  public long getPreviousEpoch() {
+    return previousEpoch;
+  }
+}

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/FencedException.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/FencedException.java?rev=1310649&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/FencedException.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/FencedException.java Sat Apr  7 00:52:12 2012
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import java.io.IOException;
+
+/**
+ * Thrown when a previous user of a shared resource tries to use it after
+ * being fenced by another user.
+ */
+public class FencedException extends IOException {
+  private static final long serialVersionUID = 1L;
+  
+  public FencedException(String errorMsg) {
+    super(errorMsg);
+  }
+}

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalInfo.java?rev=1310649&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalInfo.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalInfo.java Sat Apr  7 00:52:12 2012
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Information that describes a journal
+ */
+@InterfaceAudience.Private
+public class JournalInfo {
+  private final int layoutVersion;
+  private final String clusterId;
+  private final int namespaceId;
+
+  public JournalInfo(int lv, String clusterId, int nsId) {
+    this.layoutVersion = lv;
+    this.clusterId = clusterId;
+    this.namespaceId = nsId;
+  }
+
+  public int getLayoutVersion() {
+    return layoutVersion;
+  }
+
+  public String getClusterId() {
+    return clusterId;
+  }
+
+  public int getNamespaceId() {
+    return namespaceId;
+  }
+}

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalProtocol.java?rev=1310649&r1=1310648&r2=1310649&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalProtocol.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalProtocol.java Sat Apr  7 00:52:12 2012
@@ -21,7 +21,6 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.security.KerberosInfo;
 
 /**
@@ -53,12 +52,15 @@ public interface JournalProtocol {
    * via {@code EditLogBackupOutputStream} in order to synchronize meta-data
    * changes with the backup namespace image.
    * 
-   * @param registration active node registration
+   * @param journalInfo journal information
+   * @param epoch marks the beginning of a new journal writer
    * @param firstTxnId the first transaction of this batch
    * @param numTxns number of transactions
    * @param records byte array containing serialized journal records
+   * @throws FencedException if the resource has been fenced
    */
-  public void journal(NamenodeRegistration registration,
+  public void journal(JournalInfo journalInfo,
+                      long epoch,
                       long firstTxnId,
                       int numTxns,
                       byte[] records) throws IOException;
@@ -66,9 +68,24 @@ public interface JournalProtocol {
   /**
    * Notify the BackupNode that the NameNode has rolled its edit logs
    * and is now writing a new log segment.
-   * @param registration the registration of the active NameNode
+   * @param journalInfo journal information
+   * @param epoch marks the beginning of a new journal writer
    * @param txid the first txid in the new log
+   * @throws FencedException if the resource has been fenced
    */
-  public void startLogSegment(NamenodeRegistration registration,
+  public void startLogSegment(JournalInfo journalInfo, long epoch,
       long txid) throws IOException;
+  
+  /**
+   * Request to fence any other journal writers.
+   * Older writers with a previous epoch will be fenced and can no longer
+   * perform journal operations.
+   * 
+   * @param journalInfo journal information
+   * @param epoch marks the beginning of a new journal writer
+   * @param fencerInfo info about fencer for debugging purposes
+   * @throws FencedException if the resource has been fenced
+   */
+  public FenceResponse fence(JournalInfo journalInfo, long epoch,
+      String fencerInfo) throws IOException;
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/JournalProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/JournalProtocol.proto?rev=1310649&r1=1310648&r2=1310649&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/JournalProtocol.proto (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/JournalProtocol.proto Sat Apr  7 00:52:12 2012
@@ -36,16 +36,18 @@ message JournalInfoProto {
 }
 
 /**
- * JournalInfo - the information about the journal
+ * journalInfo - the information about the journal
  * firstTxnId - the first txid in the journal records
  * numTxns - Number of transactions in editlog
  * records - bytes containing serialized journal records
+ * epoch - change to this represents change of journal writer
  */
 message JournalRequestProto {
   required JournalInfoProto journalInfo = 1;
   required uint64 firstTxnId = 2;
   required uint32 numTxns = 3;
   required bytes records = 4;
+  required uint64 epoch = 5;
 }
 
 /**
@@ -55,12 +57,13 @@ message JournalResponseProto { 
 }
 
 /**
- * JournalInfo - the information about the journal
+ * journalInfo - the information about the journal
  * txid - first txid in the new log
  */
 message StartLogSegmentRequestProto {
-  required JournalInfoProto journalInfo = 1;
-  required uint64 txid = 2;
+  required JournalInfoProto journalInfo = 1; // Info about the journal
+  required uint64 txid = 2; // Transaction ID
+  required uint64 epoch = 3;
 }
 
 /**
@@ -70,6 +73,27 @@ message StartLogSegmentResponseProto { 
 }
 
 /**
+ * journalInfo - the information about the journal
+ * epoch - change indicates change in writer; fencerInfo - info about fencer for debugging
+ */
+message FenceRequestProto {
+  required JournalInfoProto journalInfo = 1; // Info about the journal
+  required uint64 epoch = 2; // Epoch - change indicates change in writer
+  optional string fencerInfo = 3; // Info about fencer for debugging
+}
+
+/**
+ * previousEpoch - previous epoch if any or zero
+ * lastTransactionId - last valid transaction Id in the journal
+ * inSync - if all journal segments are available and in sync
+ */
+message FenceResponseProto {
+  optional uint64 previousEpoch = 1;
+  optional uint64 lastTransactionId = 2;
+  optional bool inSync = 3;
+}
+
+/**
  * Protocol used to journal edits to a remote node. Currently,
  * this is used to publish edits from the NameNode to a BackupNode.
  *
@@ -89,4 +113,10 @@ service JournalProtocolService {
    */
   rpc startLogSegment(StartLogSegmentRequestProto) 
       returns (StartLogSegmentResponseProto);
+  
+  /**
+   * Request to fence a journal receiver.
+   */
+  rpc fence(FenceRequestProto)
+      returns (FenceResponseProto);
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java?rev=1310649&r1=1310648&r2=1310649&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java Sat Apr  7 00:52:12 2012
@@ -20,12 +20,18 @@ package org.apache.hadoop.hdfs.server.jo
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
+import junit.framework.Assert;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
+import org.apache.hadoop.hdfs.server.protocol.FencedException;
+import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -42,7 +48,7 @@ public class TestJournalService {
    * called.
    */
   @Test
-  public void testCallBacks() throws IOException {
+  public void testCallBacks() throws Exception {
     JournalListener listener = Mockito.mock(JournalListener.class);
     JournalService service = null;
     try {
@@ -51,6 +57,7 @@ public class TestJournalService {
       service = startJournalService(listener);
       verifyRollLogsCallback(service, listener);
       verifyJournalCallback(service, listener);
+      verifyFence(service, cluster.getNameNode(0));
     } finally {
       if (service != null) {
         service.stop();
@@ -93,4 +100,28 @@ public class TestJournalService {
     Mockito.verify(l, Mockito.atLeastOnce()).journal(Mockito.eq(s),
         Mockito.anyLong(), Mockito.anyInt(), (byte[]) Mockito.any());
   }
+  
+  public void verifyFence(JournalService s, NameNode nn) throws Exception {
+    String cid = nn.getNamesystem().getClusterId();
+    int nsId = nn.getNamesystem().getFSImage().getNamespaceID();
+    int lv = nn.getNamesystem().getFSImage().getLayoutVersion();
+    
+    // Fence the journal service
+    JournalInfo info = new JournalInfo(lv, cid, nsId);
+    long currentEpoch = s.getEpoch();
+    
+    // New epoch lower than the current epoch is rejected
+    try {
+      s.fence(info, (currentEpoch - 1), "fencer");
+    } catch (FencedException ignore) { /* Ignored */ } 
+    
+    // New epoch equal to the current epoch is rejected
+    try {
+      s.fence(info, currentEpoch, "fencer");
+    } catch (FencedException ignore) { /* Ignored */ } 
+    
+    // New epoch higher than the current epoch is successful
+    FenceResponse resp = s.fence(info, currentEpoch+1, "fencer");
+    Assert.assertNotNull(resp);
+  }
 }
\ No newline at end of file