You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2018/07/05 04:16:55 UTC
[02/50] [abbrv] hadoop git commit: HDFS-11334: [SPS]: NN switch and
rescheduling movements can lead to have more than one coordinator for same
file blocks. Contributed by Rakesh R.
HDFS-11334: [SPS]: NN switch and rescheduling movements can lead to have more than one coordinator for same file blocks. Contributed by Rakesh R.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2d3e8acf
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2d3e8acf
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2d3e8acf
Branch: refs/heads/HDFS-10285
Commit: 2d3e8acfbb05acddd8fd4a82324e0d28824bb24e
Parents: 85fc713
Author: Uma Maheswara Rao G <um...@intel.com>
Authored: Tue Apr 18 15:23:58 2017 -0700
Committer: Rakesh Radhakrishnan <ra...@apache.org>
Committed: Thu Jul 5 08:34:46 2018 +0530
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 2 +-
.../apache/hadoop/hdfs/protocolPB/PBHelper.java | 6 +
.../server/blockmanagement/DatanodeManager.java | 12 ++
.../hdfs/server/datanode/BPServiceActor.java | 4 +-
.../datanode/BlockStorageMovementTracker.java | 37 +++-
.../hadoop/hdfs/server/datanode/DataNode.java | 12 +-
.../datanode/StoragePolicySatisfyWorker.java | 95 +++++++++--
.../BlockStorageMovementAttemptedItems.java | 80 ++++++---
.../server/namenode/StoragePolicySatisfier.java | 15 +-
.../protocol/BlocksStorageMovementResult.java | 6 +-
.../src/main/proto/DatanodeProtocol.proto | 1 +
.../TestStoragePolicySatisfyWorker.java | 68 ++++----
.../TestStoragePolicySatisfierWithHA.java | 170 +++++++++++++++++--
13 files changed, 413 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d3e8acf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 00152cc..b5341a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -619,7 +619,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY =
"dfs.storage.policy.satisfier.self.retry.timeout.millis";
public static final int DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT =
- 30 * 60 * 1000;
+ 20 * 60 * 1000;
public static final String DFS_DATANODE_ADDRESS_KEY = "dfs.datanode.address";
public static final int DFS_DATANODE_DEFAULT_PORT = 9866;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d3e8acf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 0c03608..996b986 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -985,6 +985,9 @@ public class PBHelper {
case FAILURE:
status = Status.FAILURE;
break;
+ case IN_PROGRESS:
+ status = Status.IN_PROGRESS;
+ break;
default:
throw new AssertionError("Unknown status: " + resultProto.getStatus());
}
@@ -1011,6 +1014,9 @@ public class PBHelper {
case FAILURE:
status = BlocksStorageMovementResultProto.Status.FAILURE;
break;
+ case IN_PROGRESS:
+ status = BlocksStorageMovementResultProto.Status.IN_PROGRESS;
+ break;
default:
throw new AssertionError("Unknown status: " + report.getStatus());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d3e8acf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index da340a8..2d7c80e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -1091,6 +1091,18 @@ public class DatanodeManager {
nodeS.setSoftwareVersion(nodeReg.getSoftwareVersion());
nodeS.setDisallowed(false); // Node is in the include list
+ // Sets dropSPSWork flag to true, to ensure that
+ // DNA_DROP_SPS_WORK_COMMAND will be sent to the datanode via next heartbeat
+ // response immediately after the node registration. This is
+ // to avoid a situation, where multiple trackId responses coming from
+ // different co-ordinator datanodes. After the SPS monitor times out, it
+ // will retry the files which were scheduled to the disconnected(for
+ // long time more than heartbeat expiry) DN, by finding new
+ // co-ordinator datanode. Now, if the expired datanode reconnects back
+ // after SPS reschedules, it leads to get different movement results
+ // from reconnected and new DN co-ordinators.
+ nodeS.setDropSPSWork(true);
+
// resolve network location
if(this.rejectUnresolvedTopologyDN) {
nodeS.setNetworkLocation(resolveNetworkLocation(nodeS));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d3e8acf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 0f93fb0..f537f49 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -536,7 +536,7 @@ class BPServiceActor implements Runnable {
// Remove the blocks movement results after successfully transferring
// to namenode.
- dn.getStoragePolicySatisfyWorker().getBlocksMovementsCompletionHandler()
+ dn.getStoragePolicySatisfyWorker().getBlocksMovementsStatusHandler()
.remove(blksMovementResults);
return response;
@@ -544,7 +544,7 @@ class BPServiceActor implements Runnable {
private BlocksStorageMovementResult[] getBlocksMovementResults() {
List<BlocksStorageMovementResult> trackIdVsMovementStatus = dn
- .getStoragePolicySatisfyWorker().getBlocksMovementsCompletionHandler()
+ .getStoragePolicySatisfyWorker().getBlocksMovementsStatusHandler()
.getBlksMovementResults();
BlocksStorageMovementResult[] blksMovementResult =
new BlocksStorageMovementResult[trackIdVsMovementStatus.size()];
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d3e8acf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
index e623cef..99858bc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -28,7 +29,7 @@ import java.util.concurrent.Future;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementResult;
-import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlocksMovementsCompletionHandler;
+import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlocksMovementsStatusHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,32 +42,34 @@ public class BlockStorageMovementTracker implements Runnable {
private static final Logger LOG = LoggerFactory
.getLogger(BlockStorageMovementTracker.class);
private final CompletionService<BlockMovementResult> moverCompletionService;
- private final BlocksMovementsCompletionHandler blksMovementscompletionHandler;
+ private final BlocksMovementsStatusHandler blksMovementsStatusHandler;
// Keeps the information - trackID vs its list of blocks
private final Map<Long, List<Future<BlockMovementResult>>> moverTaskFutures;
private final Map<Long, List<BlockMovementResult>> movementResults;
+ private volatile boolean running = true;
+
/**
* BlockStorageMovementTracker constructor.
*
* @param moverCompletionService
* completion service.
* @param handler
- * blocks movements completion handler
+ * blocks movements status handler
*/
public BlockStorageMovementTracker(
CompletionService<BlockMovementResult> moverCompletionService,
- BlocksMovementsCompletionHandler handler) {
+ BlocksMovementsStatusHandler handler) {
this.moverCompletionService = moverCompletionService;
this.moverTaskFutures = new HashMap<>();
- this.blksMovementscompletionHandler = handler;
+ this.blksMovementsStatusHandler = handler;
this.movementResults = new HashMap<>();
}
@Override
public void run() {
- while (true) {
+ while (running) {
if (moverTaskFutures.size() <= 0) {
try {
synchronized (moverTaskFutures) {
@@ -95,8 +98,8 @@ public class BlockStorageMovementTracker implements Runnable {
synchronized (moverTaskFutures) {
moverTaskFutures.remove(trackId);
}
- // handle completed blocks movements per trackId.
- blksMovementscompletionHandler.handle(resultPerTrackIdList);
+ // handle completed or inprogress blocks movements per trackId.
+ blksMovementsStatusHandler.handle(resultPerTrackIdList);
movementResults.remove(trackId);
}
}
@@ -158,4 +161,22 @@ public class BlockStorageMovementTracker implements Runnable {
movementResults.clear();
}
}
+
+ /**
+ * @return the set of trackIds which are still waiting to complete all the
+ * scheduled blocks movements.
+ */
+ Set<Long> getInProgressTrackIds() {
+ synchronized (moverTaskFutures) {
+ return moverTaskFutures.keySet();
+ }
+ }
+
+ /**
+ * Sets running flag to false and clear the pending movement result queues.
+ */
+ public void stopTracking() {
+ running = false;
+ removeAll();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d3e8acf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index fc2cf1a..76d2efb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1425,6 +1425,7 @@ public class DataNode extends ReconfigurableBase
blockRecoveryWorker = new BlockRecoveryWorker(this);
storagePolicySatisfyWorker =
new StoragePolicySatisfyWorker(getConf(), this);
+ storagePolicySatisfyWorker.start();
blockPoolManager = new BlockPoolManager(this);
blockPoolManager.refreshNamenodes(getConf());
@@ -1976,7 +1977,11 @@ public class DataNode extends ReconfigurableBase
}
}
}
-
+
+ // stop storagePolicySatisfyWorker
+ if (storagePolicySatisfyWorker != null) {
+ storagePolicySatisfyWorker.stop();
+ }
List<BPOfferService> bposArray = (this.blockPoolManager == null)
? new ArrayList<BPOfferService>()
: this.blockPoolManager.getAllNamenodeThreads();
@@ -2129,6 +2134,11 @@ public class DataNode extends ReconfigurableBase
notifyAll();
}
tracer.close();
+
+ // Waiting to finish SPS worker thread.
+ if (storagePolicySatisfyWorker != null) {
+ storagePolicySatisfyWorker.waitToFinishWorkerThread();
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d3e8acf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
index a96ac98..f4f97dd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
+import static org.apache.hadoop.util.Time.monotonicNow;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
@@ -31,7 +32,9 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
@@ -87,10 +90,13 @@ public class StoragePolicySatisfyWorker {
private final int moverThreads;
private final ExecutorService moveExecutor;
private final CompletionService<BlockMovementResult> moverCompletionService;
- private final BlocksMovementsCompletionHandler handler;
+ private final BlocksMovementsStatusHandler handler;
private final BlockStorageMovementTracker movementTracker;
private Daemon movementTrackerThread;
+ private long inprogressTrackIdsCheckInterval = 30 * 1000; // 30seconds.
+ private long nextInprogressRecheckTime;
+
public StoragePolicySatisfyWorker(Configuration conf, DataNode datanode) {
this.datanode = datanode;
this.ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
@@ -99,15 +105,52 @@ public class StoragePolicySatisfyWorker {
DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT);
moveExecutor = initializeBlockMoverThreadPool(moverThreads);
moverCompletionService = new ExecutorCompletionService<>(moveExecutor);
- handler = new BlocksMovementsCompletionHandler();
+ handler = new BlocksMovementsStatusHandler();
movementTracker = new BlockStorageMovementTracker(moverCompletionService,
handler);
movementTrackerThread = new Daemon(movementTracker);
movementTrackerThread.setName("BlockStorageMovementTracker");
- movementTrackerThread.start();
+
+ // Interval to check the inprogress trackIds. The time interval is
+ // proportional to the heartbeat interval time period.
+ final long heartbeatIntervalSeconds = conf.getTimeDuration(
+ DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
+ DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS);
+ inprogressTrackIdsCheckInterval = 5 * heartbeatIntervalSeconds;
+ // update first inprogress recheck time to a future time stamp.
+ nextInprogressRecheckTime = monotonicNow()
+ + inprogressTrackIdsCheckInterval;
+
// TODO: Needs to manage the number of concurrent moves per DataNode.
}
+ /**
+ * Start StoragePolicySatisfyWorker, which will start block movement tracker
+ * thread to track the completion of block movements.
+ */
+ void start() {
+ movementTrackerThread.start();
+ }
+
+ /**
+ * Stop StoragePolicySatisfyWorker, which will stop block movement tracker
+ * thread.
+ */
+ void stop() {
+ movementTrackerThread.interrupt();
+ movementTracker.stopTracking();
+ }
+
+ /**
+ * Timed wait to stop BlockStorageMovement tracker daemon thread.
+ */
+ void waitToFinishWorkerThread() {
+ try {
+ movementTrackerThread.join(3000);
+ } catch (InterruptedException ie) {
+ }
+ }
+
private ThreadPoolExecutor initializeBlockMoverThreadPool(int num) {
LOG.debug("Block mover to satisfy storage policy; pool threads={}", num);
@@ -352,11 +395,11 @@ public class StoragePolicySatisfyWorker {
}
/**
- * Blocks movements completion handler, which is used to collect details of
- * the completed list of block movements and this status(success or failure)
- * will be send to the namenode via heartbeat.
+ * Blocks movements status handler, which is used to collect details of the
+ * completed or inprogress list of block movements and this status(success or
+ * failure or inprogress) will be sent to the namenode via heartbeat.
*/
- static class BlocksMovementsCompletionHandler {
+ class BlocksMovementsStatusHandler {
private final List<BlocksStorageMovementResult> trackIdVsMovementStatus =
new ArrayList<>();
@@ -395,14 +438,21 @@ public class StoragePolicySatisfyWorker {
* @return unmodifiable list of blocks storage movement results.
*/
List<BlocksStorageMovementResult> getBlksMovementResults() {
+ List<BlocksStorageMovementResult> movementResults = new ArrayList<>();
+ // 1. Adding all the completed trackids.
synchronized (trackIdVsMovementStatus) {
- if (trackIdVsMovementStatus.size() <= 0) {
- return new ArrayList<>();
+ if (trackIdVsMovementStatus.size() > 0) {
+ movementResults = Collections
+ .unmodifiableList(trackIdVsMovementStatus);
}
- List<BlocksStorageMovementResult> results = Collections
- .unmodifiableList(trackIdVsMovementStatus);
- return results;
}
+ // 2. Adding the in progress track ids after those which are completed.
+ Set<Long> inProgressTrackIds = getInProgressTrackIds();
+ for (Long trackId : inProgressTrackIds) {
+ movementResults.add(new BlocksStorageMovementResult(trackId,
+ BlocksStorageMovementResult.Status.IN_PROGRESS));
+ }
+ return movementResults;
}
/**
@@ -433,7 +483,7 @@ public class StoragePolicySatisfyWorker {
}
@VisibleForTesting
- BlocksMovementsCompletionHandler getBlocksMovementsCompletionHandler() {
+ BlocksMovementsStatusHandler getBlocksMovementsStatusHandler() {
return handler;
}
@@ -447,4 +497,23 @@ public class StoragePolicySatisfyWorker {
movementTracker.removeAll();
handler.removeAll();
}
+
+ /**
+ * Gets the set of trackIds which are inprogress. The collection is done
+ * periodically, at an interval proportional to the datanode heartbeat
+ * interval (see inprogressTrackIdsCheckInterval).
+ *
+ * @return collection of trackids which are inprogress
+ */
+ private Set<Long> getInProgressTrackIds() {
+ Set<Long> trackIds = new HashSet<>();
+ long now = monotonicNow();
+ if (nextInprogressRecheckTime >= now) {
+ trackIds = movementTracker.getInProgressTrackIds();
+
+ // schedule next re-check interval
+ nextInprogressRecheckTime = now + inprogressTrackIdsCheckInterval;
+ }
+ return trackIds;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d3e8acf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
index 26b98d8..f2406da 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult.Status;
import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,9 +40,11 @@ import com.google.common.annotations.VisibleForTesting;
* A monitor class for checking whether block storage movements finished or not.
* If block storage movement results from datanode indicates about the movement
* success, then it will just remove the entries from tracking. If it reports
- * failure, then it will add back to needed block storage movements list. If no
- * DN reports about movement for longer time, then such items will be retries
- * automatically after timeout. The default timeout would be 30mins.
+ * failure, then it will add back to needed block storage movements list. If it
+ * reports in_progress, that means the blocks movement is in progress and the
+ * coordinator is still tracking the movement. If no DN reports about movement
+ * for longer time, then such items will be retried automatically after
+ * timeout. The default timeout would be 20mins.
*/
public class BlockStorageMovementAttemptedItems {
private static final Logger LOG =
@@ -57,10 +60,10 @@ public class BlockStorageMovementAttemptedItems {
private Daemon timerThread = null;
private final StoragePolicySatisfier sps;
//
- // It might take anywhere between 30 to 60 minutes before
+ // It might take anywhere between 20 to 60 minutes before
// a request is timed out.
//
- private long selfRetryTimeout = 30 * 60 * 1000;
+ private long selfRetryTimeout = 20 * 60 * 1000;
//
// It might take anywhere between 5 to 10 minutes before
@@ -159,35 +162,35 @@ public class BlockStorageMovementAttemptedItems {
/**
* This class contains information of an attempted trackID. Information such
- * as, (a)last attempted time stamp, (b)whether all the blocks in the trackID
- * were attempted and blocks movement has been scheduled to satisfy storage
- * policy. This is used by
+ * as, (a)last attempted or reported time stamp, (b)whether all the blocks in
+ * the trackID were attempted and blocks movement has been scheduled to
+ * satisfy storage policy. This is used by
* {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}.
*/
private final static class ItemInfo {
- private final long lastAttemptedTimeStamp;
+ private long lastAttemptedOrReportedTime;
private final boolean allBlockLocsAttemptedToSatisfy;
/**
* ItemInfo constructor.
*
- * @param lastAttemptedTimeStamp
- * last attempted time stamp
+ * @param lastAttemptedOrReportedTime
+ * last attempted or reported time
* @param allBlockLocsAttemptedToSatisfy
* whether all the blocks in the trackID were attempted and blocks
* movement has been scheduled to satisfy storage policy
*/
- private ItemInfo(long lastAttemptedTimeStamp,
+ private ItemInfo(long lastAttemptedOrReportedTime,
boolean allBlockLocsAttemptedToSatisfy) {
- this.lastAttemptedTimeStamp = lastAttemptedTimeStamp;
+ this.lastAttemptedOrReportedTime = lastAttemptedOrReportedTime;
this.allBlockLocsAttemptedToSatisfy = allBlockLocsAttemptedToSatisfy;
}
/**
- * @return last attempted time stamp.
+ * @return last attempted or reported time stamp.
*/
- private long getLastAttemptedTimeStamp() {
- return lastAttemptedTimeStamp;
+ private long getLastAttemptedOrReportedTime() {
+ return lastAttemptedOrReportedTime;
}
/**
@@ -200,6 +203,14 @@ public class BlockStorageMovementAttemptedItems {
private boolean isAllBlockLocsAttemptedToSatisfy() {
return allBlockLocsAttemptedToSatisfy;
}
+
+ /**
+ * Update lastAttemptedOrReportedTime, so that the expiration time will be
+ * postponed to future.
+ */
+ private void touchLastReportedTimeStamp() {
+ this.lastAttemptedOrReportedTime = monotonicNow();
+ }
}
/**
@@ -234,7 +245,8 @@ public class BlockStorageMovementAttemptedItems {
while (iter.hasNext()) {
Entry<Long, ItemInfo> entry = iter.next();
ItemInfo itemInfo = entry.getValue();
- if (now > itemInfo.getLastAttemptedTimeStamp() + selfRetryTimeout) {
+ if (now > itemInfo.getLastAttemptedOrReportedTime()
+ + selfRetryTimeout) {
Long blockCollectionID = entry.getKey();
synchronized (storageMovementAttemptedResults) {
if (!isExistInResult(blockCollectionID)) {
@@ -273,6 +285,7 @@ public class BlockStorageMovementAttemptedItems {
Iterator<BlocksStorageMovementResult> resultsIter =
storageMovementAttemptedResults.iterator();
while (resultsIter.hasNext()) {
+ boolean isInprogress = false;
// TrackID need to be retried in the following cases:
// 1) All or few scheduled block(s) movement has been failed.
// 2) All the scheduled block(s) movement has been succeeded but there
@@ -282,16 +295,19 @@ public class BlockStorageMovementAttemptedItems {
BlocksStorageMovementResult storageMovementAttemptedResult = resultsIter
.next();
synchronized (storageMovementAttemptedItems) {
- if (storageMovementAttemptedResult
- .getStatus() == BlocksStorageMovementResult.Status.FAILURE) {
+ Status status = storageMovementAttemptedResult.getStatus();
+ ItemInfo itemInfo;
+ switch (status) {
+ case FAILURE:
blockStorageMovementNeeded
.add(storageMovementAttemptedResult.getTrackId());
LOG.warn("Blocks storage movement results for the tracking id: {}"
+ " is reported from co-ordinating datanode, but result"
+ " status is FAILURE. So, added for retry",
storageMovementAttemptedResult.getTrackId());
- } else {
- ItemInfo itemInfo = storageMovementAttemptedItems
+ break;
+ case SUCCESS:
+ itemInfo = storageMovementAttemptedItems
.get(storageMovementAttemptedResult.getTrackId());
// ItemInfo could be null. One case is, before the blocks movements
@@ -320,10 +336,26 @@ public class BlockStorageMovementAttemptedItems {
this.sps.notifyBlkStorageMovementFinished(
storageMovementAttemptedResult.getTrackId());
}
+ break;
+ case IN_PROGRESS:
+ isInprogress = true;
+ itemInfo = storageMovementAttemptedItems
+ .get(storageMovementAttemptedResult.getTrackId());
+ if(itemInfo != null){
+ // update the attempted expiration time to next cycle.
+ itemInfo.touchLastReportedTimeStamp();
+ }
+ break;
+ default:
+ LOG.error("Unknown status: {}", status);
+ break;
+ }
+ // Remove trackID from the attempted list if the attempt has been
+ // completed(success or failure), if any.
+ if (!isInprogress) {
+ storageMovementAttemptedItems
+ .remove(storageMovementAttemptedResult.getTrackId());
}
- // Remove trackID from the attempted list, if any.
- storageMovementAttemptedItems
- .remove(storageMovementAttemptedResult.getTrackId());
}
// Remove trackID from results as processed above.
resultsIter.remove();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d3e8acf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
index 8cf9920..8be0a2a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -108,6 +108,11 @@ public class StoragePolicySatisfier implements Runnable {
} else {
LOG.info("Starting StoragePolicySatisfier.");
}
+
+ // Ensure that all the previously submitted block movements(if any) have to
+ // be stopped in all datanodes.
+ addDropSPSWorkCommandsToAllDNs();
+
storagePolicySatisfierThread = new Daemon(this);
storagePolicySatisfierThread.setName("StoragePolicySatisfier");
storagePolicySatisfierThread.start();
@@ -133,7 +138,7 @@ public class StoragePolicySatisfier implements Runnable {
LOG.info("Stopping StoragePolicySatisfier, as admin requested to "
+ "deactivate it.");
this.clearQueuesWithNotification();
- this.blockManager.getDatanodeManager().addDropSPSWorkCommandsToAllDNs();
+ addDropSPSWorkCommandsToAllDNs();
} else {
LOG.info("Stopping StoragePolicySatisfier.");
}
@@ -170,6 +175,14 @@ public class StoragePolicySatisfier implements Runnable {
return namesystem.isFileOpenedForWrite(moverId);
}
+ /**
+ * Adding drop commands to all datanodes to stop performing the satisfier
+ * block movements, if any.
+ */
+ private void addDropSPSWorkCommandsToAllDNs() {
+ this.blockManager.getDatanodeManager().addDropSPSWorkCommandsToAllDNs();
+ }
+
@Override
public void run() {
boolean isMoverRunning = !checkIfMoverRunning();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d3e8acf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java
index 713b83b..b484eb1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java
@@ -35,9 +35,13 @@ public class BlocksStorageMovementResult {
* retry these failed blocks movements. Example selected target node is no
* more running or no space. So, retrying by selecting new target node might
* work.
+ *
+ * <p>
+ * IN_PROGRESS - If all or some of the blocks associated to track id are
+ * still moving.
*/
public static enum Status {
- SUCCESS, FAILURE;
+ SUCCESS, FAILURE, IN_PROGRESS;
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d3e8acf/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
index 899dc7e..080f7fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
@@ -192,6 +192,7 @@ message BlocksStorageMovementResultProto {
enum Status {
SUCCESS = 1; // block movement succeeded
FAILURE = 2; // block movement failed and needs to retry
+ IN_PROGRESS = 3; // block movement is still in progress
}
required uint64 trackID = 1;
required Status status = 2;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d3e8acf/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
index 86b8b50..8fbbf33 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
@@ -176,16 +176,21 @@ public class TestStoragePolicySatisfyWorker {
StoragePolicySatisfyWorker worker = new StoragePolicySatisfyWorker(conf,
src);
- List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
- BlockMovingInfo blockMovingInfo = prepareBlockMovingInfo(
- lb.getBlock().getLocalBlock(), lb.getLocations()[0], targetDnInfo,
- lb.getStorageTypes()[0], StorageType.ARCHIVE);
- blockMovingInfos.add(blockMovingInfo);
- INode inode = cluster.getNamesystem().getFSDirectory().getINode(file);
- worker.processBlockMovingTasks(inode.getId(),
- cluster.getNamesystem().getBlockPoolId(), blockMovingInfos);
-
- waitForBlockMovementCompletion(worker, inode.getId(), 1, 30000);
+ try {
+ worker.start();
+ List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
+ BlockMovingInfo blockMovingInfo = prepareBlockMovingInfo(
+ lb.getBlock().getLocalBlock(), lb.getLocations()[0], targetDnInfo,
+ lb.getStorageTypes()[0], StorageType.ARCHIVE);
+ blockMovingInfos.add(blockMovingInfo);
+ INode inode = cluster.getNamesystem().getFSDirectory().getINode(file);
+ worker.processBlockMovingTasks(inode.getId(),
+ cluster.getNamesystem().getBlockPoolId(), blockMovingInfos);
+
+ waitForBlockMovementCompletion(worker, inode.getId(), 1, 30000);
+ } finally {
+ worker.stop();
+ }
}
/**
@@ -212,24 +217,29 @@ public class TestStoragePolicySatisfyWorker {
StoragePolicySatisfyWorker worker =
new StoragePolicySatisfyWorker(conf, src);
- List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
- List<LocatedBlock> locatedBlocks =
- dfs.getClient().getLocatedBlocks(file, 0).getLocatedBlocks();
- for (LocatedBlock locatedBlock : locatedBlocks) {
- BlockMovingInfo blockMovingInfo =
- prepareBlockMovingInfo(locatedBlock.getBlock().getLocalBlock(),
- locatedBlock.getLocations()[0], targetDnInfo,
- locatedBlock.getStorageTypes()[0], StorageType.ARCHIVE);
- blockMovingInfos.add(blockMovingInfo);
+ worker.start();
+ try {
+ List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
+ List<LocatedBlock> locatedBlocks =
+ dfs.getClient().getLocatedBlocks(file, 0).getLocatedBlocks();
+ for (LocatedBlock locatedBlock : locatedBlocks) {
+ BlockMovingInfo blockMovingInfo =
+ prepareBlockMovingInfo(locatedBlock.getBlock().getLocalBlock(),
+ locatedBlock.getLocations()[0], targetDnInfo,
+ locatedBlock.getStorageTypes()[0], StorageType.ARCHIVE);
+ blockMovingInfos.add(blockMovingInfo);
+ }
+ INode inode = cluster.getNamesystem().getFSDirectory().getINode(file);
+ worker.processBlockMovingTasks(inode.getId(),
+ cluster.getNamesystem().getBlockPoolId(), blockMovingInfos);
+ // Wait till results queue build up
+ waitForBlockMovementResult(worker, inode.getId(), 30000);
+ worker.dropSPSWork();
+ assertTrue(worker.getBlocksMovementsStatusHandler()
+ .getBlksMovementResults().size() == 0);
+ } finally {
+ worker.stop();
}
- INode inode = cluster.getNamesystem().getFSDirectory().getINode(file);
- worker.processBlockMovingTasks(inode.getId(),
- cluster.getNamesystem().getBlockPoolId(), blockMovingInfos);
- // Wait till results queue build up
- waitForBlockMovementResult(worker, inode.getId(), 30000);
- worker.dropSPSWork();
- assertTrue(worker.getBlocksMovementsCompletionHandler()
- .getBlksMovementResults().size() == 0);
}
private void waitForBlockMovementResult(
@@ -239,7 +249,7 @@ public class TestStoragePolicySatisfyWorker {
@Override
public Boolean get() {
List<BlocksStorageMovementResult> completedBlocks = worker
- .getBlocksMovementsCompletionHandler().getBlksMovementResults();
+ .getBlocksMovementsStatusHandler().getBlksMovementResults();
return completedBlocks.size() > 0;
}
}, 100, timeout);
@@ -252,7 +262,7 @@ public class TestStoragePolicySatisfyWorker {
@Override
public Boolean get() {
List<BlocksStorageMovementResult> completedBlocks = worker
- .getBlocksMovementsCompletionHandler().getBlksMovementResults();
+ .getBlocksMovementsStatusHandler().getBlksMovementResults();
int failedCount = 0;
for (BlocksStorageMovementResult blkMovementResult : completedBlocks) {
if (blkMovementResult.getStatus() ==
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d3e8acf/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithHA.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithHA.java
index 4d226ff..c88d5be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithHA.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithHA.java
@@ -17,51 +17,90 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
+import static org.apache.hadoop.util.Time.monotonicNow;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationException;
+import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
-
-import java.io.IOException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Tests that StoragePolicySatisfier is able to work with HA enabled.
*/
public class TestStoragePolicySatisfierWithHA {
private MiniDFSCluster cluster = null;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestStoragePolicySatisfierWithHA.class);
- @Before
- public void setUp() throws IOException {
- Configuration conf = new Configuration();
+ private final Configuration config = new HdfsConfiguration();
+ private static final int DEFAULT_BLOCK_SIZE = 1024;
+ private DistributedFileSystem dfs = null;
+
+ private StorageType[][] allDiskTypes =
+ new StorageType[][]{{StorageType.DISK, StorageType.DISK},
+ {StorageType.DISK, StorageType.DISK},
+ {StorageType.DISK, StorageType.DISK}};
+ private int numOfDatanodes = 3;
+ private int storagesPerDatanode = 2;
+ private long capacity = 2 * 256 * 1024 * 1024;
+ private int nnIndex = 0;
+
+ private void createCluster() throws IOException {
+ config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+ startCluster(config, allDiskTypes, numOfDatanodes, storagesPerDatanode,
+ capacity);
+ dfs = cluster.getFileSystem(nnIndex);
+ }
+
+ private void startCluster(final Configuration conf,
+ StorageType[][] storageTypes, int numberOfDatanodes, int storagesPerDn,
+ long nodeCapacity) throws IOException {
+ long[][] capacities = new long[numberOfDatanodes][storagesPerDn];
+ for (int i = 0; i < numberOfDatanodes; i++) {
+ for (int j = 0; j < storagesPerDn; j++) {
+ capacities[i][j] = nodeCapacity;
+ }
+ }
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
- .numDataNodes(1)
- .build();
+ .numDataNodes(numberOfDatanodes).storagesPerDatanode(storagesPerDn)
+ .storageTypes(storageTypes).storageCapacities(capacities).build();
+ cluster.waitActive();
+ cluster.transitionToActive(0);
}
/**
* Tests to verify that SPS should run/stop automatically when NN state
* changes between Standby and Active.
*/
- @Test(timeout = 100000)
+ @Test(timeout = 90000)
public void testWhenNNHAStateChanges() throws IOException {
try {
- DistributedFileSystem fs;
+ createCluster();
boolean running;
- cluster.waitActive();
- fs = cluster.getFileSystem(0);
+ dfs = cluster.getFileSystem(1);
try {
- fs.getClient().isStoragePolicySatisfierRunning();
+ dfs.getClient().isStoragePolicySatisfierRunning();
Assert.fail("Call this function to Standby NN should "
+ "raise an exception.");
} catch (RemoteException e) {
@@ -72,14 +111,15 @@ public class TestStoragePolicySatisfierWithHA {
}
cluster.transitionToActive(0);
- running = fs.getClient().isStoragePolicySatisfierRunning();
+ dfs = cluster.getFileSystem(0);
+ running = dfs.getClient().isStoragePolicySatisfierRunning();
Assert.assertTrue("StoragePolicySatisfier should be active "
+ "when NN transits from Standby to Active mode.", running);
// NN transits from Active to Standby
cluster.transitionToStandby(0);
try {
- fs.getClient().isStoragePolicySatisfierRunning();
+ dfs.getClient().isStoragePolicySatisfierRunning();
Assert.fail("NN in Standby again, call this function should "
+ "raise an exception.");
} catch (RemoteException e) {
@@ -106,4 +146,104 @@ public class TestStoragePolicySatisfierWithHA {
cluster.shutdown();
}
}
+
+ /**
+ * Test to verify that during namenode switch over will add
+ * DNA_DROP_SPS_WORK_COMMAND to all the datanodes. Later, this will ensure to
+ * drop all the SPS queues at datanode.
+ */
+ @Test(timeout = 90000)
+ public void testNamenodeSwitchoverShouldDropSPSWork() throws Exception {
+ try {
+ createCluster();
+
+ FSNamesystem fsn = cluster.getNamesystem(0);
+ ArrayList<DataNode> dataNodes = cluster.getDataNodes();
+ List<DatanodeDescriptor> listOfDns = new ArrayList<>();
+ for (DataNode dn : dataNodes) {
+ DatanodeDescriptor dnd = NameNodeAdapter.getDatanode(fsn,
+ dn.getDatanodeId());
+ listOfDns.add(dnd);
+ }
+ cluster.shutdownDataNodes();
+
+ cluster.transitionToStandby(0);
+ LOG.info("**Transition to Active**");
+ cluster.transitionToActive(1);
+
+ // Verify that Standby-to-Active transition should set drop SPS flag to
+ // true. This will ensure that DNA_DROP_SPS_WORK_COMMAND will be
+ // propagated to datanode during heartbeat response.
+ int retries = 20;
+ boolean dropSPSWork = false;
+ while (retries > 0) {
+ for (DatanodeDescriptor dnd : listOfDns) {
+ dropSPSWork = dnd.shouldDropSPSWork();
+ if (!dropSPSWork) {
+ retries--;
+ Thread.sleep(250);
+ break;
+ }
+ }
+ if (dropSPSWork) {
+ break;
+ }
+ }
+ Assert.assertTrue("Didn't drop SPS work", dropSPSWork);
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ /**
+ * Test to verify that SPS work will be dropped once the datanode is marked as
+ * expired. Internally 'dropSPSWork' flag is set as true while expiration and
+ * at the time of reconnection, will send DNA_DROP_SPS_WORK_COMMAND to that
+ * datanode.
+ */
+ @Test(timeout = 90000)
+ public void testDeadDatanode() throws Exception {
+ int heartbeatExpireInterval = 2 * 2000;
+ config.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+ 3000);
+ config.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1000L);
+ createCluster();
+
+ DataNode dn = cluster.getDataNodes().get(0);
+ DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
+
+ FSNamesystem fsn = cluster.getNamesystem(0);
+ DatanodeDescriptor dnd = NameNodeAdapter.getDatanode(fsn,
+ dn.getDatanodeId());
+ boolean isDead = false;
+ int retries = 20;
+ while (retries > 0) {
+ isDead = dnd.getLastUpdateMonotonic() < (monotonicNow()
+ - heartbeatExpireInterval);
+ if (isDead) {
+ break;
+ }
+ retries--;
+ Thread.sleep(250);
+ }
+ Assert.assertTrue("Datanode is alive", isDead);
+ // Disable datanode heartbeat, so that the datanode will get expired after
+ // the recheck interval and become dead.
+ DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
+
+ // Verify that datanode expiration will set drop SPS flag to
+ // true. This will ensure that DNA_DROP_SPS_WORK_COMMAND will be
+ // propagated to datanode during reconnection.
+ boolean dropSPSWork = false;
+ retries = 50;
+ while (retries > 0) {
+ dropSPSWork = dnd.shouldDropSPSWork();
+ if (dropSPSWork) {
+ break;
+ }
+ retries--;
+ Thread.sleep(100);
+ }
+ Assert.assertTrue("Didn't drop SPS work", dropSPSWork);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org