You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2018/07/05 04:17:41 UTC
[48/50] [abbrv] hadoop git commit: HDFS-13165: [SPS]: Collects
successfully moved block details via IBR. Contributed by Rakesh R.
HDFS-13165: [SPS]: Collects successfully moved block details via IBR. Contributed by Rakesh R.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/14e5b64f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/14e5b64f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/14e5b64f
Branch: refs/heads/HDFS-10285
Commit: 14e5b64f1a456e416c3425c19bbde1b26d7a2dbc
Parents: 60861a9
Author: Rakesh Radhakrishnan <ra...@apache.org>
Authored: Sun Apr 29 11:06:59 2018 +0530
Committer: Rakesh Radhakrishnan <ra...@apache.org>
Committed: Thu Jul 5 09:01:20 2018 +0530
----------------------------------------------------------------------
.../DatanodeProtocolClientSideTranslatorPB.java | 11 +-
.../DatanodeProtocolServerSideTranslatorPB.java | 4 +-
.../apache/hadoop/hdfs/protocolPB/PBHelper.java | 25 ---
.../server/blockmanagement/BlockManager.java | 86 +++++++++-
.../sps/BlockMovementAttemptFinished.java | 24 ++-
.../common/sps/BlockStorageMovementTracker.java | 109 +-----------
.../sps/BlocksMovementsStatusHandler.java | 70 +-------
.../hdfs/server/datanode/BPServiceActor.java | 14 +-
.../hadoop/hdfs/server/datanode/DataNode.java | 7 +-
.../datanode/StoragePolicySatisfyWorker.java | 48 ++----
.../namenode/FSDirSatisfyStoragePolicyOp.java | 13 +-
.../hdfs/server/namenode/FSDirXAttrOp.java | 8 +-
.../hdfs/server/namenode/FSDirectory.java | 5 +-
.../hdfs/server/namenode/FSNamesystem.java | 30 ++--
.../hadoop/hdfs/server/namenode/NameNode.java | 19 ++-
.../hdfs/server/namenode/NameNodeRpcServer.java | 46 +++--
.../sps/BlockStorageMovementAttemptedItems.java | 167 +++++++++++++------
.../hdfs/server/namenode/sps/SPSService.java | 19 ++-
.../namenode/sps/StoragePolicySatisfier.java | 154 +++++++++++------
.../hdfs/server/protocol/DatanodeProtocol.java | 4 +-
.../sps/ExternalSPSBlockMoveTaskHandler.java | 32 ++--
.../sps/ExternalStoragePolicySatisfier.java | 3 +-
.../src/main/proto/DatanodeProtocol.proto | 9 -
.../src/main/resources/hdfs-default.xml | 41 +++++
.../TestNameNodePrunesMissingStorages.java | 4 +-
.../datanode/InternalDataNodeTestUtils.java | 4 +-
.../SimpleBlocksMovementsStatusHandler.java | 88 ++++++++++
.../server/datanode/TestBPOfferService.java | 12 +-
.../hdfs/server/datanode/TestBlockRecovery.java | 4 +-
.../server/datanode/TestDataNodeLifeline.java | 7 +-
.../TestDatanodeProtocolRetryPolicy.java | 4 +-
.../server/datanode/TestFsDatasetCache.java | 4 +-
.../TestStoragePolicySatisfyWorker.java | 76 +--------
.../hdfs/server/datanode/TestStorageReport.java | 4 +-
.../server/namenode/NNThroughputBenchmark.java | 9 +-
.../hdfs/server/namenode/NameNodeAdapter.java | 4 +-
.../hdfs/server/namenode/TestDeadDatanode.java | 5 +-
.../namenode/TestNameNodeReconfigure.java | 17 +-
.../TestBlockStorageMovementAttemptedItems.java | 88 ++++++----
.../sps/TestStoragePolicySatisfier.java | 73 ++++++--
...stStoragePolicySatisfierWithStripedFile.java | 40 +++--
.../sps/TestExternalStoragePolicySatisfier.java | 44 ++---
42 files changed, 776 insertions(+), 659 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e5b64f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
index dcc0705..e4125dc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
@@ -48,7 +48,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlock
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionRequestProto;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -139,8 +138,7 @@ public class DatanodeProtocolClientSideTranslatorPB implements
VolumeFailureSummary volumeFailureSummary,
boolean requestFullBlockReportLease,
@Nonnull SlowPeerReports slowPeers,
- @Nonnull SlowDiskReports slowDisks,
- BlocksStorageMoveAttemptFinished storageMovementFinishedBlks)
+ @Nonnull SlowDiskReports slowDisks)
throws IOException {
HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
.setRegistration(PBHelper.convert(registration))
@@ -165,13 +163,6 @@ public class DatanodeProtocolClientSideTranslatorPB implements
builder.addAllSlowDisks(PBHelper.convertSlowDiskInfo(slowDisks));
}
- // Adding blocks movement results to the heart beat request.
- if (storageMovementFinishedBlks != null
- && storageMovementFinishedBlks.getBlocks() != null) {
- builder.setStorageMoveAttemptFinishedBlks(
- PBHelper.convertBlksMovReport(storageMovementFinishedBlks));
- }
-
HeartbeatResponseProto resp;
try {
resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, builder.build());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e5b64f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
index b5bb80a..5cba284 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
@@ -122,9 +122,7 @@ public class DatanodeProtocolServerSideTranslatorPB implements
request.getXceiverCount(), request.getFailedVolumes(),
volumeFailureSummary, request.getRequestFullBlockReportLease(),
PBHelper.convertSlowPeerInfo(request.getSlowPeersList()),
- PBHelper.convertSlowDiskInfo(request.getSlowDisksList()),
- PBHelper.convertBlksMovReport(
- request.getStorageMoveAttemptFinishedBlks()));
+ PBHelper.convertSlowDiskInfo(request.getSlowDisksList()));
} catch (IOException e) {
throw new ServiceException(e);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e5b64f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 38f72c0..f51f839 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -57,7 +57,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SlowPeerRepo
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementCommandProto;
-import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlocksStorageMoveAttemptFinishedProto;
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECReconstructionInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
@@ -105,7 +104,6 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStr
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
@@ -971,29 +969,6 @@ public class PBHelper {
return SlowDiskReports.create(slowDisksMap);
}
- public static BlocksStorageMoveAttemptFinished convertBlksMovReport(
- BlocksStorageMoveAttemptFinishedProto proto) {
-
- List<BlockProto> blocksList = proto.getBlocksList();
- Block[] blocks = new Block[blocksList.size()];
- for (int i = 0; i < blocksList.size(); i++) {
- BlockProto blkProto = blocksList.get(i);
- blocks[i] = PBHelperClient.convert(blkProto);
- }
- return new BlocksStorageMoveAttemptFinished(blocks);
- }
-
- public static BlocksStorageMoveAttemptFinishedProto convertBlksMovReport(
- BlocksStorageMoveAttemptFinished blocksMoveAttemptFinished) {
- BlocksStorageMoveAttemptFinishedProto.Builder builder =
- BlocksStorageMoveAttemptFinishedProto.newBuilder();
- Block[] blocks = blocksMoveAttemptFinished.getBlocks();
- for (Block block : blocks) {
- builder.addBlocks(PBHelperClient.convert(block));
- }
- return builder.build();
- }
-
public static JournalInfo convert(JournalInfoProto info) {
int lv = info.hasLayoutVersion() ? info.getLayoutVersion() : 0;
int nsID = info.hasNamespaceID() ? info.getNamespaceID() : 0;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e5b64f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 7e0c943..caf250f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -69,6 +69,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
@@ -92,6 +93,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
+import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
@@ -427,8 +429,11 @@ public class BlockManager implements BlockStatsMXBean {
private final BlockIdManager blockIdManager;
- /** For satisfying block storage policies. */
- private final StoragePolicySatisfyManager spsManager;
+ /**
+ * For satisfying block storage policies. Instantiated only when SPS is
+ * enabled, either internally or externally.
+ */
+ private StoragePolicySatisfyManager spsManager;
/** Minimum live replicas needed for the datanode to be transitioned
* from ENTERING_MAINTENANCE to IN_MAINTENANCE.
@@ -469,8 +474,7 @@ public class BlockManager implements BlockStatsMXBean {
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT)
* 1000L);
- // sps manager manages the user invoked sps paths and does the movement.
- spsManager = new StoragePolicySatisfyManager(conf, namesystem, this);
+ createSPSManager(conf);
blockTokenSecretManager = createBlockTokenSecretManager(conf);
@@ -699,7 +703,9 @@ public class BlockManager implements BlockStatsMXBean {
}
public void close() {
- getSPSManager().stop();
+ if (getSPSManager() != null) {
+ getSPSManager().stop();
+ }
bmSafeMode.close();
try {
redundancyThread.interrupt();
@@ -713,7 +719,9 @@ public class BlockManager implements BlockStatsMXBean {
datanodeManager.close();
pendingReconstruction.stop();
blocksMap.close();
- getSPSManager().stopGracefully();
+ if (getSPSManager() != null) {
+ getSPSManager().stopGracefully();
+ }
}
/** @return the datanodeManager */
@@ -3881,6 +3889,21 @@ public class BlockManager implements BlockStatsMXBean {
}
processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED,
delHintNode);
+
+ // notify SPS about the reported block
+ notifyStorageMovementAttemptFinishedBlk(storageInfo, block);
+ }
+
+ private void notifyStorageMovementAttemptFinishedBlk(
+ DatanodeStorageInfo storageInfo, Block block) {
+ if (getSPSManager() != null) {
+ SPSService<Long> sps = getSPSManager().getInternalSPSService();
+ if (sps.isRunning()) {
+ sps.notifyStorageMovementAttemptFinishedBlk(
+ storageInfo.getDatanodeDescriptor(), storageInfo.getStorageType(),
+ block);
+ }
+ }
}
private void processAndHandleReportedBlock(
@@ -5018,6 +5041,57 @@ public class BlockManager implements BlockStatsMXBean {
}
/**
+ * Create SPS manager instance. It manages the user invoked sps paths and does
+ * the movement.
+ *
+ * @param conf
+ * configuration
+ * @return true if the instance is successfully created, false otherwise.
+ */
+ private boolean createSPSManager(final Configuration conf) {
+ return createSPSManager(conf, null);
+ }
+
+ /**
+ * Create SPS manager instance. It manages the user invoked sps paths and does
+ * the movement.
+ *
+ * @param conf
+ * configuration
+ * @param spsMode
+ * satisfier mode
+ * @return true if the instance is successfully created, false otherwise.
+ */
+ public boolean createSPSManager(final Configuration conf,
+ final String spsMode) {
+ // sps manager manages the user invoked sps paths and does the movement.
+ // StoragePolicySatisfier(SPS) configs
+ boolean storagePolicyEnabled = conf.getBoolean(
+ DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY,
+ DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DEFAULT);
+ String modeVal = spsMode;
+ if (org.apache.commons.lang.StringUtils.isBlank(modeVal)) {
+ modeVal = conf.get(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+ DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT);
+ }
+ StoragePolicySatisfierMode mode = StoragePolicySatisfierMode
+ .fromString(modeVal);
+ if (!storagePolicyEnabled || mode == StoragePolicySatisfierMode.NONE) {
+ LOG.info("Storage policy satisfier is disabled");
+ return false;
+ }
+ spsManager = new StoragePolicySatisfyManager(conf, namesystem, this);
+ return true;
+ }
+
+ /**
+ * Nullify the SPS manager, as this feature is fully disabled.
+ */
+ public void disableSPS() {
+ spsManager = null;
+ }
+
+ /**
* @return sps manager.
*/
public StoragePolicySatisfyManager getSPSManager() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e5b64f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementAttemptFinished.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementAttemptFinished.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementAttemptFinished.java
index 419d806..29c5e9c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementAttemptFinished.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementAttemptFinished.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.common.sps;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -33,6 +34,7 @@ public class BlockMovementAttemptFinished {
private final Block block;
private final DatanodeInfo src;
private final DatanodeInfo target;
+ private final StorageType targetType;
private final BlockMovementStatus status;
/**
@@ -44,14 +46,17 @@ public class BlockMovementAttemptFinished {
* src datanode
* @param target
* target datanode
+ * @param targetType
+ * target storage type
* @param status
* movement status
*/
public BlockMovementAttemptFinished(Block block, DatanodeInfo src,
- DatanodeInfo target, BlockMovementStatus status) {
+ DatanodeInfo target, StorageType targetType, BlockMovementStatus status) {
this.block = block;
this.src = src;
this.target = target;
+ this.targetType = targetType;
this.status = status;
}
@@ -64,6 +69,20 @@ public class BlockMovementAttemptFinished {
}
/**
+ * @return the target datanode where it moved the block.
+ */
+ public DatanodeInfo getTargetDatanode() {
+ return target;
+ }
+
+ /**
+ * @return target storage type.
+ */
+ public StorageType getTargetType() {
+ return targetType;
+ }
+
+ /**
* @return block movement status code.
*/
public BlockMovementStatus getStatus() {
@@ -74,7 +93,8 @@ public class BlockMovementAttemptFinished {
public String toString() {
return new StringBuilder().append("Block movement attempt finished(\n ")
.append(" block : ").append(block).append(" src node: ").append(src)
- .append(" target node: ").append(target).append(" movement status: ")
+ .append(" target node: ").append(target).append(" target type: ")
+ .append(targetType).append(" movement status: ")
.append(status).append(")").toString();
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e5b64f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockStorageMovementTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockStorageMovementTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockStorageMovementTracker.java
index b20d6cf..4ee415e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockStorageMovementTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockStorageMovementTracker.java
@@ -17,17 +17,12 @@
*/
package org.apache.hadoop.hdfs.server.common.sps;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.protocol.Block;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,13 +34,10 @@ import org.slf4j.LoggerFactory;
public class BlockStorageMovementTracker implements Runnable {
private static final Logger LOG = LoggerFactory
.getLogger(BlockStorageMovementTracker.class);
- private final CompletionService<BlockMovementAttemptFinished> moverCompletionService;
+ private final CompletionService<BlockMovementAttemptFinished>
+ moverCompletionService;
private final BlocksMovementsStatusHandler blksMovementsStatusHandler;
- // Keeps the information - block vs its list of future move tasks
- private final Map<Block, List<Future<BlockMovementAttemptFinished>>> moverTaskFutures;
- private final Map<Block, List<BlockMovementAttemptFinished>> movementResults;
-
private volatile boolean running = true;
/**
@@ -60,53 +52,21 @@ public class BlockStorageMovementTracker implements Runnable {
CompletionService<BlockMovementAttemptFinished> moverCompletionService,
BlocksMovementsStatusHandler handler) {
this.moverCompletionService = moverCompletionService;
- this.moverTaskFutures = new HashMap<>();
this.blksMovementsStatusHandler = handler;
- this.movementResults = new HashMap<>();
}
@Override
public void run() {
while (running) {
- if (moverTaskFutures.size() <= 0) {
- try {
- synchronized (moverTaskFutures) {
- // Waiting for mover tasks.
- moverTaskFutures.wait(2000);
- }
- } catch (InterruptedException ignore) {
- // Sets interrupt flag of this thread.
- Thread.currentThread().interrupt();
- }
- }
try {
- Future<BlockMovementAttemptFinished> future =
- moverCompletionService.take();
+ Future<BlockMovementAttemptFinished> future = moverCompletionService
+ .take();
if (future != null) {
BlockMovementAttemptFinished result = future.get();
LOG.debug("Completed block movement. {}", result);
- Block block = result.getBlock();
- List<Future<BlockMovementAttemptFinished>> blocksMoving =
- moverTaskFutures.get(block);
- if (blocksMoving == null) {
- LOG.warn("Future task doesn't exist for block : {} ", block);
- continue;
- }
- blocksMoving.remove(future);
-
- List<BlockMovementAttemptFinished> resultPerTrackIdList =
- addMovementResultToBlockIdList(result);
-
- // Completed all the scheduled blocks movement under this 'trackId'.
- if (blocksMoving.isEmpty() || moverTaskFutures.get(block) == null) {
- synchronized (moverTaskFutures) {
- moverTaskFutures.remove(block);
- }
- if (running) {
- // handle completed or inprogress blocks movements per trackId.
- blksMovementsStatusHandler.handle(resultPerTrackIdList);
- }
- movementResults.remove(block);
+ if (running && blksMovementsStatusHandler != null) {
+ // handle completed block movement.
+ blksMovementsStatusHandler.handle(result);
}
}
} catch (InterruptedException e) {
@@ -122,63 +82,10 @@ public class BlockStorageMovementTracker implements Runnable {
}
}
- private List<BlockMovementAttemptFinished> addMovementResultToBlockIdList(
- BlockMovementAttemptFinished result) {
- Block block = result.getBlock();
- List<BlockMovementAttemptFinished> perBlockIdList;
- synchronized (movementResults) {
- perBlockIdList = movementResults.get(block);
- if (perBlockIdList == null) {
- perBlockIdList = new ArrayList<>();
- movementResults.put(block, perBlockIdList);
- }
- perBlockIdList.add(result);
- }
- return perBlockIdList;
- }
-
- /**
- * Add future task to the tracking list to check the completion status of the
- * block movement.
- *
- * @param blockID
- * block identifier
- * @param futureTask
- * future task used for moving the respective block
- */
- public void addBlock(Block block,
- Future<BlockMovementAttemptFinished> futureTask) {
- synchronized (moverTaskFutures) {
- List<Future<BlockMovementAttemptFinished>> futures =
- moverTaskFutures.get(block);
- // null for the first task
- if (futures == null) {
- futures = new ArrayList<>();
- moverTaskFutures.put(block, futures);
- }
- futures.add(futureTask);
- // Notify waiting tracker thread about the newly added tasks.
- moverTaskFutures.notify();
- }
- }
-
- /**
- * Clear the pending movement and movement result queues.
- */
- public void removeAll() {
- synchronized (moverTaskFutures) {
- moverTaskFutures.clear();
- }
- synchronized (movementResults) {
- movementResults.clear();
- }
- }
-
/**
- * Sets running flag to false and clear the pending movement result queues.
+ * Sets running flag to false.
*/
public void stopTracking() {
running = false;
- removeAll();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e5b64f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlocksMovementsStatusHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlocksMovementsStatusHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlocksMovementsStatusHandler.java
index f9f3954..ab67424 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlocksMovementsStatusHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlocksMovementsStatusHandler.java
@@ -18,78 +18,22 @@
package org.apache.hadoop.hdfs.server.common.sps;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.protocol.Block;
/**
- * Blocks movements status handler, which is used to collect details of the
- * completed block movements and later these attempted finished(with success or
- * failure) blocks can be accessed to notify respective listeners, if any.
+ * Blocks movements status handler, which can be used to collect details of the
+ * completed block movements.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public class BlocksMovementsStatusHandler {
- private final List<Block> blockIdVsMovementStatus = new ArrayList<>();
-
- /**
- * Collect all the storage movement attempt finished blocks. Later this will
- * be send to namenode via heart beat.
- *
- * @param moveAttemptFinishedBlks
- * set of storage movement attempt finished blocks
- */
- public void handle(
- List<BlockMovementAttemptFinished> moveAttemptFinishedBlks) {
- List<Block> blocks = new ArrayList<>();
-
- for (BlockMovementAttemptFinished item : moveAttemptFinishedBlks) {
- blocks.add(item.getBlock());
- }
- // Adding to the tracking report list. Later this can be accessed to know
- // the attempted block movements.
- synchronized (blockIdVsMovementStatus) {
- blockIdVsMovementStatus.addAll(blocks);
- }
- }
+public interface BlocksMovementsStatusHandler {
/**
- * @return unmodifiable list of storage movement attempt finished blocks.
- */
- public List<Block> getMoveAttemptFinishedBlocks() {
- List<Block> moveAttemptFinishedBlks = new ArrayList<>();
- // 1. Adding all the completed block ids.
- synchronized (blockIdVsMovementStatus) {
- if (blockIdVsMovementStatus.size() > 0) {
- moveAttemptFinishedBlks = Collections
- .unmodifiableList(blockIdVsMovementStatus);
- }
- }
- return moveAttemptFinishedBlks;
- }
-
- /**
- * Remove the storage movement attempt finished blocks from the tracking list.
+ * Handle a single storage movement attempt finished block.
*
- * @param moveAttemptFinishedBlks
- * set of storage movement attempt finished blocks
- */
- public void remove(List<Block> moveAttemptFinishedBlks) {
- if (moveAttemptFinishedBlks != null) {
- blockIdVsMovementStatus.removeAll(moveAttemptFinishedBlks);
- }
- }
-
- /**
- * Clear the blockID vs movement status tracking map.
+ * @param moveAttemptFinishedBlk
+ * storage movement attempt finished block
*/
- public void removeAll() {
- synchronized (blockIdVsMovementStatus) {
- blockIdVsMovementStatus.clear();
- }
- }
+ void handle(BlockMovementAttemptFinished moveAttemptFinishedBlk);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e5b64f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index b7beda4..dab8ae9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -514,12 +514,6 @@ class BPServiceActor implements Runnable {
SlowDiskReports.create(dn.getDiskMetrics().getDiskOutliersStats()) :
SlowDiskReports.EMPTY_REPORT;
- // Get the blocks storage move attempt finished blocks
- List<Block> results = dn.getStoragePolicySatisfyWorker()
- .getBlocksMovementsStatusHandler().getMoveAttemptFinishedBlocks();
- BlocksStorageMoveAttemptFinished storageMoveAttemptFinishedBlks =
- getStorageMoveAttemptFinishedBlocks(results);
-
HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
reports,
dn.getFSDataset().getCacheCapacity(),
@@ -530,19 +524,13 @@ class BPServiceActor implements Runnable {
volumeFailureSummary,
requestBlockReportLease,
slowPeers,
- slowDisks,
- storageMoveAttemptFinishedBlks);
+ slowDisks);
if (outliersReportDue) {
// If the report was due and successfully sent, schedule the next one.
scheduler.scheduleNextOutlierReport();
}
- // Remove the blocks movement results after successfully transferring
- // to namenode.
- dn.getStoragePolicySatisfyWorker().getBlocksMovementsStatusHandler()
- .remove(results);
-
return response;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e5b64f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 76d2efb..d584477 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1424,7 +1424,7 @@ public class DataNode extends ReconfigurableBase
ecWorker = new ErasureCodingWorker(getConf(), this);
blockRecoveryWorker = new BlockRecoveryWorker(this);
storagePolicySatisfyWorker =
- new StoragePolicySatisfyWorker(getConf(), this);
+ new StoragePolicySatisfyWorker(getConf(), this, null);
storagePolicySatisfyWorker.start();
blockPoolManager = new BlockPoolManager(this);
@@ -2134,11 +2134,6 @@ public class DataNode extends ReconfigurableBase
notifyAll();
}
tracer.close();
-
- // Waiting to finish SPS worker thread.
- if (storagePolicySatisfyWorker != null) {
- storagePolicySatisfyWorker.waitToFinishWorkerThread();
- }
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e5b64f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
index af6137c..0157205 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
@@ -24,7 +24,6 @@ import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -38,19 +37,17 @@ import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.common.sps.BlockDispatcher;
import org.apache.hadoop.hdfs.server.common.sps.BlockMovementAttemptFinished;
import org.apache.hadoop.hdfs.server.common.sps.BlockMovementStatus;
import org.apache.hadoop.hdfs.server.common.sps.BlockStorageMovementTracker;
import org.apache.hadoop.hdfs.server.common.sps.BlocksMovementsStatusHandler;
-import org.apache.hadoop.hdfs.server.common.sps.BlockDispatcher;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
-
/**
* StoragePolicySatisfyWorker handles the storage policy satisfier commands.
* These commands would be issued from NameNode as part of Datanode's heart beat
@@ -67,19 +64,19 @@ public class StoragePolicySatisfyWorker {
private final int moverThreads;
private final ExecutorService moveExecutor;
- private final CompletionService<BlockMovementAttemptFinished> moverCompletionService;
- private final BlocksMovementsStatusHandler handler;
+ private final CompletionService<BlockMovementAttemptFinished>
+ moverCompletionService;
private final BlockStorageMovementTracker movementTracker;
private Daemon movementTrackerThread;
private final BlockDispatcher blkDispatcher;
- public StoragePolicySatisfyWorker(Configuration conf, DataNode datanode) {
+ public StoragePolicySatisfyWorker(Configuration conf, DataNode datanode,
+ BlocksMovementsStatusHandler handler) {
this.datanode = datanode;
- // Defaulting to 10. This is to minimise the number of move ops.
+ // Defaulting to 10. This is to minimize the number of move ops.
moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, 10);
moveExecutor = initializeBlockMoverThreadPool(moverThreads);
moverCompletionService = new ExecutorCompletionService<>(moveExecutor);
- handler = new BlocksMovementsStatusHandler();
movementTracker = new BlockStorageMovementTracker(moverCompletionService,
handler);
movementTrackerThread = new Daemon(movementTracker);
@@ -88,7 +85,6 @@ public class StoragePolicySatisfyWorker {
int ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
blkDispatcher = new BlockDispatcher(dnConf.getSocketTimeout(),
ioFileBufferSize, dnConf.getConnectToDnViaHostname());
- // TODO: Needs to manage the number of concurrent moves per DataNode.
}
/**
@@ -100,22 +96,17 @@ public class StoragePolicySatisfyWorker {
}
/**
- * Stop StoragePolicySatisfyWorker, which will stop block movement tracker
- * thread.
+ * Stop StoragePolicySatisfyWorker, which will terminate executor service and
+ * stop block movement tracker thread.
*/
void stop() {
movementTracker.stopTracking();
movementTrackerThread.interrupt();
- }
-
- /**
- * Timed wait to stop BlockStorageMovement tracker daemon thread.
- */
- void waitToFinishWorkerThread() {
+ moveExecutor.shutdown();
try {
- movementTrackerThread.join(3000);
- } catch (InterruptedException ignore) {
- // ignore
+ moveExecutor.awaitTermination(500, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted while waiting for mover thread to terminate", e);
}
}
@@ -160,10 +151,7 @@ public class StoragePolicySatisfyWorker {
: "Source and Target storage type shouldn't be same!";
BlockMovingTask blockMovingTask = new BlockMovingTask(blockPoolID,
blkMovingInfo);
- Future<BlockMovementAttemptFinished> moveCallable = moverCompletionService
- .submit(blockMovingTask);
- movementTracker.addBlock(blkMovingInfo.getBlock(),
- moveCallable);
+ moverCompletionService.submit(blockMovingTask);
}
}
@@ -185,7 +173,8 @@ public class StoragePolicySatisfyWorker {
public BlockMovementAttemptFinished call() {
BlockMovementStatus status = moveBlock();
return new BlockMovementAttemptFinished(blkMovingInfo.getBlock(),
- blkMovingInfo.getSource(), blkMovingInfo.getTarget(), status);
+ blkMovingInfo.getSource(), blkMovingInfo.getTarget(),
+ blkMovingInfo.getTargetStorageType(), status);
}
private BlockMovementStatus moveBlock() {
@@ -217,11 +206,6 @@ public class StoragePolicySatisfyWorker {
}
}
- @VisibleForTesting
- BlocksMovementsStatusHandler getBlocksMovementsStatusHandler() {
- return handler;
- }
-
/**
* Drop the in-progress SPS work queues.
*/
@@ -229,7 +213,5 @@ public class StoragePolicySatisfyWorker {
LOG.info("Received request to drop StoragePolicySatisfierWorker queues. "
+ "So, none of the SPS Worker queued block movements will"
+ " be scheduled.");
- movementTracker.removeAll();
- handler.removeAll();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e5b64f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java
index 45d6218..3f873d7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.XAttrHelper;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager;
import com.google.common.collect.Lists;
@@ -102,7 +103,11 @@ final class FSDirSatisfyStoragePolicyOp {
// Adding directory in the pending queue, so FileInodeIdCollector
// process directory child in batch and recursively
- fsd.getBlockManager().getSPSManager().addPathId(inode.getId());
+ StoragePolicySatisfyManager spsManager =
+ fsd.getBlockManager().getSPSManager();
+ if (spsManager != null) {
+ spsManager.addPathId(inode.getId());
+ }
}
} finally {
fsd.writeUnlock();
@@ -116,7 +121,11 @@ final class FSDirSatisfyStoragePolicyOp {
} else {
// Adding directory in the pending queue, so FileInodeIdCollector process
// directory child in batch and recursively
- fsd.getBlockManager().getSPSManager().addPathId(inode.getId());
+ StoragePolicySatisfyManager spsManager =
+ fsd.getBlockManager().getSPSManager();
+ if (spsManager != null) {
+ spsManager.addPathId(inode.getId());
+ }
return true;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e5b64f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
index 1150a72..3b68979 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReencryptionInfoProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager;
import org.apache.hadoop.security.AccessControlException;
import java.io.FileNotFoundException;
@@ -209,8 +210,11 @@ class FSDirXAttrOp {
for (XAttr xattr : toRemove) {
if (XATTR_SATISFY_STORAGE_POLICY
.equals(XAttrHelper.getPrefixedName(xattr))) {
- fsd.getBlockManager().getSPSManager().getInternalSPSService()
- .clearQueue(inode.getId());
+ StoragePolicySatisfyManager spsManager =
+ fsd.getBlockManager().getSPSManager();
+ if (spsManager != null) {
+ spsManager.getInternalSPSService().clearQueue(inode.getId());
+ }
break;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e5b64f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 6539b51..2a976d2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo.UpdatedReplicationInfo;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager;
import org.apache.hadoop.hdfs.util.ByteArray;
import org.apache.hadoop.hdfs.util.EnumCounters;
import org.apache.hadoop.hdfs.util.ReadOnlyList;
@@ -1401,7 +1402,9 @@ public class FSDirectory implements Closeable {
if (!inode.isSymlink()) {
final XAttrFeature xaf = inode.getXAttrFeature();
addEncryptionZone((INodeWithAdditionalFields) inode, xaf);
- if (namesystem.getBlockManager().getSPSManager().isEnabled()) {
+ StoragePolicySatisfyManager spsManager =
+ namesystem.getBlockManager().getSPSManager();
+ if (spsManager != null && spsManager.isEnabled()) {
addStoragePolicySatisfier((INodeWithAdditionalFields) inode, xaf);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e5b64f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 2d399e7..f9dcb33 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -259,7 +259,6 @@ import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
-import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
@@ -268,7 +267,6 @@ import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -1288,7 +1286,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
FSDirEncryptionZoneOp.warmUpEdekCache(edekCacheLoader, dir,
edekCacheLoaderDelay, edekCacheLoaderInterval);
}
- blockManager.getSPSManager().start();
+ if (blockManager.getSPSManager() != null) {
+ blockManager.getSPSManager().start();
+ }
} finally {
startingActiveService = false;
blockManager.checkSafeMode();
@@ -1318,7 +1318,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
LOG.info("Stopping services started for active state");
writeLock();
try {
- if (blockManager != null) {
+ if (blockManager != null && blockManager.getSPSManager() != null) {
blockManager.getSPSManager().stop();
}
stopSecretManager();
@@ -1359,7 +1359,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
// Don't want to keep replication queues when not in Active.
blockManager.clearQueues();
blockManager.setInitializedReplQueues(false);
- blockManager.getSPSManager().stopGracefully();
+ if (blockManager.getSPSManager() != null) {
+ blockManager.getSPSManager().stopGracefully();
+ }
}
} finally {
writeUnlock("stopActiveServices");
@@ -2268,7 +2270,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
DFS_STORAGE_POLICY_ENABLED_KEY));
}
// checks sps status
- if (!blockManager.getSPSManager().isEnabled() || (blockManager
+ boolean disabled = (blockManager.getSPSManager() == null);
+ if (disabled || (blockManager
.getSPSManager().getMode() == StoragePolicySatisfierMode.INTERNAL
&& !blockManager.getSPSManager().isInternalSatisfierRunning())) {
throw new UnsupportedActionException(
@@ -3966,8 +3969,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
VolumeFailureSummary volumeFailureSummary,
boolean requestFullBlockReportLease,
@Nonnull SlowPeerReports slowPeers,
- @Nonnull SlowDiskReports slowDisks,
- BlocksStorageMoveAttemptFinished blksMovementsFinished)
+ @Nonnull SlowDiskReports slowDisks)
throws IOException {
readLock();
try {
@@ -3983,18 +3985,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
blockReportLeaseId = blockManager.requestBlockReportLeaseId(nodeReg);
}
- // Handle blocks movement results sent by the coordinator datanode.
- SPSService sps = blockManager.getSPSManager().getInternalSPSService();
- if (!sps.isRunning()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Storage policy satisfier is not running. So, ignoring storage"
- + " movement attempt finished block info sent by DN");
- }
- } else {
- sps.notifyStorageMovementAttemptFinishedBlks(blksMovementsFinished);
- }
-
//create ha status
final NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat(
haContext.getState().getServiceState(),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e5b64f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index b199c72..2087d90 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -2148,7 +2148,24 @@ public class NameNode extends ReconfigurableBase implements
}
StoragePolicySatisfierMode mode = StoragePolicySatisfierMode
.fromString(newVal);
- namesystem.getBlockManager().getSPSManager().changeModeEvent(mode);
+ if (mode == StoragePolicySatisfierMode.NONE) {
+ // disabling sps service
+ if (namesystem.getBlockManager().getSPSManager() != null) {
+ namesystem.getBlockManager().getSPSManager().changeModeEvent(mode);
+ namesystem.getBlockManager().disableSPS();
+ }
+ } else {
+ // enabling sps service
+ boolean spsCreated = (namesystem.getBlockManager()
+ .getSPSManager() != null);
+ if (!spsCreated) {
+ spsCreated = namesystem.getBlockManager().createSPSManager(getConf(),
+ newVal);
+ }
+ if (spsCreated) {
+ namesystem.getBlockManager().getSPSManager().changeModeEvent(mode);
+ }
+ }
return newVal;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e5b64f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 1590423..57e827d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -156,8 +156,8 @@ import org.apache.hadoop.hdfs.server.common.HttpGetFailedException;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -1517,16 +1517,14 @@ public class NameNodeRpcServer implements NamenodeProtocols {
int failedVolumes, VolumeFailureSummary volumeFailureSummary,
boolean requestFullBlockReportLease,
@Nonnull SlowPeerReports slowPeers,
- @Nonnull SlowDiskReports slowDisks,
- BlocksStorageMoveAttemptFinished storageMovementFinishedBlks)
+ @Nonnull SlowDiskReports slowDisks)
throws IOException {
checkNNStartup();
verifyRequest(nodeReg);
return namesystem.handleHeartbeat(nodeReg, report,
dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
failedVolumes, volumeFailureSummary, requestFullBlockReportLease,
- slowPeers, slowDisks,
- storageMovementFinishedBlks);
+ slowPeers, slowDisks);
}
@Override // DatanodeProtocol
@@ -2543,10 +2541,12 @@ public class NameNodeRpcServer implements NamenodeProtocols {
if (nn.isStandbyState()) {
throw new StandbyException("Not supported by Standby Namenode.");
}
- boolean isSPSRunning = namesystem.getBlockManager().getSPSManager()
- .isInternalSatisfierRunning();
+ StoragePolicySatisfyManager spsMgr =
+ namesystem.getBlockManager().getSPSManager();
+ boolean isInternalSatisfierRunning = (spsMgr != null
+ ? spsMgr.isInternalSatisfierRunning() : false);
namesystem.logAuditEvent(true, operationName, null);
- return isSPSRunning;
+ return isInternalSatisfierRunning;
}
@Override
@@ -2556,6 +2556,14 @@ public class NameNodeRpcServer implements NamenodeProtocols {
if (nn.isStandbyState()) {
throw new StandbyException("Not supported by Standby Namenode.");
}
+ if (namesystem.getBlockManager().getSPSManager() == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Satisfier is not running inside namenode, so status "
+ + "can't be returned.");
+ }
+ throw new IOException("Satisfier is not running inside namenode, "
+ + "so status can't be returned.");
+ }
return namesystem.getBlockManager().getSPSManager()
.checkStoragePolicySatisfyPathStatus(path);
}
@@ -2568,16 +2576,20 @@ public class NameNodeRpcServer implements NamenodeProtocols {
if (nn.isStandbyState()) {
throw new StandbyException("Not supported by Standby Namenode.");
}
- // Check that SPS daemon service is running inside namenode
- if (namesystem.getBlockManager().getSPSManager()
- .getMode() == StoragePolicySatisfierMode.INTERNAL) {
- LOG.debug("SPS service is internally enabled and running inside "
- + "namenode, so external SPS is not allowed to fetch the path Ids");
- throw new IOException("SPS service is internally enabled and running"
- + " inside namenode, so external SPS is not allowed to fetch"
- + " the path Ids");
+ // Check that SPS is enabled externally
+ StoragePolicySatisfyManager spsMgr =
+ namesystem.getBlockManager().getSPSManager();
+ StoragePolicySatisfierMode spsMode = (spsMgr != null ? spsMgr.getMode()
+ : StoragePolicySatisfierMode.NONE);
+ if (spsMode != StoragePolicySatisfierMode.EXTERNAL) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("SPS service mode is {}, so external SPS service is "
+ + "not allowed to fetch the path Ids", spsMode);
+ }
+ throw new IOException("SPS service mode is " + spsMode + ", so "
+ + "external SPS service is not allowed to fetch the path Ids");
}
- Long pathId = namesystem.getBlockManager().getSPSManager().getNextPathId();
+ Long pathId = spsMgr.getNextPathId();
if (pathId == null) {
return null;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e5b64f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
index d2f0bb2..5b25491 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
@@ -17,21 +17,28 @@
*/
package org.apache.hadoop.hdfs.server.namenode.sps;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY;
import static org.apache.hadoop.util.Time.monotonicNow;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY;
-
+import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.AttemptedItemInfo;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.StorageTypeNodePair;
import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,10 +67,13 @@ public class BlockStorageMovementAttemptedItems<T> {
* processing and sent to DNs.
*/
private final List<AttemptedItemInfo<T>> storageMovementAttemptedItems;
- private final List<Block> movementFinishedBlocks;
+ private Map<Block, Set<StorageTypeNodePair>> scheduledBlkLocs;
+ // Maintains separate Queue to keep the movement finished blocks. This Q
+ // is used to update the storageMovementAttemptedItems list asynchronously.
+ private final BlockingQueue<Block> movementFinishedBlocks;
private volatile boolean monitorRunning = true;
private Daemon timerThread = null;
- private final BlockMovementListener blkMovementListener;
+ private BlockMovementListener blkMovementListener;
//
// It might take anywhere between 5 to 10 minutes before
// a request is timed out.
@@ -94,7 +104,8 @@ public class BlockStorageMovementAttemptedItems<T> {
DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT);
this.blockStorageMovementNeeded = unsatisfiedStorageMovementFiles;
storageMovementAttemptedItems = new ArrayList<>();
- movementFinishedBlocks = new ArrayList<>();
+ scheduledBlkLocs = new HashMap<>();
+ movementFinishedBlocks = new LinkedBlockingQueue<>();
this.blkMovementListener = blockMovementListener;
}
@@ -105,29 +116,67 @@ public class BlockStorageMovementAttemptedItems<T> {
* @param itemInfo
* - tracking info
*/
- public void add(AttemptedItemInfo<T> itemInfo) {
+ public void add(T startPath, T file, long monotonicNow,
+ Map<Block, Set<StorageTypeNodePair>> assignedBlocks, int retryCount) {
+ AttemptedItemInfo<T> itemInfo = new AttemptedItemInfo<T>(startPath, file,
+ monotonicNow, assignedBlocks.keySet(), retryCount);
synchronized (storageMovementAttemptedItems) {
storageMovementAttemptedItems.add(itemInfo);
}
+ synchronized (scheduledBlkLocs) {
+ scheduledBlkLocs.putAll(assignedBlocks);
+ }
}
/**
- * Add the storage movement attempt finished blocks to
- * storageMovementFinishedBlocks.
+ * Notify the storage movement attempt finished block.
*
- * @param moveAttemptFinishedBlks
- * storage movement attempt finished blocks
+ * @param reportedDn
+ * reported datanode
+ * @param type
+ * storage type
+ * @param reportedBlock
+ * reported block
*/
- public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) {
- if (moveAttemptFinishedBlks.length == 0) {
- return;
+ public void notifyReportedBlock(DatanodeInfo reportedDn, StorageType type,
+ Block reportedBlock) {
+ synchronized (scheduledBlkLocs) {
+ if (scheduledBlkLocs.size() <= 0) {
+ return;
+ }
+ matchesReportedBlock(reportedDn, type, reportedBlock);
}
- synchronized (movementFinishedBlocks) {
- movementFinishedBlocks.addAll(Arrays.asList(moveAttemptFinishedBlks));
+ }
+
+ private void matchesReportedBlock(DatanodeInfo reportedDn, StorageType type,
+ Block reportedBlock) {
+ Set<StorageTypeNodePair> blkLocs = scheduledBlkLocs.get(reportedBlock);
+ if (blkLocs == null) {
+ return; // unknown block, simply skip.
}
- // External listener if it is plugged-in
- if (blkMovementListener != null) {
- blkMovementListener.notifyMovementTriedBlocks(moveAttemptFinishedBlks);
+
+ for (StorageTypeNodePair dn : blkLocs) {
+ boolean foundDn = dn.getDatanodeInfo().compareTo(reportedDn) == 0 ? true
+ : false;
+ boolean foundType = dn.getStorageType().equals(type);
+ if (foundDn && foundType) {
+ blkLocs.remove(dn);
+ // listener if it is plugged-in
+ if (blkMovementListener != null) {
+ blkMovementListener
+ .notifyMovementTriedBlocks(new Block[] {reportedBlock});
+ }
+ // All the block locations have reported.
+ if (blkLocs.size() <= 0) {
+ movementFinishedBlocks.add(reportedBlock);
+ scheduledBlkLocs.remove(reportedBlock); // clean-up reported block
+ }
+ return; // found
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Reported block:{} not found in attempted blocks. Datanode:{}"
+ + ", StorageType:{}", reportedBlock, reportedDn, type);
}
}
@@ -203,14 +252,12 @@ public class BlockStorageMovementAttemptedItems<T> {
if (now > itemInfo.getLastAttemptedOrReportedTime()
+ selfRetryTimeout) {
T file = itemInfo.getFile();
- synchronized (movementFinishedBlocks) {
- ItemInfo<T> candidate = new ItemInfo<T>(itemInfo.getStartPath(),
- file, itemInfo.getRetryCount() + 1);
- blockStorageMovementNeeded.add(candidate);
- iter.remove();
- LOG.info("TrackID: {} becomes timed out and moved to needed "
- + "retries queue for next iteration.", file);
- }
+ ItemInfo<T> candidate = new ItemInfo<T>(itemInfo.getStartPath(), file,
+ itemInfo.getRetryCount() + 1);
+ blockStorageMovementNeeded.add(candidate);
+ iter.remove();
+ LOG.info("TrackID: {} becomes timed out and moved to needed "
+ + "retries queue for next iteration.", file);
}
}
}
@@ -218,29 +265,25 @@ public class BlockStorageMovementAttemptedItems<T> {
@VisibleForTesting
void blockStorageMovementReportedItemsCheck() throws IOException {
- synchronized (movementFinishedBlocks) {
- Iterator<Block> finishedBlksIter = movementFinishedBlocks.iterator();
- while (finishedBlksIter.hasNext()) {
- Block blk = finishedBlksIter.next();
- synchronized (storageMovementAttemptedItems) {
- Iterator<AttemptedItemInfo<T>> iterator =
- storageMovementAttemptedItems.iterator();
- while (iterator.hasNext()) {
- AttemptedItemInfo<T> attemptedItemInfo = iterator.next();
- attemptedItemInfo.getBlocks().remove(blk);
- if (attemptedItemInfo.getBlocks().isEmpty()) {
- // TODO: try add this at front of the Queue, so that this element
- // gets the chance first and can be cleaned from queue quickly as
- // all movements already done.
- blockStorageMovementNeeded.add(new ItemInfo<T>(attemptedItemInfo
- .getStartPath(), attemptedItemInfo.getFile(),
- attemptedItemInfo.getRetryCount() + 1));
- iterator.remove();
- }
+ // Removes all available blocks from this queue and process it.
+ Collection<Block> finishedBlks = new ArrayList<>();
+ movementFinishedBlocks.drainTo(finishedBlks);
+
+ // Update attempted items list
+ for (Block blk : finishedBlks) {
+ synchronized (storageMovementAttemptedItems) {
+ Iterator<AttemptedItemInfo<T>> iterator = storageMovementAttemptedItems
+ .iterator();
+ while (iterator.hasNext()) {
+ AttemptedItemInfo<T> attemptedItemInfo = iterator.next();
+ attemptedItemInfo.getBlocks().remove(blk);
+ if (attemptedItemInfo.getBlocks().isEmpty()) {
+ blockStorageMovementNeeded.add(new ItemInfo<T>(
+ attemptedItemInfo.getStartPath(), attemptedItemInfo.getFile(),
+ attemptedItemInfo.getRetryCount() + 1));
+ iterator.remove();
}
}
- // Remove attempted blocks from movementFinishedBlocks list.
- finishedBlksIter.remove();
}
}
}
@@ -252,15 +295,29 @@ public class BlockStorageMovementAttemptedItems<T> {
@VisibleForTesting
public int getAttemptedItemsCount() {
- return storageMovementAttemptedItems.size();
+ synchronized (storageMovementAttemptedItems) {
+ return storageMovementAttemptedItems.size();
+ }
}
public void clearQueues() {
- synchronized (movementFinishedBlocks) {
- movementFinishedBlocks.clear();
- }
+ movementFinishedBlocks.clear();
synchronized (storageMovementAttemptedItems) {
storageMovementAttemptedItems.clear();
}
+ synchronized (scheduledBlkLocs) {
+ scheduledBlkLocs.clear();
+ }
+ }
+
+ /**
+ * Sets external listener for testing.
+ *
+ * @param blkMoveListener
+ * block movement listener callback object
+ */
+ @VisibleForTesting
+ void setBlockMovementListener(BlockMovementListener blkMoveListener) {
+ this.blkMovementListener = blkMoveListener;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e5b64f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
index 71d8fd1..5032377 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
@@ -22,8 +22,10 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
/**
* An interface for SPSService, which exposes life cycle and processing APIs.
@@ -131,11 +133,16 @@ public interface SPSService<T> {
void markScanCompletedForPath(T spsPath);
/**
- * Notify the details of storage movement attempt finished blocks.
+ * Notifies that the given datanode has reported a block whose storage
+ * movement attempt has finished.
*
- * @param moveAttemptFinishedBlks
- * - array contains all the blocks that are attempted to move
+ * @param dnInfo
+ * - reported datanode
+ * @param storageType
+ * - storage type
+ * @param block
+ * - block that is attempted to move
*/
- void notifyStorageMovementAttemptFinishedBlks(
- BlocksStorageMoveAttemptFinished moveAttemptFinishedBlks);
+ void notifyStorageMovementAttemptFinishedBlk(DatanodeInfo dnInfo,
+ StorageType storageType, Block block);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e5b64f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
index 1c7a580..cbd6001 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
@@ -24,9 +24,12 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -50,7 +53,6 @@ import org.apache.hadoop.hdfs.server.balancer.Matcher;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StringUtils;
@@ -83,8 +85,6 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
private BlockStorageMovementNeeded<T> storageMovementNeeded;
private BlockStorageMovementAttemptedItems<T> storageMovementsMonitor;
private volatile boolean isRunning = false;
- private volatile StoragePolicySatisfierMode spsMode =
- StoragePolicySatisfierMode.NONE;
private int spsWorkMultiplier;
private long blockCount = 0L;
private int blockMovementMaxRetry;
@@ -128,11 +128,12 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
}
private Status status = null;
- private List<Block> assignedBlocks = null;
+ private Map<Block, Set<StorageTypeNodePair>> assignedBlocks = null;
- BlocksMovingAnalysis(Status status, List<Block> blockMovingInfo) {
+ BlocksMovingAnalysis(Status status,
+ Map<Block, Set<StorageTypeNodePair>> assignedBlocks) {
this.status = status;
- this.assignedBlocks = blockMovingInfo;
+ this.assignedBlocks = assignedBlocks;
}
}
@@ -164,7 +165,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
serviceMode);
return;
}
- if (spsMode == StoragePolicySatisfierMode.INTERNAL
+ if (serviceMode == StoragePolicySatisfierMode.INTERNAL
&& ctxt.isMoverRunning()) {
isRunning = false;
LOG.error(
@@ -175,14 +176,13 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
}
if (reconfigStart) {
LOG.info("Starting {} StoragePolicySatisfier, as admin requested to "
- + "start it.", StringUtils.toLowerCase(spsMode.toString()));
+ + "start it.", StringUtils.toLowerCase(serviceMode.toString()));
} else {
LOG.info("Starting {} StoragePolicySatisfier.",
- StringUtils.toLowerCase(spsMode.toString()));
+ StringUtils.toLowerCase(serviceMode.toString()));
}
isRunning = true;
- this.spsMode = serviceMode;
// Ensure that all the previously submitted block movements(if any) have to
// be stopped in all datanodes.
addDropSPSWorkCommandsToAllDNs();
@@ -297,36 +297,36 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
// be removed on storage movement attempt finished report.
case BLOCKS_TARGETS_PAIRED:
if (LOG.isDebugEnabled()) {
- LOG.debug("Block analysis status:{} for the file path:{}."
+ LOG.debug("Block analysis status:{} for the file id:{}."
+ " Adding to attempt monitor queue for the storage "
+ "movement attempt finished report",
- status.status, fileStatus.getPath());
+ status.status, fileStatus.getFileId());
}
- this.storageMovementsMonitor.add(new AttemptedItemInfo<T>(
- itemInfo.getStartPath(), itemInfo.getFile(), monotonicNow(),
- status.assignedBlocks, itemInfo.getRetryCount()));
+ this.storageMovementsMonitor.add(itemInfo.getStartPath(),
+ itemInfo.getFile(), monotonicNow(), status.assignedBlocks,
+ itemInfo.getRetryCount());
break;
case NO_BLOCKS_TARGETS_PAIRED:
if (LOG.isDebugEnabled()) {
- LOG.debug("Adding trackID:{} for the file path:{} back to"
+ LOG.debug("Adding trackID:{} for the file id:{} back to"
+ " retry queue as none of the blocks found its eligible"
- + " targets.", trackId, fileStatus.getPath());
+ + " targets.", trackId, fileStatus.getFileId());
}
retryItem = true;
break;
case FEW_LOW_REDUNDANCY_BLOCKS:
if (LOG.isDebugEnabled()) {
- LOG.debug("Adding trackID:{} for the file path:{} back to "
+ LOG.debug("Adding trackID:{} for the file id:{} back to "
+ "retry queue as some of the blocks are low redundant.",
- trackId, fileStatus.getPath());
+ trackId, fileStatus.getFileId());
}
retryItem = true;
break;
case BLOCKS_FAILED_TO_MOVE:
if (LOG.isDebugEnabled()) {
- LOG.debug("Adding trackID:{} for the file path:{} back to "
+ LOG.debug("Adding trackID:{} for the file id:{} back to "
+ "retry queue as some of the blocks movement failed.",
- trackId, fileStatus.getPath());
+ trackId, fileStatus.getFileId());
}
retryItem = true;
break;
@@ -334,9 +334,9 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
case BLOCKS_TARGET_PAIRING_SKIPPED:
case BLOCKS_ALREADY_SATISFIED:
default:
- LOG.info("Block analysis status:{} for the file path:{}."
+ LOG.info("Block analysis status:{} for the file id:{}."
+ " So, Cleaning up the Xattrs.", status.status,
- fileStatus.getPath());
+ fileStatus.getFileId());
storageMovementNeeded.removeItemTrackInfo(itemInfo, true);
break;
}
@@ -389,19 +389,19 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
if (!lastBlkComplete) {
// Postpone, currently file is under construction
LOG.info("File: {} is under construction. So, postpone"
- + " this to the next retry iteration", fileInfo.getPath());
+ + " this to the next retry iteration", fileInfo.getFileId());
return new BlocksMovingAnalysis(
BlocksMovingAnalysis.Status.ANALYSIS_SKIPPED_FOR_RETRY,
- new ArrayList<>());
+ new HashMap<>());
}
List<LocatedBlock> blocks = locatedBlocks.getLocatedBlocks();
if (blocks.size() == 0) {
LOG.info("File: {} is not having any blocks."
- + " So, skipping the analysis.", fileInfo.getPath());
+ + " So, skipping the analysis.", fileInfo.getFileId());
return new BlocksMovingAnalysis(
BlocksMovingAnalysis.Status.BLOCKS_TARGET_PAIRING_SKIPPED,
- new ArrayList<>());
+ new HashMap<>());
}
List<BlockMovingInfo> blockMovingInfos = new ArrayList<BlockMovingInfo>();
boolean hasLowRedundancyBlocks = false;
@@ -432,7 +432,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
+ "So, ignoring to move the blocks");
return new BlocksMovingAnalysis(
BlocksMovingAnalysis.Status.BLOCKS_TARGET_PAIRING_SKIPPED,
- new ArrayList<>());
+ new HashMap<>());
}
} else {
expectedStorageTypes = existingStoragePolicy
@@ -465,13 +465,21 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
&& status != BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED) {
status = BlocksMovingAnalysis.Status.FEW_LOW_REDUNDANCY_BLOCKS;
}
- List<Block> assignedBlockIds = new ArrayList<Block>();
+ Map<Block, Set<StorageTypeNodePair>> assignedBlocks = new HashMap<>();
for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
// Check for at least one block storage movement has been chosen
try {
blockMoveTaskHandler.submitMoveTask(blkMovingInfo);
LOG.debug("BlockMovingInfo: {}", blkMovingInfo);
- assignedBlockIds.add(blkMovingInfo.getBlock());
+ StorageTypeNodePair nodeStorage = new StorageTypeNodePair(
+ blkMovingInfo.getTargetStorageType(), blkMovingInfo.getTarget());
+ Set<StorageTypeNodePair> nodesWithStorage = assignedBlocks
+ .get(blkMovingInfo.getBlock());
+ if (nodesWithStorage == null) {
+ nodesWithStorage = new HashSet<>();
+ assignedBlocks.put(blkMovingInfo.getBlock(), nodesWithStorage);
+ }
+ nodesWithStorage.add(nodeStorage);
blockCount++;
} catch (IOException e) {
LOG.warn("Exception while scheduling movement task", e);
@@ -479,7 +487,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
status = BlocksMovingAnalysis.Status.BLOCKS_FAILED_TO_MOVE;
}
}
- return new BlocksMovingAnalysis(status, assignedBlockIds);
+ return new BlocksMovingAnalysis(status, assignedBlocks);
}
/**
@@ -545,6 +553,11 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
new ArrayList<StorageTypeNodePair>();
List<DatanodeInfo> existingBlockStorages = new ArrayList<DatanodeInfo>(
Arrays.asList(storages));
+
+ // Add existing storages to the exclude nodes to avoid choosing them as
+ // remote targets later.
+ List<DatanodeInfo> excludeNodes = new ArrayList<>(existingBlockStorages);
+
// if expected type exists in source node already, local movement would be
// possible, so lets find such sources first.
Iterator<DatanodeInfo> iterator = existingBlockStorages.iterator();
@@ -582,7 +595,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
foundMatchingTargetNodesForBlock |= findSourceAndTargetToMove(
blockMovingInfos, blockInfo, sourceWithStorageMap,
expectedStorageTypes, targetDns,
- ecPolicy);
+ ecPolicy, excludeNodes);
}
return foundMatchingTargetNodesForBlock;
}
@@ -601,6 +614,10 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
* - Expecting storages to move
* @param targetDns
* - Available DNs for expected storage types
+ * @param ecPolicy
+ * - erasure coding policy of sps invoked file
+ * @param excludeNodes
+ * - existing source nodes, which already hold a replica copy
* @return false if some of the block locations failed to find target node to
* satisfy the storage policy
*/
@@ -609,9 +626,8 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
List<StorageTypeNodePair> sourceWithStorageList,
List<StorageType> expectedTypes,
EnumMap<StorageType, List<DatanodeWithStorage.StorageDetails>> targetDns,
- ErasureCodingPolicy ecPolicy) {
+ ErasureCodingPolicy ecPolicy, List<DatanodeInfo> excludeNodes) {
boolean foundMatchingTargetNodesForBlock = true;
- List<DatanodeInfo> excludeNodes = new ArrayList<>();
// Looping over all the source node locations and choose the target
// storage within same node if possible. This is done separately to
@@ -638,10 +654,12 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
expectedTypes.remove(chosenTarget.storageType);
}
}
- // To avoid choosing this excludeNodes as targets later
- excludeNodes.add(existingTypeNodePair.dn);
}
-
+ // If all the sources and targets are paired within the same node, then
+ // simply return.
+ if (expectedTypes.size() <= 0) {
+ return foundMatchingTargetNodesForBlock;
+ }
// Looping over all the source node locations. Choose a remote target
// storage node if it was not found out within same node.
for (int i = 0; i < sourceWithStorageList.size(); i++) {
@@ -824,14 +842,29 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
/**
* Keeps datanode with its respective storage type.
*/
- private static final class StorageTypeNodePair {
+ static final class StorageTypeNodePair {
private final StorageType storageType;
private final DatanodeInfo dn;
- private StorageTypeNodePair(StorageType storageType, DatanodeInfo dn) {
+ StorageTypeNodePair(StorageType storageType, DatanodeInfo dn) {
this.storageType = storageType;
this.dn = dn;
}
+
+ public DatanodeInfo getDatanodeInfo() {
+ return dn;
+ }
+
+ public StorageType getStorageType() {
+ return storageType;
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder().append("StorageTypeNodePair(\n ")
+ .append("DatanodeInfo: ").append(dn).append(", StorageType: ")
+ .append(storageType).toString();
+ }
}
private EnumMap<StorageType, List<DatanodeWithStorage.StorageDetails>>
@@ -1043,18 +1076,19 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
}
/**
- * Receives set of storage movement attempt finished blocks report.
+ * Receives a report of a block whose storage movement attempt has finished.
*
- * @param moveAttemptFinishedBlks
- * set of storage movement attempt finished blocks.
+ * @param dnInfo
+ * reported datanode
+ * @param storageType
+ * - storage type
+ * @param block
+ * movement attempt finished block.
*/
- public void notifyStorageMovementAttemptFinishedBlks(
- BlocksStorageMoveAttemptFinished moveAttemptFinishedBlks) {
- if (moveAttemptFinishedBlks.getBlocks().length <= 0) {
- return;
- }
- storageMovementsMonitor
- .notifyMovementTriedBlocks(moveAttemptFinishedBlks.getBlocks());
+ @Override
+ public void notifyStorageMovementAttemptFinishedBlk(DatanodeInfo dnInfo,
+ StorageType storageType, Block block) {
+ storageMovementsMonitor.notifyReportedBlock(dnInfo, storageType, block);
}
@VisibleForTesting
@@ -1086,7 +1120,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
*/
final static class AttemptedItemInfo<T> extends ItemInfo<T> {
private long lastAttemptedOrReportedTime;
- private final List<Block> blocks;
+ private final Set<Block> blocks;
/**
* AttemptedItemInfo constructor.
@@ -1097,10 +1131,14 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
* trackId for file.
* @param lastAttemptedOrReportedTime
* last attempted or reported time
+ * @param blocks
+ * scheduled blocks
+ * @param retryCount
+ * file retry count
*/
AttemptedItemInfo(T rootId, T trackId,
long lastAttemptedOrReportedTime,
- List<Block> blocks, int retryCount) {
+ Set<Block> blocks, int retryCount) {
super(rootId, trackId, retryCount);
this.lastAttemptedOrReportedTime = lastAttemptedOrReportedTime;
this.blocks = blocks;
@@ -1121,10 +1159,9 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
this.lastAttemptedOrReportedTime = monotonicNow();
}
- List<Block> getBlocks() {
+ Set<Block> getBlocks() {
return this.blocks;
}
-
}
/**
@@ -1241,4 +1278,15 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
"It should be a positive, non-zero integer value.");
return spsWorkMultiplier;
}
+
+ /**
+ * Sets external listener for testing.
+ *
+ * @param blkMovementListener
+ * block movement listener callback object
+ */
+ @VisibleForTesting
+ void setBlockMovementListener(BlockMovementListener blkMovementListener) {
+ storageMovementsMonitor.setBlockMovementListener(blkMovementListener);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e5b64f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
index fcc2df1..311b68f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
@@ -112,7 +112,6 @@ public interface DatanodeProtocol {
* @param slowPeers Details of peer DataNodes that were detected as being
* slow to respond to packet writes. Empty report if no
* slow peers were detected by the DataNode.
- * @param storageMovFinishedBlks array of movement attempt finished blocks
* @throws IOException on error
*/
@Idempotent
@@ -126,8 +125,7 @@ public interface DatanodeProtocol {
VolumeFailureSummary volumeFailureSummary,
boolean requestFullBlockReportLease,
@Nonnull SlowPeerReports slowPeers,
- @Nonnull SlowDiskReports slowDisks,
- BlocksStorageMoveAttemptFinished storageMovFinishedBlks)
+ @Nonnull SlowDiskReports slowDisks)
throws IOException;
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org