You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cl...@apache.org on 2019/10/03 19:24:07 UTC
[hadoop] branch branch-2 updated: HDFS-12979. StandbyNode should
upload FsImage to ObserverNode after checkpointing. Contributed by Chen
Liang.
This is an automated email from the ASF dual-hosted git repository.
cliang pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 72b37e8 HDFS-12979. StandbyNode should upload FsImage to ObserverNode after checkpointing. Contributed by Chen Liang.
72b37e8 is described below
commit 72b37e8833e22ee72dc441b1fac49067faf7e267
Author: Chen Liang <cl...@apache.org>
AuthorDate: Thu Oct 3 12:23:25 2019 -0700
HDFS-12979. StandbyNode should upload FsImage to ObserverNode after checkpointing. Contributed by Chen Liang.
---
.../hadoop/hdfs/server/namenode/ImageServlet.java | 63 ++++++++-
.../hadoop/hdfs/server/namenode/NameNode.java | 8 +-
.../server/namenode/ha/StandbyCheckpointer.java | 141 +++++++++++++--------
.../org/apache/hadoop/hdfs/MiniDFSCluster.java | 8 ++
.../hdfs/server/namenode/TestCheckpoint.java | 55 ++++++++
.../server/namenode/ha/TestStandbyCheckpoints.java | 37 +++++-
6 files changed, 258 insertions(+), 54 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java
index fdd8d70..ad8b159 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java
@@ -17,7 +17,13 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
+import java.util.concurrent.TimeUnit;
+
import org.apache.hadoop.hdfs.server.common.Util;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY;
import static org.apache.hadoop.util.Time.monotonicNow;
import java.net.HttpURLConnection;
@@ -88,6 +94,10 @@ public class ImageServlet extends HttpServlet {
private SortedSet<ImageUploadRequest> currentlyDownloadingCheckpoints = Collections
.<ImageUploadRequest> synchronizedSortedSet(new TreeSet<ImageUploadRequest>());
+ public static final String RECENT_IMAGE_CHECK_ENABLED =
+ "recent.image.check.enabled";
+ public static final boolean RECENT_IMAGE_CHECK_ENABLED_DEFAULT = true;
+
@Override
public void doGet(final HttpServletRequest request,
final HttpServletResponse response) throws ServletException, IOException {
@@ -481,6 +491,23 @@ public class ImageServlet extends HttpServlet {
final PutImageParams parsedParams = new PutImageParams(request, response,
conf);
final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
+ final boolean checkRecentImageEnable;
+ Object checkRecentImageEnableObj =
+ context.getAttribute(RECENT_IMAGE_CHECK_ENABLED);
+ if (checkRecentImageEnableObj != null) {
+ if (checkRecentImageEnableObj instanceof Boolean) {
+ checkRecentImageEnable = (boolean) checkRecentImageEnableObj;
+ } else {
+ // This is an error case, but crashing NN due to this
+ // seems more undesirable. Only log the error and set to default.
+ LOG.error("Expecting boolean obj for setting checking recent image, "
+ + "but got " + checkRecentImageEnableObj.getClass() + ". This is "
+ + "unexpected! Setting to default.");
+ checkRecentImageEnable = RECENT_IMAGE_CHECK_ENABLED_DEFAULT;
+ }
+ } else {
+ checkRecentImageEnable = RECENT_IMAGE_CHECK_ENABLED_DEFAULT;
+ }
validateRequest(context, conf, request, response, nnImage,
parsedParams.getStorageInfoString());
@@ -494,7 +521,8 @@ public class ImageServlet extends HttpServlet {
// target (regardless of the fact that we got the image)
HAServiceProtocol.HAServiceState state = NameNodeHttpServer
.getNameNodeStateFromContext(getServletContext());
- if (state != HAServiceProtocol.HAServiceState.ACTIVE) {
+ if (state != HAServiceProtocol.HAServiceState.ACTIVE &&
+ state != HAServiceProtocol.HAServiceState.OBSERVER) {
// we need a different response type here so the client can differentiate this
// from the failure to upload due to (1) security, or (2) other checkpoints already
// present
@@ -528,6 +556,39 @@ public class ImageServlet extends HttpServlet {
+ txid);
return null;
}
+
+ long now = System.currentTimeMillis();
+ long lastCheckpointTime =
+ nnImage.getStorage().getMostRecentCheckpointTime();
+ long lastCheckpointTxid =
+ nnImage.getStorage().getMostRecentCheckpointTxId();
+
+ long checkpointPeriod =
+ conf.getTimeDuration(DFS_NAMENODE_CHECKPOINT_PERIOD_KEY,
+ DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT, TimeUnit.SECONDS);
+ long checkpointTxnCount =
+ conf.getLong(DFS_NAMENODE_CHECKPOINT_TXNS_KEY,
+ DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT);
+
+ long timeDelta = TimeUnit.MILLISECONDS.toSeconds(
+ now - lastCheckpointTime);
+
+ if (checkRecentImageEnable &&
+ timeDelta < checkpointPeriod &&
+ txid - lastCheckpointTxid < checkpointTxnCount) {
+ // We accept a new fsImage only when at least one of these two
+ // conditions is met:
+ // 1. most recent image's txid is too far behind
+ // 2. last checkpoint time was too old
+ response.sendError(HttpServletResponse.SC_CONFLICT,
+ "Most recent checkpoint is neither too far behind in "
+ + "txid, nor too old. New txnid cnt is "
+ + (txid - lastCheckpointTxid)
+ + ", expecting at least " + checkpointTxnCount
+ + " unless too long since last upload.");
+ return null;
+ }
+
try {
if (nnImage.getStorage().findImageFile(nnf, txid) != null) {
response.sendError(HttpServletResponse.SC_CONFLICT,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index b44d104..ccc0af6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.ipc.ExternalCall;
import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
import org.apache.hadoop.ipc.RetriableException;
@@ -506,7 +507,12 @@ public class NameNode extends ReconfigurableBase implements
LOG.info("Setting ADDRESS {}", address);
conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, address);
}
-
+
+ @VisibleForTesting
+ public HttpServer2 getHttpServer() { // Test hook: hands out the underlying HttpServer2 so tests can set servlet attributes on it.
+ return httpServer.getHttpServer();
+ }
+
/**
* Fetches the address for services to use when connecting to namenode
* based on the value of fallback returns null if the special
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
index 753447b..c05a0da 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
@@ -19,12 +19,15 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.apache.hadoop.util.Time.monotonicNow;
+import com.google.common.collect.Lists;
import java.io.IOException;
import java.net.URI;
import java.net.URL;
import java.security.PrivilegedAction;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@@ -65,7 +68,6 @@ public class StandbyCheckpointer {
private final Configuration conf;
private final FSNamesystem namesystem;
private long lastCheckpointTime;
- private long lastUploadTime;
private final CheckpointerThread thread;
private final ThreadFactory uploadThreadFactory;
private List<URL> activeNNAddresses;
@@ -73,12 +75,14 @@ public class StandbyCheckpointer {
private final Object cancelLock = new Object();
private Canceler canceler;
- private boolean isPrimaryCheckPointer = true;
// Keep track of how many checkpoints were canceled.
// This is for use in tests.
private static int canceledCount = 0;
-
+
+ // A map from NN url to its upload state (last upload time, primary flag).
+ private final HashMap<String, CheckpointReceiverEntry> checkpointReceivers;
+
public StandbyCheckpointer(Configuration conf, FSNamesystem ns)
throws IOException {
this.namesystem = ns;
@@ -89,6 +93,37 @@ public class StandbyCheckpointer {
.setNameFormat("TransferFsImageUpload-%d").build();
setNameNodeAddresses(conf);
+ this.checkpointReceivers = new HashMap<>();
+ for (URL address : activeNNAddresses) {
+ this.checkpointReceivers.put(address.toString(),
+ new CheckpointReceiverEntry());
+ }
+ }
+
+ private static final class CheckpointReceiverEntry { // Per-target-NN upload bookkeeping; one entry per NN url.
+ private long lastUploadTime; // monotonicNow() value recorded after the last successful upload; 0 until then.
+ private boolean isPrimary; // Whether this standby is treated as the primary checkpointer for the target NN.
+
+ CheckpointReceiverEntry() {
+ this.lastUploadTime = 0L;
+ this.isPrimary = true; // Start as primary so the first checkpoint is always uploaded.
+ }
+
+ void setLastUploadTime(long lastUploadTime) {
+ this.lastUploadTime = lastUploadTime;
+ }
+
+ void setIsPrimary(boolean isPrimaryFor) {
+ this.isPrimary = isPrimaryFor;
+ }
+
+ long getLastUploadTime() {
+ return lastUploadTime;
+ }
+
+ boolean isPrimary() {
+ return isPrimary;
+ }
}
/**
@@ -156,7 +191,7 @@ public class StandbyCheckpointer {
thread.interrupt();
}
- private void doCheckpoint(boolean sendCheckpoint) throws InterruptedException, IOException {
+ private void doCheckpoint() throws InterruptedException, IOException {
assert canceler != null;
final long txid;
final NameNodeFile imageType;
@@ -208,11 +243,6 @@ public class StandbyCheckpointer {
namesystem.cpUnlock();
}
- //early exit if we shouldn't actually send the checkpoint to the ANN
- if(!sendCheckpoint){
- return;
- }
-
// Upload the saved checkpoint back to the active
// Do this in a separate thread to avoid blocking transition to active, but don't allow more
// than the expected number of tasks to run or queue up
@@ -222,54 +252,67 @@ public class StandbyCheckpointer {
uploadThreadFactory);
// for right now, just match the upload to the nn address by convention. There is no need to
// directly tie them together by adding a pair class.
- List<Future<TransferFsImage.TransferResult>> uploads =
- new ArrayList<Future<TransferFsImage.TransferResult>>();
+ HashMap<String, Future<TransferFsImage.TransferResult>> uploads =
+ new HashMap<>();
for (final URL activeNNAddress : activeNNAddresses) {
- Future<TransferFsImage.TransferResult> upload =
- executor.submit(new Callable<TransferFsImage.TransferResult>() {
- @Override
- public TransferFsImage.TransferResult call() throws IOException {
- return TransferFsImage.uploadImageFromStorage(activeNNAddress, conf, namesystem
- .getFSImage().getStorage(), imageType, txid, canceler);
- }
- });
- uploads.add(upload);
+ // Upload the image if at least one of the following two conditions is met:
+ // 1. the quiet period has elapsed since the last upload to this NN, so try to contact it again.
+ // 2. this standby IS the primary checkpointer of the target NN.
+ String addressString = activeNNAddress.toString();
+ assert checkpointReceivers.containsKey(addressString);
+ CheckpointReceiverEntry receiverEntry =
+ checkpointReceivers.get(addressString);
+ long secsSinceLastUpload =
+ TimeUnit.MILLISECONDS.toSeconds(
+ monotonicNow() - receiverEntry.getLastUploadTime());
+ boolean shouldUpload = receiverEntry.isPrimary() ||
+ secsSinceLastUpload >= checkpointConf.getQuietPeriod();
+ if (shouldUpload) {
+ Future<TransferFsImage.TransferResult> upload =
+ executor.submit(new Callable<TransferFsImage.TransferResult>() {
+ @Override
+ public TransferFsImage.TransferResult call() throws IOException {
+ return TransferFsImage.uploadImageFromStorage(activeNNAddress, conf, namesystem
+ .getFSImage().getStorage(), imageType, txid, canceler);
+ }
+ });
+ uploads.put(addressString, upload);
+ }
}
InterruptedException ie = null;
- IOException ioe= null;
- int i = 0;
- boolean success = false;
- for (; i < uploads.size(); i++) {
- Future<TransferFsImage.TransferResult> upload = uploads.get(i);
+ List<IOException> ioes = Lists.newArrayList();
+ for (Map.Entry<String, Future<TransferFsImage.TransferResult>> entry :
+ uploads.entrySet()) {
+ String url = entry.getKey();
+ Future<TransferFsImage.TransferResult> upload = entry.getValue();
try {
- // TODO should there be some smarts here about retries nodes that are not the active NN?
+ // TODO should there be some smarts here about retries nodes that
+ // are not the active NN?
+ CheckpointReceiverEntry receiverEntry = checkpointReceivers.get(url);
if (upload.get() == TransferFsImage.TransferResult.SUCCESS) {
- success = true;
- //avoid getting the rest of the results - we don't care since we had a successful upload
- break;
+ receiverEntry.setLastUploadTime(monotonicNow());
+ receiverEntry.setIsPrimary(true);
+ } else {
+ receiverEntry.setIsPrimary(false);
}
-
} catch (ExecutionException e) {
- ioe = new IOException("Exception during image upload: " + e.getMessage(),
- e.getCause());
- break;
+ // Even if an exception happens, still proceed to the next NN url,
+ // so that a failed upload to one NN does not prevent the
+ // remaining NNs from getting the fsImage.
+ ioes.add(new IOException("Exception during image upload", e));
} catch (InterruptedException e) {
ie = e;
break;
}
}
- lastUploadTime = monotonicNow();
-
- // we are primary if we successfully updated the ANN
- this.isPrimaryCheckPointer = success;
-
// cleaner than copying code for multiple catch statements and better than catching all
// exceptions, so we just handle the ones we expect.
- if (ie != null || ioe != null) {
+ if (ie != null) {
// cancel the rest of the tasks, and close the pool
- for (; i < uploads.size(); i++) {
- Future<TransferFsImage.TransferResult> upload = uploads.get(i);
+ for (Map.Entry<String, Future<TransferFsImage.TransferResult>> entry :
+ uploads.entrySet()) {
+ Future<TransferFsImage.TransferResult> upload = entry.getValue();
// The background thread may be blocked waiting in the throttler, so
// interrupt it.
upload.cancel(true);
@@ -282,11 +325,11 @@ public class StandbyCheckpointer {
executor.awaitTermination(500, TimeUnit.MILLISECONDS);
// re-throw the exception we got, since one of these two must be non-null
- if (ie != null) {
- throw ie;
- } else if (ioe != null) {
- throw ioe;
- }
+ throw ie;
+ }
+
+ if (!ioes.isEmpty()) {
+ throw MultipleIOException.createIOException(ioes);
}
}
@@ -369,7 +412,6 @@ public class StandbyCheckpointer {
// Reset checkpoint time so that we don't always checkpoint
// on startup.
lastCheckpointTime = monotonicNow();
- lastUploadTime = monotonicNow();
while (shouldRun) {
boolean needRollbackCheckpoint = namesystem.isNeedRollbackFsImage();
if (!needRollbackCheckpoint) {
@@ -422,10 +464,7 @@ public class StandbyCheckpointer {
// on all nodes, we build the checkpoint. However, we only ship the checkpoint if have a
// rollback request, are the checkpointer, are outside the quiet period.
- final long secsSinceLastUpload = (now - lastUploadTime) / 1000;
- boolean sendRequest = isPrimaryCheckPointer
- || secsSinceLastUpload >= checkpointConf.getQuietPeriod();
- doCheckpoint(sendRequest);
+ doCheckpoint();
// reset needRollbackCheckpoint to false only when we finish a ckpt
// for rollback image
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index ca28874..b0586d7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -109,6 +109,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.datanode.web.DatanodeHttpServer;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.ImageServlet;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -988,6 +989,11 @@ public class MiniDFSCluster implements AutoCloseable {
format, operation, clusterId, nnCounter);
nnCounter += nameservice.getNNs().size();
}
+
+ for (NameNodeInfo nn : namenodes.values()) {
+ nn.nameNode.getHttpServer()
+ .setAttribute(ImageServlet.RECENT_IMAGE_CHECK_ENABLED, false);
+ }
}
/**
@@ -2127,6 +2133,8 @@ public class MiniDFSCluster implements AutoCloseable {
}
NameNode nn = NameNode.createNameNode(args, info.conf);
+ nn.getHttpServer()
+ .setAttribute(ImageServlet.RECENT_IMAGE_CHECK_ENABLED, false);
info.nameNode = nn;
info.setStartOpt(startOpt);
if (waitActive) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
index 9a4f6db..2fc1cd2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
import static org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil.assertNNHasCheckpoints;
import static org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil.getNameNodeCurrentDirs;
+import static org.apache.hadoop.hdfs.server.namenode.ImageServlet.RECENT_IMAGE_CHECK_ENABLED;
import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
import static org.apache.hadoop.test.MetricsAsserts.assertGaugeGt;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
@@ -2475,6 +2476,60 @@ public class TestCheckpoint {
}
}
+ @Test(timeout = 300000)
+ public void testActiveRejectSmallerDeltaImage() throws Exception {
+ MiniDFSCluster cluster = null;
+ Configuration conf = new HdfsConfiguration();
+ // Set the delta txid threshold to 10 transactions.
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 10);
+ // Set the delta time threshold to some arbitrarily large value, so
+ // that elapsed time alone never makes an image acceptable in this test.
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 900000);
+
+ SecondaryNameNode secondary = null;
+
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
+ .format(true).build();
+ // Enable small-delta rejection (MiniDFSCluster disables this check).
+ NameNode active = cluster.getNameNode();
+ active.httpServer.getHttpServer()
+ .setAttribute(RECENT_IMAGE_CHECK_ENABLED, true);
+
+ secondary = startSecondaryNameNode(conf);
+
+ FileSystem fs = cluster.getFileSystem();
+ assertEquals(0, active.getNamesystem().getFSImage()
+ .getMostRecentCheckpointTxId());
+
+ // Create 5 directories.
+ for (int i = 0; i < 5; i++) {
+ fs.mkdirs(new Path("dir-" + i));
+ }
+
+ // First checkpoint.
+ secondary.doCheckpoint();
+ // At this point, the txid delta is smaller than the threshold of 10,
+ // so the active does not accept this image.
+ assertEquals(0, active.getNamesystem().getFSImage()
+ .getMostRecentCheckpointTxId());
+
+ // Create another 10 directories.
+ for (int i = 0; i < 10; i++) {
+ fs.mkdirs(new Path("dir2-" + i));
+ }
+
+ // Second checkpoint.
+ secondary.doCheckpoint();
+ // Here the delta is large enough and the active accepts this image.
+ assertEquals(21, active.getNamesystem().getFSImage()
+ .getMostRecentCheckpointTxId());
+ } finally {
+ cleanup(secondary);
+ cleanup(cluster);
+ }
+ }
+
private static void cleanup(SecondaryNameNode snn) {
if (snn != null) {
try {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
index 75b2412..da130c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
@@ -21,6 +21,7 @@ import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
+import java.util.Collections;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -252,7 +253,41 @@ public class TestStandbyCheckpoints {
dirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 1));
FSImageTestUtil.assertParallelFilesAreIdentical(dirs, ImmutableSet.<String>of());
}
-
+
+ /**
+ * Test that when there are observer NameNodes, the Standby node is
+ * able to upload the fsImage to the Observer node as well.
+ */
+ @Test(timeout = 300000)
+ public void testStandbyAndObserverState() throws Exception {
+ // Transition 2 to observer
+ cluster.transitionToObserver(2);
+ doEdits(0, 10);
+ // After a rollEditLog, Standby(nn1)'s next checkpoint would be
+ // ahead of observer(nn2).
+ nns[0].getRpcServer().rollEditLog();
+
+ // After the standby creates a checkpoint, it will try to push the image to
+ // the active and all observers, updating its own txid to the most recent.
+ HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(12));
+ HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(12));
+ HATestUtil.waitForCheckpoint(cluster, 2, ImmutableList.of(12));
+
+ assertEquals(12, nns[2].getNamesystem().getFSImage()
+ .getMostRecentCheckpointTxId());
+ assertEquals(12, nns[1].getNamesystem().getFSImage()
+ .getMostRecentCheckpointTxId());
+
+ List<File> dirs = Lists.newArrayList();
+ // observer and standby both have this same image.
+ dirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 2));
+ dirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 1));
+ FSImageTestUtil.assertParallelFilesAreIdentical(
+ dirs, Collections.<String>emptySet());
+ // Restore 2 back to standby
+ cluster.transitionToStandby(2);
+ }
+
/**
* Test for the case when the SBN is configured to checkpoint based
* on a time period, but no transactions are happening on the
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org