You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2012/04/06 00:35:53 UTC
svn commit: r1310115 - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/server/journalservice/
src/main/java/org/apache/hadoop/hdfs/server/namenode/
src/test/java/org/apache/hadoop/hdfs/server/journalserv...
Author: szetszwo
Date: Thu Apr 5 22:35:53 2012
New Revision: 1310115
URL: http://svn.apache.org/viewvc?rev=1310115&view=rev
Log:
HDFS-3178. Add states and state handler for journal synchronization in JournalService.
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalListener.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java
- copied, changed from r1310111, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalService.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java
- copied, changed from r1310111, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestJournalService.java
Removed:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalService.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestJournalService.java
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1310115&r1=1310114&r2=1310115&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Apr 5 22:35:53 2012
@@ -59,6 +59,9 @@ Trunk (unreleased changes)
HDFS-3131. Improve TestStorageRestore. (Brandon Li via atm)
+ HDFS-3178. Add states and state handler for journal synchronization in
+ JournalService. (szetszwo)
+
OPTIMIZATIONS
HDFS-2834. Add a ByteBuffer-based read API to DFSInputStream.
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalListener.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalListener.java?rev=1310115&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalListener.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalListener.java Thu Apr 5 22:35:53 2012
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.journalservice;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+
+/**
+ * JournalListener is a callback interface to handle journal records
+ * received from the namenode.
+ */
+public interface JournalListener {
+ /**
+ * Check the namespace information returned by a namenode
+ * @param service service that is making the callback
+ * @param info returned namespace information from the namenode
+ *
+ * The application using {@link JournalService} can stop the service if
+ * {@code info} validation fails.
+ */
+ public void verifyVersion(JournalService service, NamespaceInfo info);
+
+ /**
+ * Process the received Journal record
+ * @param service {@link JournalService} making the callback
+ * @param firstTxnId first transaction Id in the journal
+ * @param numTxns number of records
+ * @param records journal records
+ * @throws IOException on error
+ *
+ * Any IOException thrown from the listener is thrown back in
+ * {@link JournalProtocol#journal}
+ */
+ public void journal(JournalService service, long firstTxnId, int numTxns,
+ byte[] records) throws IOException;
+
+ /**
+ * Roll the editlog
+ * @param service {@link JournalService} making the callback
+ * @param txid transaction ID to roll at
+ *
+ * Any IOException thrown from the listener is thrown back in
+ * {@link JournalProtocol#startLogSegment}
+ */
+ public void rollLogs(JournalService service, long txid) throws IOException;
+}
\ No newline at end of file
Copied: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java (from r1310111, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalService.java)
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java?p2=hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java&p1=hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalService.java&r1=1310111&r2=1310115&rev=1310115&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalService.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java Thu Apr 5 22:35:53 2012
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.namenode;
+package org.apache.hadoop.hdfs.server.journalservice;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -23,15 +23,14 @@ import java.net.InetSocketAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalProtocolService;
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB;
-import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@@ -60,87 +59,90 @@ import com.google.protobuf.BlockingServi
*/
public class JournalService implements JournalProtocol {
public static final Log LOG = LogFactory.getLog(JournalService.class.getName());
+
private final JournalListener listener;
- private final boolean internalRpcServer;
private final InetSocketAddress nnAddress;
private final NamenodeRegistration registration;
private final NamenodeProtocol namenode;
- private volatile State state = State.INIT;
- private RPC.Server rpcServer;
+ private final StateHandler stateHandler = new StateHandler();
+ private final RPC.Server rpcServer;
enum State {
- INIT,
- STARTING_UP,
- RUNNING,
- STOPPED;
- }
-
- /**
- * JournalListener is a callback interface to handle journal records
- * received from the namenode.
- */
- public interface JournalListener {
+ /** The service is initialized and ready to start. */
+ INIT(false, false),
/**
- * Check the namespace information returned by a namenode
- * @param service service that is making the callback
- * @param info returned namespace information from the namenode
- *
- * The application using {@link JournalService} can stop the service if
- * {@code info} validation fails.
+ * RPC server is started.
+ * The service is ready to receive requests from namenode.
*/
- public void verifyVersion(JournalService service, NamespaceInfo info);
-
+ STARTED(false, false),
+ /** The service is fenced by a namenode and waiting for roll. */
+ WAITING_FOR_ROLL(false, true),
/**
- * Process the received Journal record
- * @param service {@link JournalService} making the callback
- * @param firstTxnId first transaction Id in the journal
- * @param numTxns number of records
- * @param records journal records
- * @throws IOException on error
- *
- * Any IOException thrown from the listener is thrown back in
- * {@link JournalProtocol#journal}
+ * The existing log is syncing with another source
+ * and it accepts journal from Namenode.
*/
- public void journal(JournalService service, long firstTxnId, int numTxns,
- byte[] records) throws IOException;
+ SYNCING(true, true),
+ /** The existing log is in sync and it accepts journal from Namenode. */
+ IN_SYNC(true, true),
+ /** The service is stopped. */
+ STOPPED(false, false);
+
+ final boolean isJournalAllowed;
+ final boolean isStartLogSegmentAllowed;
- /**
- * Roll the editlog
- * @param service {@link JournalService} making the callback
- * @param txid transaction ID to roll at
- *
- * Any IOException thrown from the listener is thrown back in
- * {@link JournalProtocol#startLogSegment}
- */
- public void rollLogs(JournalService service, long txid) throws IOException;
+ State(boolean isJournalAllowed, boolean isStartLogSegmentAllowed) {
+ this.isJournalAllowed = isJournalAllowed;
+ this.isStartLogSegmentAllowed = isStartLogSegmentAllowed;
+ }
}
- /**
- * Constructor to create {@link JournalService} based on an existing RPC server.
- * After creating the service, the caller needs to start the RPC server.
- *
- * @param conf Configuration
- * @param nnAddr host:port for the active Namenode's RPC server
- * @param listener call-back interface to listen to journal activities
- * @param rpcServer RPC server if the application has already one, which can be
- * reused. If this is null, then the RPC server is started by
- * {@link JournalService}
- * @param reg namenode registration information if there is one already, say
- * if you are using this service in namenode. If it is null, then the
- * service creates a new registration.
- * @throws IOException on error
- */
- JournalService(Configuration conf, InetSocketAddress nnAddr,
- JournalListener listener, RPC.Server rpcServer, NamenodeRegistration reg)
- throws IOException {
- this.nnAddress = nnAddr;
- this.listener = listener;
- this.registration = reg;
- this.internalRpcServer = false;
- this.namenode = NameNodeProxies.createNonHAProxy(conf, nnAddr,
- NamenodeProtocol.class, UserGroupInformation.getCurrentUser(), true)
- .getProxy();
- initRpcServer(conf, rpcServer);
+ static class StateHandler {
+ State current = State.INIT;
+
+ synchronized void start() {
+ if (current != State.INIT) {
+ throw new IllegalStateException("Service cannot be started in "
+ + current + " state.");
+ }
+ current = State.STARTED;
+ }
+
+ synchronized void waitForRoll() {
+ if (current != State.STARTED) {
+ throw new IllegalStateException("Cannot wait-for-roll in " + current
+ + " state.");
+ }
+ current = State.WAITING_FOR_ROLL;
+ }
+
+ synchronized void startLogSegment() throws IOException {
+ if (current == State.WAITING_FOR_ROLL) {
+ current = State.SYNCING;
+ }
+ }
+
+ synchronized void isStartLogSegmentAllowed() throws IOException {
+ if (!current.isStartLogSegmentAllowed) {
+ throw new IOException("Cannot start log segment in " + current
+ + " state.");
+ }
+ }
+
+ synchronized void isJournalAllowed() throws IOException {
+ if (!current.isJournalAllowed) {
+ throw new IOException("Cannot journal in " + current + " state.");
+ }
+ }
+
+ synchronized boolean isStopped() {
+ if (current == State.STOPPED) {
+ LOG.warn("Ignore stop request since the service is in " + current
+ + " state.");
+ return true;
+ }
+ current = State.STOPPED;
+ return false;
+ }
}
/**
@@ -160,11 +162,11 @@ public class JournalService implements J
throws IOException {
this.nnAddress = nnAddr;
this.listener = listener;
- this.internalRpcServer = true;
this.namenode = NameNodeProxies.createNonHAProxy(conf, nnAddr,
NamenodeProtocol.class, UserGroupInformation.getCurrentUser(), true)
.getProxy();
- initRpcServer(conf, serverAddress);
+ this.rpcServer = createRpcServer(conf, serverAddress, this);
+
String addr = NetUtils.getHostPortString(rpcServer.getListenerAddress());
StorageInfo storage = new StorageInfo(
LayoutVersion.getCurrentLayoutVersion(), 0, "", 0);
@@ -176,23 +178,13 @@ public class JournalService implements J
* Start the service.
*/
public void start() {
- synchronized(this) {
- if (state != State.INIT) {
- LOG.info("Service cannot be started in state - " + state);
- return;
- }
- state = State.STARTING_UP;
- }
+ stateHandler.start();
+
// Start the RPC server
- if (internalRpcServer) {
- LOG.info("Starting rpc server");
- rpcServer.start();
- }
-
- boolean registered = false;
- boolean handshakeComplete = false;
- boolean rollEdits = false;
- while (state == State.STARTING_UP) {
+ LOG.info("Starting rpc server");
+ rpcServer.start();
+
+ for(boolean registered = false, handshakeComplete = false; ; ) {
try {
// Perform handshake
if (!handshakeComplete) {
@@ -206,12 +198,6 @@ public class JournalService implements J
registerWithNamenode();
registered = true;
LOG.info("Registration completed");
- }
-
- if (!rollEdits) {
- namenode.rollEditLog();
- rollEdits = true;
- LOG.info("Editlog roll completed");
break;
}
} catch (IOException ioe) {
@@ -226,10 +212,13 @@ public class JournalService implements J
LOG.warn("Encountered exception ", ie);
}
}
- synchronized(this) {
- state = State.RUNNING;
+
+ stateHandler.waitForRoll();
+ try {
+ namenode.rollEditLog();
+ } catch (IOException e) {
+ LOG.warn("Encountered exception ", e);
}
-
}
/**
@@ -237,15 +226,8 @@ public class JournalService implements J
* RPC Server must be stopped the application.
*/
public void stop() {
- synchronized (this) {
- if (state == State.STOPPED) {
- return;
- }
- state = State.STOPPED;
- }
- if (internalRpcServer && rpcServer != null) {
+ if (!stateHandler.isStopped()) {
rpcServer.stop();
- rpcServer = null;
}
}
@@ -255,6 +237,7 @@ public class JournalService implements J
if (LOG.isTraceEnabled()) {
LOG.trace("Received journal " + firstTxnId + " " + numTxns);
}
+ stateHandler.isJournalAllowed();
verify(registration);
listener.journal(this, firstTxnId, numTxns, records);
}
@@ -265,37 +248,23 @@ public class JournalService implements J
if (LOG.isTraceEnabled()) {
LOG.trace("Received startLogSegment " + txid);
}
+ stateHandler.isStartLogSegmentAllowed();
verify(registration);
listener.rollLogs(this, txid);
+ stateHandler.startLogSegment();
}
- /**
- * Stand alone mode where RPC Server is created and managed by this service
- */
- private void initRpcServer(Configuration conf, InetSocketAddress serverAddress)
- throws IOException {
+ /** Create an RPC server. */
+ private static RPC.Server createRpcServer(Configuration conf,
+ InetSocketAddress address, JournalProtocol impl) throws IOException {
RPC.setProtocolEngine(conf, JournalProtocolPB.class,
ProtobufRpcEngine.class);
JournalProtocolServerSideTranslatorPB xlator =
- new JournalProtocolServerSideTranslatorPB(this);
- BlockingService service =
- JournalProtocolService.newReflectiveBlockingService(xlator);
- rpcServer = RPC.getServer(JournalProtocolPB.class, service,
- serverAddress.getHostName(), serverAddress.getPort(), 1, false, conf,
- null);
- }
-
- /**
- * RPC Server is created and managed by the application - used by this service
- */
- private void initRpcServer(Configuration conf, RPC.Server server)
- throws IOException {
- rpcServer = server;
- JournalProtocolServerSideTranslatorPB xlator =
- new JournalProtocolServerSideTranslatorPB(this);
+ new JournalProtocolServerSideTranslatorPB(impl);
BlockingService service =
JournalProtocolService.newReflectiveBlockingService(xlator);
- DFSUtil.addPBProtocol(conf, JournalProtocolPB.class, service, rpcServer);
+ return RPC.getServer(JournalProtocolPB.class, service,
+ address.getHostName(), address.getPort(), 1, false, conf, null);
}
private void verify(NamenodeRegistration reg) throws IOException {
Copied: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java (from r1310111, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestJournalService.java)
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java?p2=hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java&p1=hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestJournalService.java&r1=1310111&r2=1310115&rev=1310115&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestJournalService.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java Thu Apr 5 22:35:53 2012
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.namenode;
+package org.apache.hadoop.hdfs.server.journalservice;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -26,7 +26,6 @@ import org.apache.hadoop.fs.FileSystemTe
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.server.namenode.JournalService.JournalListener;
import org.junit.Test;
import org.mockito.Mockito;