You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2012/04/06 00:35:53 UTC

svn commit: r1310115 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/journalservice/ src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/test/java/org/apache/hadoop/hdfs/server/journalserv...

Author: szetszwo
Date: Thu Apr  5 22:35:53 2012
New Revision: 1310115

URL: http://svn.apache.org/viewvc?rev=1310115&view=rev
Log:
HDFS-3178. Add states and state handler for journal synchronization in JournalService.

Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalListener.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java
      - copied, changed from r1310111, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalService.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java
      - copied, changed from r1310111, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestJournalService.java
Removed:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalService.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestJournalService.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1310115&r1=1310114&r2=1310115&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Apr  5 22:35:53 2012
@@ -59,6 +59,9 @@ Trunk (unreleased changes)
 
     HDFS-3131. Improve TestStorageRestore. (Brandon Li via atm)
 
+    HDFS-3178. Add states and state handler for journal synchronization in
+    JournalService.  (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-2834. Add a ByteBuffer-based read API to DFSInputStream.

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalListener.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalListener.java?rev=1310115&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalListener.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalListener.java Thu Apr  5 22:35:53 2012
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.journalservice;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+
+/**
+ * JournalListener is a callback interface to handle journal records
+ * received from the namenode.
+ */
+public interface JournalListener {
+  /**
+   * Check the namespace information returned by a namenode
+   * @param service service that is making the callback
+   * @param info returned namespace information from the namenode
+   * 
+   * The application using {@link JournalService} can stop the service if
+   * {@code info} validation fails.
+   */
+  public void verifyVersion(JournalService service, NamespaceInfo info);
+  
+  /**
+   * Process the received Journal record
+   * @param service {@link JournalService} making the callback
+   * @param firstTxnId first transaction Id in the journal
+   * @param numTxns number of records
+   * @param records journal records
+   * @throws IOException on error
+   * 
+   * Any IOException thrown from the listener is thrown back in 
+   * {@link JournalProtocol#journal}
+   */
+  public void journal(JournalService service, long firstTxnId, int numTxns,
+      byte[] records) throws IOException;
+  
+  /**
+   * Roll the editlog
+   * @param service {@link JournalService} making the callback
+   * @param txid transaction ID to roll at
+   * 
+   * Any IOException thrown from the listener is thrown back in 
+   * {@link JournalProtocol#startLogSegment}
+   */
+  public void rollLogs(JournalService service, long txid) throws IOException;
+}
\ No newline at end of file

Copied: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java (from r1310111, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalService.java)
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java?p2=hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java&p1=hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalService.java&r1=1310111&r2=1310115&rev=1310115&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalService.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java Thu Apr  5 22:35:53 2012
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.namenode;
+package org.apache.hadoop.hdfs.server.journalservice;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -23,15 +23,14 @@ import java.net.InetSocketAddress;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalProtocolService;
 import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB;
-import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@@ -60,87 +59,90 @@ import com.google.protobuf.BlockingServi
  */
 public class JournalService implements JournalProtocol {
   public static final Log LOG = LogFactory.getLog(JournalService.class.getName());
+
   private final JournalListener listener;
-  private final boolean internalRpcServer;
   private final InetSocketAddress nnAddress;
   private final NamenodeRegistration registration;
   private final NamenodeProtocol namenode;
-  private volatile State state = State.INIT;
-  private RPC.Server rpcServer;
+  private final StateHandler stateHandler = new StateHandler();
+  private final RPC.Server rpcServer;
   
   enum State {
-    INIT,
-    STARTING_UP,
-    RUNNING,
-    STOPPED;
-  }
-  
-  /**
-   * JournalListener is a callback interface to handle journal records
-   * received from the namenode.
-   */
-  public interface JournalListener {
+    /** The service is initialized and ready to start. */
+    INIT(false, false),
     /**
-     * Check the namespace information returned by a namenode
-     * @param service service that is making the callback
-     * @param info returned namespace information from the namenode
-     * 
-     * The application using {@link JournalService} can stop the service if
-     * {@code info} validation fails.
+     * RPC server is started.
+     * The service is ready to receive requests from namenode.
      */
-    public void verifyVersion(JournalService service, NamespaceInfo info);
-    
+    STARTED(false, false),
+    /** The service is fenced by a namenode and waiting for roll. */
+    WAITING_FOR_ROLL(false, true),
     /**
-     * Process the received Journal record
-     * @param service {@link JournalService} making the callback
-     * @param firstTxnId first transaction Id in the journal
-     * @param numTxns number of records
-     * @param records journal records
-     * @throws IOException on error
-     * 
-     * Any IOException thrown from the listener is thrown back in 
-     * {@link JournalProtocol#journal}
+     * The existing log is syncing with another source
+     * and it accepts journal from Namenode.
      */
-    public void journal(JournalService service, long firstTxnId, int numTxns,
-        byte[] records) throws IOException;
+    SYNCING(true, true),
+    /** The existing log is in sync and it accepts journal from Namenode. */
+    IN_SYNC(true, true),
+    /** The service is stopped. */
+    STOPPED(false, false);
+
+    final boolean isJournalAllowed;
+    final boolean isStartLogSegmentAllowed;
     
-    /**
-     * Roll the editlog
-     * @param service {@link JournalService} making the callback
-     * @param txid transaction ID to roll at
-     * 
-     * Any IOException thrown from the listener is thrown back in 
-     * {@link JournalProtocol#startLogSegment}
-     */
-    public void rollLogs(JournalService service, long txid) throws IOException;
+    State(boolean isJournalAllowed, boolean isStartLogSegmentAllowed) {
+      this.isJournalAllowed = isJournalAllowed;
+      this.isStartLogSegmentAllowed = isStartLogSegmentAllowed;
+    }
   }
   
-  /**
-   * Constructor to create {@link JournalService} based on an existing RPC server.
-   * After creating the service, the caller needs to start the RPC server.
-   * 
-   * @param conf Configuration
-   * @param nnAddr host:port for the active Namenode's RPC server
-   * @param listener call-back interface to listen to journal activities
-   * @param rpcServer RPC server if the application has already one, which can be
-   *          reused. If this is null, then the RPC server is started by
-   *          {@link JournalService}
-   * @param reg namenode registration information if there is one already, say
-   *          if you are using this service in namenode. If it is null, then the
-   *          service creates a new registration.
-   * @throws IOException on error
-   */
-  JournalService(Configuration conf, InetSocketAddress nnAddr,
-      JournalListener listener, RPC.Server rpcServer, NamenodeRegistration reg)
-      throws IOException {
-    this.nnAddress = nnAddr;
-    this.listener = listener;
-    this.registration = reg;
-    this.internalRpcServer = false;
-    this.namenode = NameNodeProxies.createNonHAProxy(conf, nnAddr,
-        NamenodeProtocol.class, UserGroupInformation.getCurrentUser(), true)
-        .getProxy();
-    initRpcServer(conf, rpcServer);
+  static class StateHandler {
+    State current = State.INIT;
+    
+    synchronized void start() {
+      if (current != State.INIT) {
+        throw new IllegalStateException("Service cannot be started in "
+            + current + " state.");
+      }
+      current = State.STARTED;
+    }
+
+    synchronized void waitForRoll() {
+      if (current != State.STARTED) {
+        throw new IllegalStateException("Cannot wait-for-roll in " + current
+            + " state.");
+      }
+      current = State.WAITING_FOR_ROLL;
+    }
+
+    synchronized void startLogSegment() throws IOException {
+      if (current == State.WAITING_FOR_ROLL) {
+        current = State.SYNCING;
+      }
+    }
+
+    synchronized void isStartLogSegmentAllowed() throws IOException {
+      if (!current.isStartLogSegmentAllowed) {
+        throw new IOException("Cannot start log segment in " + current
+            + " state.");
+      }
+    }
+
+    synchronized void isJournalAllowed() throws IOException {
+      if (!current.isJournalAllowed) {
+        throw new IOException("Cannot journal in " + current + " state.");
+      }
+    }
+
+    synchronized boolean isStopped() {
+      if (current == State.STOPPED) {
+        LOG.warn("Ignore stop request since the service is in " + current
+            + " state.");
+        return true;
+      }
+      current = State.STOPPED;
+      return false;
+    }
   }
   
   /**
@@ -160,11 +162,11 @@ public class JournalService implements J
       throws IOException {
     this.nnAddress = nnAddr;
     this.listener = listener;
-    this.internalRpcServer = true;
     this.namenode = NameNodeProxies.createNonHAProxy(conf, nnAddr,
         NamenodeProtocol.class, UserGroupInformation.getCurrentUser(), true)
         .getProxy();
-    initRpcServer(conf, serverAddress);
+    this.rpcServer = createRpcServer(conf, serverAddress, this);
+
     String addr = NetUtils.getHostPortString(rpcServer.getListenerAddress());
     StorageInfo storage = new StorageInfo(
         LayoutVersion.getCurrentLayoutVersion(), 0, "", 0);
@@ -176,23 +178,13 @@ public class JournalService implements J
    * Start the service.
    */
   public void start() {
-    synchronized(this) {
-      if (state != State.INIT) {
-        LOG.info("Service cannot be started in state - " + state);
-        return;
-      }
-      state = State.STARTING_UP;
-    }
+    stateHandler.start();
+
     // Start the RPC server
-    if (internalRpcServer) {
-      LOG.info("Starting rpc server");
-      rpcServer.start();
-    }
-    
-    boolean registered = false;
-    boolean handshakeComplete = false;
-    boolean rollEdits = false;
-    while (state == State.STARTING_UP) {
+    LOG.info("Starting rpc server");
+    rpcServer.start();
+
+    for(boolean registered = false, handshakeComplete = false; ; ) {
       try {
         // Perform handshake
         if (!handshakeComplete) {
@@ -206,12 +198,6 @@ public class JournalService implements J
           registerWithNamenode();
           registered = true;
           LOG.info("Registration completed");
-        }
-        
-        if (!rollEdits) {
-          namenode.rollEditLog();
-          rollEdits = true;
-          LOG.info("Editlog roll completed");
           break;
         }
       } catch (IOException ioe) {
@@ -226,10 +212,13 @@ public class JournalService implements J
         LOG.warn("Encountered exception ", ie);
       }
     }
-    synchronized(this) {
-      state = State.RUNNING;
+
+    stateHandler.waitForRoll();
+    try {
+      namenode.rollEditLog();
+    } catch (IOException e) {
+      LOG.warn("Encountered exception ", e);
     }
-    
   }
 
   /**
@@ -237,15 +226,8 @@ public class JournalService implements J
    * RPC Server must be stopped the application.
    */
   public void stop() {
-    synchronized (this) {
-      if (state == State.STOPPED) {
-        return;
-      }
-      state = State.STOPPED;
-    }
-    if (internalRpcServer && rpcServer != null) {
+    if (!stateHandler.isStopped()) {
       rpcServer.stop();
-      rpcServer = null;
     }
   }
 
@@ -255,6 +237,7 @@ public class JournalService implements J
     if (LOG.isTraceEnabled()) {
       LOG.trace("Received journal " + firstTxnId + " " + numTxns);
     }
+    stateHandler.isJournalAllowed();
     verify(registration);
     listener.journal(this, firstTxnId, numTxns, records);
   }
@@ -265,37 +248,23 @@ public class JournalService implements J
     if (LOG.isTraceEnabled()) {
       LOG.trace("Received startLogSegment " + txid);
     }
+    stateHandler.isStartLogSegmentAllowed();
     verify(registration);
     listener.rollLogs(this, txid);
+    stateHandler.startLogSegment();
   }
 
-  /** 
-   * Stand alone mode where RPC Server is created and managed by this service 
-   */
-  private void initRpcServer(Configuration conf, InetSocketAddress serverAddress)
-      throws IOException {
+  /** Create an RPC server. */
+  private static RPC.Server createRpcServer(Configuration conf,
+      InetSocketAddress address, JournalProtocol impl) throws IOException {
     RPC.setProtocolEngine(conf, JournalProtocolPB.class,
         ProtobufRpcEngine.class);
     JournalProtocolServerSideTranslatorPB xlator = 
-        new JournalProtocolServerSideTranslatorPB(this);
-    BlockingService service = 
-        JournalProtocolService.newReflectiveBlockingService(xlator);
-    rpcServer = RPC.getServer(JournalProtocolPB.class, service,
-        serverAddress.getHostName(), serverAddress.getPort(), 1, false, conf,
-        null);
-  }
-
-  /**
-   * RPC Server is created and managed by the application - used by this service
-   */
-  private void initRpcServer(Configuration conf, RPC.Server server)
-      throws IOException {
-    rpcServer = server;
-    JournalProtocolServerSideTranslatorPB xlator = 
-        new JournalProtocolServerSideTranslatorPB(this);
+        new JournalProtocolServerSideTranslatorPB(impl);
     BlockingService service = 
         JournalProtocolService.newReflectiveBlockingService(xlator);
-    DFSUtil.addPBProtocol(conf, JournalProtocolPB.class, service, rpcServer);
+    return RPC.getServer(JournalProtocolPB.class, service,
+        address.getHostName(), address.getPort(), 1, false, conf, null);
   }
   
   private void verify(NamenodeRegistration reg) throws IOException {

Copied: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java (from r1310111, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestJournalService.java)
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java?p2=hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java&p1=hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestJournalService.java&r1=1310111&r2=1310115&rev=1310115&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestJournalService.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java Thu Apr  5 22:35:53 2012
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.namenode;
+package org.apache.hadoop.hdfs.server.journalservice;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -26,7 +26,6 @@ import org.apache.hadoop.fs.FileSystemTe
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.server.namenode.JournalService.JournalListener;
 import org.junit.Test;
 import org.mockito.Mockito;