You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by um...@apache.org on 2018/08/10 03:49:34 UTC
[41/50] [abbrv] hadoop git commit: HDFS-13165: [SPS]: Collects
successfully moved block details via IBR. Contributed by Rakesh R.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7c8d488/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
index 7580ba9..f5225d2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
@@ -20,13 +20,10 @@ package org.apache.hadoop.hdfs.server.sps;
import java.io.IOException;
import java.net.Socket;
-import java.util.ArrayList;
-import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -39,7 +36,6 @@ import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
@@ -48,15 +44,14 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.balancer.KeyManager;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
+import org.apache.hadoop.hdfs.server.common.sps.BlockDispatcher;
import org.apache.hadoop.hdfs.server.common.sps.BlockMovementAttemptFinished;
import org.apache.hadoop.hdfs.server.common.sps.BlockMovementStatus;
import org.apache.hadoop.hdfs.server.common.sps.BlockStorageMovementTracker;
import org.apache.hadoop.hdfs.server.common.sps.BlocksMovementsStatusHandler;
-import org.apache.hadoop.hdfs.server.common.sps.BlockDispatcher;
import org.apache.hadoop.hdfs.server.namenode.sps.BlockMoveTaskHandler;
import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger;
@@ -105,12 +100,14 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler {
int ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
blkDispatcher = new BlockDispatcher(HdfsConstants.READ_TIMEOUT,
ioFileBufferSize, connectToDnViaHostname);
+
+ startMovementTracker();
}
/**
* Initializes block movement tracker daemon and starts the thread.
*/
- public void init() {
+ private void startMovementTracker() {
movementTrackerThread = new Daemon(this.blkMovementTracker);
movementTrackerThread.setName("BlockStorageMovementTracker");
movementTrackerThread.start();
@@ -156,24 +153,16 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler {
// dn.incrementBlocksScheduled(blkMovingInfo.getTargetStorageType());
LOG.debug("Received BlockMovingTask {}", blkMovingInfo);
BlockMovingTask blockMovingTask = new BlockMovingTask(blkMovingInfo);
- Future<BlockMovementAttemptFinished> moveCallable = mCompletionServ
- .submit(blockMovingTask);
- blkMovementTracker.addBlock(blkMovingInfo.getBlock(), moveCallable);
+ mCompletionServ.submit(blockMovingTask);
}
private class ExternalBlocksMovementsStatusHandler
- extends BlocksMovementsStatusHandler {
+ implements BlocksMovementsStatusHandler {
@Override
- public void handle(
- List<BlockMovementAttemptFinished> moveAttemptFinishedBlks) {
- List<Block> blocks = new ArrayList<>();
- for (BlockMovementAttemptFinished item : moveAttemptFinishedBlks) {
- blocks.add(item.getBlock());
- }
- BlocksStorageMoveAttemptFinished blkAttempted =
- new BlocksStorageMoveAttemptFinished(
- blocks.toArray(new Block[blocks.size()]));
- service.notifyStorageMovementAttemptFinishedBlks(blkAttempted);
+ public void handle(BlockMovementAttemptFinished attemptedMove) {
+ service.notifyStorageMovementAttemptFinishedBlk(
+ attemptedMove.getTargetDatanode(), attemptedMove.getTargetType(),
+ attemptedMove.getBlock());
}
}
@@ -194,6 +183,7 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler {
BlockMovementStatus blkMovementStatus = moveBlock();
return new BlockMovementAttemptFinished(blkMovingInfo.getBlock(),
blkMovingInfo.getSource(), blkMovingInfo.getTarget(),
+ blkMovingInfo.getTargetStorageType(),
blkMovementStatus);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7c8d488/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
index 6fc35ea..236b887 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
@@ -86,7 +86,6 @@ public final class ExternalStoragePolicySatisfier {
new ExternalBlockMovementListener();
ExternalSPSBlockMoveTaskHandler externalHandler =
new ExternalSPSBlockMoveTaskHandler(spsConf, nnc, sps);
- externalHandler.init();
sps.init(context, new ExternalSPSFilePathCollector(sps), externalHandler,
blkMoveListener);
sps.start(true, StoragePolicySatisfierMode.EXTERNAL);
@@ -147,7 +146,7 @@ public final class ExternalStoragePolicySatisfier {
for (Block block : moveAttemptFinishedBlks) {
actualBlockMovements.add(block);
}
- LOG.info("Movement attempted blocks", actualBlockMovements);
+ LOG.info("Movement attempted blocks:{}", actualBlockMovements);
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7c8d488/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
index 7c35494..baf7ec7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
@@ -185,14 +185,6 @@ message BlockMovingInfoProto {
}
/**
- * Blocks for which storage movements has been attempted and finished
- * with either success or failure.
- */
-message BlocksStorageMoveAttemptFinishedProto {
- repeated BlockProto blocks = 1;
-}
-
-/**
* registration - Information of the datanode registering with the namenode
*/
message RegisterDatanodeRequestProto {
@@ -249,7 +241,6 @@ message HeartbeatRequestProto {
optional bool requestFullBlockReportLease = 9 [ default = false ];
repeated SlowPeerReportProto slowPeers = 10;
repeated SlowDiskReportProto slowDisks = 11;
- optional BlocksStorageMoveAttemptFinishedProto storageMoveAttemptFinishedBlks = 12;
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7c8d488/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 0f80f97..0b533c2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4592,6 +4592,47 @@
</property>
<property>
+ <name>dfs.storage.policy.satisfier.max.outstanding.paths</name>
+ <value>10000</value>
+ <description>
+ Defines the maximum number of paths to satisfy that can be queued up in the
+ Satisfier call queue in a period of time. Default value is 10000.
+ </description>
+</property>
+
+<property>
+ <name>dfs.storage.policy.satisfier.address</name>
+ <value>0.0.0.0:0</value>
+ <description>
+ The hostname used for a keytab based Kerberos login. Keytab based login
+ is required when dfs.storage.policy.satisfier.mode is external.
+ </description>
+</property>
+
+<property>
+ <name>dfs.storage.policy.satisfier.keytab.file</name>
+ <value></value>
+ <description>
+ The keytab file used by external StoragePolicySatisfier to login as its
+ service principal. The principal name is configured with
+ dfs.storage.policy.satisfier.kerberos.principal. Keytab based login
+ is required when dfs.storage.policy.satisfier.mode is external.
+ </description>
+</property>
+
+<property>
+ <name>dfs.storage.policy.satisfier.kerberos.principal</name>
+ <value></value>
+ <description>
+ The StoragePolicySatisfier principal. This is typically set to
+ satisfier/_HOST@REALM.TLD. The StoragePolicySatisfier will substitute
+ _HOST with its own fully qualified hostname at startup. The _HOST placeholder
+ allows using the same configuration setting on different servers. Keytab
+ based login is required when dfs.storage.policy.satisfier.mode is external.
+ </description>
+</property>
+
+<property>
<name>dfs.pipeline.ecn</name>
<value>false</value>
<description>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7c8d488/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
index f247370..05b6d30 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
@@ -117,8 +116,7 @@ public class TestNameNodePrunesMissingStorages {
cluster.stopDataNode(0);
cluster.getNameNodeRpc().sendHeartbeat(dnReg, prunedReports, 0L, 0L, 0, 0,
0, null, true, SlowPeerReports.EMPTY_REPORT,
- SlowDiskReports.EMPTY_REPORT,
- new BlocksStorageMoveAttemptFinished(null));
+ SlowDiskReports.EMPTY_REPORT);
// Check that the missing storage was pruned.
assertThat(dnDescriptor.getStorageInfos().length, is(expectedStoragesAfterTest));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7c8d488/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
index d13d717..b453991 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
@@ -168,8 +167,7 @@ public class InternalDataNodeTestUtils {
Mockito.anyInt(), Mockito.any(VolumeFailureSummary.class),
Mockito.anyBoolean(),
Mockito.any(SlowPeerReports.class),
- Mockito.any(SlowDiskReports.class),
- Mockito.any(BlocksStorageMoveAttemptFinished.class))).thenReturn(
+ Mockito.any(SlowDiskReports.class))).thenReturn(
new HeartbeatResponse(new DatanodeCommand[0], new NNHAStatusHeartbeat(
HAServiceState.ACTIVE, 1), null, ThreadLocalRandom.current()
.nextLong() | 1L));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7c8d488/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimpleBlocksMovementsStatusHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimpleBlocksMovementsStatusHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimpleBlocksMovementsStatusHandler.java
new file mode 100644
index 0000000..b361ce5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimpleBlocksMovementsStatusHandler.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.sps.BlockMovementAttemptFinished;
+import org.apache.hadoop.hdfs.server.common.sps.BlocksMovementsStatusHandler;
+
+/**
+ * Blocks movements status handler, which is used to collect details of the
+ * completed block movements and later these attempted finished(with success or
+ * failure) blocks can be accessed to notify respective listeners, if any.
+ */
+public class SimpleBlocksMovementsStatusHandler
+ implements BlocksMovementsStatusHandler {
+ private final List<Block> blockIdVsMovementStatus = new ArrayList<>();
+
+ /**
+ * Collect all the storage movement attempt finished blocks. Later this will
+ * be send to namenode via heart beat.
+ *
+ * @param moveAttemptFinishedBlk
+ * storage movement attempt finished block
+ */
+ public void handle(BlockMovementAttemptFinished moveAttemptFinishedBlk) {
+ // Adding to the tracking report list. Later this can be accessed to know
+ // the attempted block movements.
+ synchronized (blockIdVsMovementStatus) {
+ blockIdVsMovementStatus.add(moveAttemptFinishedBlk.getBlock());
+ }
+ }
+
+ /**
+ * @return unmodifiable list of storage movement attempt finished blocks.
+ */
+ public List<Block> getMoveAttemptFinishedBlocks() {
+ List<Block> moveAttemptFinishedBlks = new ArrayList<>();
+ // 1. Adding all the completed block ids.
+ synchronized (blockIdVsMovementStatus) {
+ if (blockIdVsMovementStatus.size() > 0) {
+ moveAttemptFinishedBlks = Collections
+ .unmodifiableList(blockIdVsMovementStatus);
+ }
+ }
+ return moveAttemptFinishedBlks;
+ }
+
+ /**
+ * Remove the storage movement attempt finished blocks from the tracking list.
+ *
+ * @param moveAttemptFinishedBlks
+ * set of storage movement attempt finished blocks
+ */
+ public void remove(List<Block> moveAttemptFinishedBlks) {
+ if (moveAttemptFinishedBlks != null) {
+ blockIdVsMovementStatus.removeAll(moveAttemptFinishedBlks);
+ }
+ }
+
+ /**
+ * Clear the blockID vs movement status tracking map.
+ */
+ public void removeAll() {
+ synchronized (blockIdVsMovementStatus) {
+ blockIdVsMovementStatus.clear();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7c8d488/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index 0fa1696..d0c3a83 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -124,8 +123,8 @@ public class TestBPOfferService {
Mockito.doReturn(new DNConf(mockDn)).when(mockDn).getDnConf();
Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn"))
.when(mockDn).getMetrics();
- Mockito.doReturn(new StoragePolicySatisfyWorker(conf, mockDn)).when(mockDn)
- .getStoragePolicySatisfyWorker();
+ Mockito.doReturn(new StoragePolicySatisfyWorker(conf, mockDn, null))
+ .when(mockDn).getStoragePolicySatisfyWorker();
// Set up a simulated dataset with our fake BP
mockFSDataset = Mockito.spy(new SimulatedFSDataset(null, conf));
@@ -160,8 +159,7 @@ public class TestBPOfferService {
Mockito.any(VolumeFailureSummary.class),
Mockito.anyBoolean(),
Mockito.any(SlowPeerReports.class),
- Mockito.any(SlowDiskReports.class),
- Mockito.any(BlocksStorageMoveAttemptFinished.class));
+ Mockito.any(SlowDiskReports.class));
mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0);
datanodeCommands[nnIdx] = new DatanodeCommand[0];
return mock;
@@ -380,8 +378,8 @@ public class TestBPOfferService {
Mockito.doReturn(new DNConf(mockDn)).when(mockDn).getDnConf();
Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn")).
when(mockDn).getMetrics();
- Mockito.doReturn(new StoragePolicySatisfyWorker(conf, mockDn)).when(mockDn)
- .getStoragePolicySatisfyWorker();
+ Mockito.doReturn(new StoragePolicySatisfyWorker(conf, mockDn, null))
+ .when(mockDn).getStoragePolicySatisfyWorker();
final AtomicInteger count = new AtomicInteger();
Mockito.doAnswer(new Answer<Void>() {
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7c8d488/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index 052eb87..07fd4ae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -93,7 +93,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -233,8 +232,7 @@ public class TestBlockRecovery {
Mockito.any(VolumeFailureSummary.class),
Mockito.anyBoolean(),
Mockito.any(SlowPeerReports.class),
- Mockito.any(SlowDiskReports.class),
- Mockito.any(BlocksStorageMoveAttemptFinished.class)))
+ Mockito.any(SlowDiskReports.class)))
.thenReturn(new HeartbeatResponse(
new DatanodeCommand[0],
new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7c8d488/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
index 0dd15c3..28427bc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
@@ -50,7 +50,6 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
@@ -173,8 +172,7 @@ public class TestDataNodeLifeline {
any(VolumeFailureSummary.class),
anyBoolean(),
any(SlowPeerReports.class),
- any(SlowDiskReports.class),
- any(BlocksStorageMoveAttemptFinished.class));
+ any(SlowDiskReports.class));
// Intercept lifeline to trigger latch count-down on each call.
doAnswer(new LatchCountingAnswer<Void>(lifelinesSent))
@@ -239,8 +237,7 @@ public class TestDataNodeLifeline {
any(VolumeFailureSummary.class),
anyBoolean(),
any(SlowPeerReports.class),
- any(SlowDiskReports.class),
- any(BlocksStorageMoveAttemptFinished.class));
+ any(SlowDiskReports.class));
// While waiting on the latch for the expected number of heartbeat messages,
// poll DataNode tracking information. We expect that the DataNode always
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7c8d488/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
index d47da69..bb1d9ef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
@@ -44,7 +44,6 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
@@ -223,8 +222,7 @@ public class TestDatanodeProtocolRetryPolicy {
Mockito.any(VolumeFailureSummary.class),
Mockito.anyBoolean(),
Mockito.any(SlowPeerReports.class),
- Mockito.any(SlowDiskReports.class),
- Mockito.any(BlocksStorageMoveAttemptFinished.class));
+ Mockito.any(SlowDiskReports.class));
dn = new DataNode(conf, locations, null, null) {
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7c8d488/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
index 3732b2e..2dbd5b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
@@ -66,7 +66,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache.Page
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -209,8 +208,7 @@ public class TestFsDatasetCache {
(StorageReport[]) any(), anyLong(), anyLong(),
anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any(),
anyBoolean(), any(SlowPeerReports.class),
- any(SlowDiskReports.class),
- any(BlocksStorageMoveAttemptFinished.class));
+ any(SlowDiskReports.class));
} finally {
lock.writeLock().unlock();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7c8d488/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
index 06a66f7..51d3254 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
-import static org.junit.Assert.assertTrue;
-
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@@ -35,8 +33,8 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
@@ -173,8 +171,10 @@ public class TestStoragePolicySatisfyWorker {
DatanodeInfo targetDnInfo = DFSTestUtil
.getLocalDatanodeInfo(src.getXferPort());
+ SimpleBlocksMovementsStatusHandler handler =
+ new SimpleBlocksMovementsStatusHandler();
StoragePolicySatisfyWorker worker = new StoragePolicySatisfyWorker(conf,
- src);
+ src, handler);
try {
worker.start();
List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
@@ -184,81 +184,19 @@ public class TestStoragePolicySatisfyWorker {
blockMovingInfos.add(blockMovingInfo);
worker.processBlockMovingTasks(cluster.getNamesystem().getBlockPoolId(),
blockMovingInfos);
-
- waitForBlockMovementCompletion(worker, 1, 30000);
+ waitForBlockMovementCompletion(handler, 1, 30000);
} finally {
worker.stop();
}
}
- /**
- * Tests that drop SPS work method clears all the queues.
- *
- * @throws Exception
- */
- @Test(timeout = 120000)
- public void testDropSPSWork() throws Exception {
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(20).build();
-
- cluster.waitActive();
- final DistributedFileSystem dfs = cluster.getFileSystem();
- final String file = "/testDropSPSWork";
- DFSTestUtil.createFile(dfs, new Path(file), false, 1024, 50 * 100,
- DEFAULT_BLOCK_SIZE, (short) 2, 0, false, null);
-
- // move to ARCHIVE
- dfs.setStoragePolicy(new Path(file), "COLD");
-
- DataNode src = cluster.getDataNodes().get(2);
- DatanodeInfo targetDnInfo =
- DFSTestUtil.getLocalDatanodeInfo(src.getXferPort());
-
- StoragePolicySatisfyWorker worker =
- new StoragePolicySatisfyWorker(conf, src);
- worker.start();
- try {
- List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
- List<LocatedBlock> locatedBlocks =
- dfs.getClient().getLocatedBlocks(file, 0).getLocatedBlocks();
- for (LocatedBlock locatedBlock : locatedBlocks) {
- BlockMovingInfo blockMovingInfo =
- prepareBlockMovingInfo(locatedBlock.getBlock().getLocalBlock(),
- locatedBlock.getLocations()[0], targetDnInfo,
- locatedBlock.getStorageTypes()[0], StorageType.ARCHIVE);
- blockMovingInfos.add(blockMovingInfo);
- }
- worker.processBlockMovingTasks(cluster.getNamesystem().getBlockPoolId(),
- blockMovingInfos);
- // Wait till results queue build up
- waitForBlockMovementResult(worker, 30000);
- worker.dropSPSWork();
- assertTrue(worker.getBlocksMovementsStatusHandler()
- .getMoveAttemptFinishedBlocks().size() == 0);
- } finally {
- worker.stop();
- }
- }
-
- private void waitForBlockMovementResult(
- final StoragePolicySatisfyWorker worker, int timeout) throws Exception {
- GenericTestUtils.waitFor(new Supplier<Boolean>() {
- @Override
- public Boolean get() {
- List<Block> completedBlocks = worker.getBlocksMovementsStatusHandler()
- .getMoveAttemptFinishedBlocks();
- return completedBlocks.size() > 0;
- }
- }, 100, timeout);
- }
-
private void waitForBlockMovementCompletion(
- final StoragePolicySatisfyWorker worker,
+ final SimpleBlocksMovementsStatusHandler handler,
int expectedFinishedItemsCount, int timeout) throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
- List<Block> completedBlocks = worker.getBlocksMovementsStatusHandler()
- .getMoveAttemptFinishedBlocks();
+ List<Block> completedBlocks = handler.getMoveAttemptFinishedBlocks();
int finishedCount = completedBlocks.size();
LOG.info("Block movement completed count={}, expected={} and actual={}",
completedBlocks.size(), expectedFinishedItemsCount, finishedCount);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7c8d488/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
index 20402f2..5f62ddb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
@@ -111,8 +110,7 @@ public class TestStorageReport {
anyLong(), anyLong(), anyInt(), anyInt(), anyInt(),
Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(),
Mockito.any(SlowPeerReports.class),
- Mockito.any(SlowDiskReports.class),
- Mockito.any(BlocksStorageMoveAttemptFinished.class));
+ Mockito.any(SlowDiskReports.class));
StorageReport[] reports = captor.getValue();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7c8d488/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
index ec00ae7..3a3c471 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
@@ -56,7 +56,6 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -957,8 +956,8 @@ public class NNThroughputBenchmark implements Tool {
DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0L) };
DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep,
0L, 0L, 0, 0, 0, null, true,
- SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
- new BlocksStorageMoveAttemptFinished(null)).getCommands();
+ SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT)
+ .getCommands();
if(cmds != null) {
for (DatanodeCommand cmd : cmds ) {
if(LOG.isDebugEnabled()) {
@@ -1008,8 +1007,8 @@ public class NNThroughputBenchmark implements Tool {
false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0) };
DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration,
rep, 0L, 0L, 0, 0, 0, null, true,
- SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
- new BlocksStorageMoveAttemptFinished(null)).getCommands();
+ SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT)
+ .getCommands();
if (cmds != null) {
for (DatanodeCommand cmd : cmds) {
if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7c8d488/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
index 899bb82..b85527a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
@@ -131,8 +130,7 @@ public class NameNodeAdapter {
return namesystem.handleHeartbeat(nodeReg,
BlockManagerTestUtil.getStorageReportsForDatanode(dd),
dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null, true,
- SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
- new BlocksStorageMoveAttemptFinished(null));
+ SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT);
}
public static boolean setReplication(final FSNamesystem ns,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7c8d488/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
index 65628b9..df74107 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
@@ -44,7 +44,6 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -140,8 +139,8 @@ public class TestDeadDatanode {
false, 0, 0, 0, 0, 0) };
DatanodeCommand[] cmd =
dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null, true,
- SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
- new BlocksStorageMoveAttemptFinished(null)).getCommands();
+ SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT)
+ .getCommands();
assertEquals(1, cmd.length);
assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER
.getAction());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7c8d488/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java
index 47ea39f..ee0b2e6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.test.GenericTestUtils;
@@ -250,10 +251,9 @@ public class TestNameNodeReconfigure {
StoragePolicySatisfierMode.INTERNAL.toString());
// Since DFS_STORAGE_POLICY_ENABLED_KEY is disabled, SPS can't be enabled.
- assertEquals("SPS shouldn't start as "
- + DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY + " is disabled", false,
- nameNode.getNamesystem().getBlockManager().getSPSManager()
- .isInternalSatisfierRunning());
+ assertNull("SPS shouldn't start as "
+ + DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY + " is disabled",
+ nameNode.getNamesystem().getBlockManager().getSPSManager());
verifySPSEnabled(nameNode, DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.INTERNAL, false);
@@ -352,9 +352,12 @@ public class TestNameNodeReconfigure {
void verifySPSEnabled(final NameNode nameNode, String property,
StoragePolicySatisfierMode expected, boolean isSatisfierRunning) {
- assertEquals(property + " has wrong value", isSatisfierRunning, nameNode
- .getNamesystem().getBlockManager().getSPSManager()
- .isInternalSatisfierRunning());
+ StoragePolicySatisfyManager spsMgr = nameNode
+ .getNamesystem().getBlockManager().getSPSManager();
+ boolean isInternalSatisfierRunning = spsMgr != null
+ ? spsMgr.isInternalSatisfierRunning() : false;
+ assertEquals(property + " has wrong value", isSatisfierRunning,
+ isInternalSatisfierRunning);
String actual = nameNode.getConf().get(property,
DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT);
assertEquals(property + " has wrong value", expected,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7c8d488/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
index 29af885..ed1fe92 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
@@ -22,13 +22,18 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.AttemptedItemInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.StorageTypeNodePair;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -92,14 +97,16 @@ public class TestBlockStorageMovementAttemptedItems {
*/
@Test(timeout = 30000)
public void testAddReportedMoveAttemptFinishedBlocks() throws Exception {
- bsmAttemptedItems.start(); // start block movement result monitor thread
Long item = new Long(1234);
- List<Block> blocks = new ArrayList<Block>();
- blocks.add(new Block(item));
- bsmAttemptedItems.add(new AttemptedItemInfo<Long>(0L, 0L, 0L, blocks, 0));
- Block[] blockArray = new Block[blocks.size()];
- blocks.toArray(blockArray);
- bsmAttemptedItems.notifyMovementTriedBlocks(blockArray);
+ Block block = new Block(item);
+ DatanodeInfo dnInfo = DFSTestUtil.getLocalDatanodeInfo(9867);
+ Set<StorageTypeNodePair> locs = new HashSet<>();
+ locs.add(new StorageTypeNodePair(StorageType.ARCHIVE, dnInfo));
+ Map<Block, Set<StorageTypeNodePair>> blocksMap = new HashMap<>();
+ blocksMap.put(block, locs);
+ bsmAttemptedItems.add(0L, 0L, 0L, blocksMap, 0);
+ bsmAttemptedItems.notifyReportedBlock(dnInfo, StorageType.ARCHIVE,
+ block);
assertEquals("Failed to receive result!", 1,
bsmAttemptedItems.getMovementFinishedBlocksCount());
}
@@ -111,9 +118,13 @@ public class TestBlockStorageMovementAttemptedItems {
public void testNoBlockMovementAttemptFinishedReportAdded() throws Exception {
bsmAttemptedItems.start(); // start block movement report monitor thread
Long item = new Long(1234);
- List<Block> blocks = new ArrayList<>();
- blocks.add(new Block(item));
- bsmAttemptedItems.add(new AttemptedItemInfo<Long>(0L, 0L, 0L, blocks, 0));
+ Block block = new Block(item);
+ DatanodeInfo dnInfo = DFSTestUtil.getLocalDatanodeInfo(9867);
+ Set<StorageTypeNodePair> locs = new HashSet<>();
+ locs.add(new StorageTypeNodePair(StorageType.ARCHIVE, dnInfo));
+ Map<Block, Set<StorageTypeNodePair>> blocksMap = new HashMap<>();
+ blocksMap.put(block, locs);
+ bsmAttemptedItems.add(0L, 0L, 0L, blocksMap, 0);
assertEquals("Shouldn't receive result", 0,
bsmAttemptedItems.getMovementFinishedBlocksCount());
assertEquals("Item doesn't exist in the attempted list", 1,
@@ -129,15 +140,18 @@ public class TestBlockStorageMovementAttemptedItems {
@Test(timeout = 30000)
public void testPartialBlockMovementShouldBeRetried1() throws Exception {
Long item = new Long(1234);
- List<Block> blocks = new ArrayList<>();
- blocks.add(new Block(item));
- blocks.add(new Block(5678L));
+ Block block1 = new Block(item);
+ Block block2 = new Block(5678L);
Long trackID = 0L;
- bsmAttemptedItems
- .add(new AttemptedItemInfo<Long>(trackID, trackID, 0L, blocks, 0));
- Block[] blksMovementReport = new Block[1];
- blksMovementReport[0] = new Block(item);
- bsmAttemptedItems.notifyMovementTriedBlocks(blksMovementReport);
+ DatanodeInfo dnInfo = DFSTestUtil.getLocalDatanodeInfo(9867);
+ Set<StorageTypeNodePair> locs = new HashSet<>();
+ locs.add(new StorageTypeNodePair(StorageType.ARCHIVE, dnInfo));
+ Map<Block, Set<StorageTypeNodePair>> blocksMap = new HashMap<>();
+ blocksMap.put(block1, locs);
+ blocksMap.put(block2, locs);
+ bsmAttemptedItems.add(trackID, trackID, 0L, blocksMap, 0);
+ bsmAttemptedItems.notifyReportedBlock(dnInfo, StorageType.ARCHIVE,
+ block1);
// start block movement report monitor thread
bsmAttemptedItems.start();
@@ -155,14 +169,16 @@ public class TestBlockStorageMovementAttemptedItems {
@Test(timeout = 30000)
public void testPartialBlockMovementShouldBeRetried2() throws Exception {
Long item = new Long(1234);
+ Block block = new Block(item);
Long trackID = 0L;
- List<Block> blocks = new ArrayList<>();
- blocks.add(new Block(item));
- bsmAttemptedItems
- .add(new AttemptedItemInfo<Long>(trackID, trackID, 0L, blocks, 0));
- Block[] blksMovementReport = new Block[1];
- blksMovementReport[0] = new Block(item);
- bsmAttemptedItems.notifyMovementTriedBlocks(blksMovementReport);
+ DatanodeInfo dnInfo = DFSTestUtil.getLocalDatanodeInfo(9867);
+ Set<StorageTypeNodePair> locs = new HashSet<>();
+ locs.add(new StorageTypeNodePair(StorageType.ARCHIVE, dnInfo));
+ Map<Block, Set<StorageTypeNodePair>> blocksMap = new HashMap<>();
+ blocksMap.put(block, locs);
+ bsmAttemptedItems.add(trackID, trackID, 0L, blocksMap, 0);
+ bsmAttemptedItems.notifyReportedBlock(dnInfo, StorageType.ARCHIVE,
+ block);
Thread.sleep(selfRetryTimeout * 2); // Waiting to get timed out
@@ -183,14 +199,16 @@ public class TestBlockStorageMovementAttemptedItems {
public void testPartialBlockMovementWithEmptyAttemptedQueue()
throws Exception {
Long item = new Long(1234);
+ Block block = new Block(item);
Long trackID = 0L;
- List<Block> blocks = new ArrayList<>();
- blocks.add(new Block(item));
- bsmAttemptedItems
- .add(new AttemptedItemInfo<Long>(trackID, trackID, 0L, blocks, 0));
- Block[] blksMovementReport = new Block[1];
- blksMovementReport[0] = new Block(item);
- bsmAttemptedItems.notifyMovementTriedBlocks(blksMovementReport);
+ DatanodeInfo dnInfo = DFSTestUtil.getLocalDatanodeInfo(9867);
+ Set<StorageTypeNodePair> locs = new HashSet<>();
+ locs.add(new StorageTypeNodePair(StorageType.ARCHIVE, dnInfo));
+ Map<Block, Set<StorageTypeNodePair>> blocksMap = new HashMap<>();
+ blocksMap.put(block, locs);
+ bsmAttemptedItems.add(trackID, trackID, 0L, blocksMap, 0);
+ bsmAttemptedItems.notifyReportedBlock(dnInfo, StorageType.ARCHIVE,
+ block);
assertFalse(
"Should not add in queue again if it is not there in"
+ " storageMovementAttemptedItems",
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7c8d488/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
index 75aeb86..b05717a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
+import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -107,6 +108,8 @@ public class TestStoragePolicySatisfier {
public static final long CAPACITY = 2 * 256 * 1024 * 1024;
public static final String FILE = "/testMoveToSatisfyStoragePolicy";
public static final int DEFAULT_BLOCK_SIZE = 1024;
+ private ExternalBlockMovementListener blkMoveListener =
+ new ExternalBlockMovementListener();
/**
* Sets hdfs cluster.
@@ -1029,6 +1032,9 @@ public class TestStoragePolicySatisfier {
config.set(DFSConfigKeys
.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
"3000");
+ config.set(DFSConfigKeys
+ .DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY,
+ "5000");
StorageType[][] newtypes = new StorageType[][] {
{StorageType.ARCHIVE, StorageType.DISK},
{StorageType.ARCHIVE, StorageType.DISK},
@@ -1072,6 +1078,9 @@ public class TestStoragePolicySatisfier {
config.set(DFSConfigKeys
.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
"3000");
+ config.set(DFSConfigKeys
+ .DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY,
+ "5000");
StorageType[][] newtypes = new StorageType[][] {
{StorageType.ARCHIVE, StorageType.DISK},
{StorageType.ARCHIVE, StorageType.DISK},
@@ -1089,7 +1098,7 @@ public class TestStoragePolicySatisfier {
fs.setStoragePolicy(filePath, "COLD");
fs.satisfyStoragePolicy(filePath);
DFSTestUtil.waitExpectedStorageType(filePath.toString(),
- StorageType.ARCHIVE, 3, 30000, hdfsCluster.getFileSystem());
+ StorageType.ARCHIVE, 3, 60000, hdfsCluster.getFileSystem());
assertFalse("Log output does not contain expected log message: ",
logs.getOutput().contains("some of the blocks are low redundant"));
} finally {
@@ -1425,6 +1434,9 @@ public class TestStoragePolicySatisfier {
config.set(DFSConfigKeys
.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
"3000");
+ config.set(DFSConfigKeys
+ .DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY,
+ "5000");
config.setBoolean(DFSConfigKeys
.DFS_STORAGE_POLICY_SATISFIER_LOW_MAX_STREAMS_PREFERENCE_KEY,
false);
@@ -1467,7 +1479,7 @@ public class TestStoragePolicySatisfier {
for (int i = 1; i <= 10; i++) {
Path filePath = new Path("/file" + i);
DFSTestUtil.waitExpectedStorageType(filePath.toString(),
- StorageType.DISK, 4, 30000, hdfsCluster.getFileSystem());
+ StorageType.DISK, 4, 60000, hdfsCluster.getFileSystem());
}
for (int i = 11; i <= 20; i++) {
Path filePath = new Path("/file" + i);
@@ -1725,20 +1737,16 @@ public class TestStoragePolicySatisfier {
public void waitForBlocksMovementAttemptReport(
long expectedMovementFinishedBlocksCount, int timeout)
throws TimeoutException, InterruptedException {
- BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager();
- final StoragePolicySatisfier<Long> sps =
- (StoragePolicySatisfier<Long>) blockManager
- .getSPSManager().getInternalSPSService();
+ Assert.assertNotNull("Didn't set external block move listener",
+ blkMoveListener);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
+ int actualCount = blkMoveListener.getActualBlockMovements().size();
LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
expectedMovementFinishedBlocksCount,
- ((BlockStorageMovementAttemptedItems<Long>) (sps
- .getAttemptedItemsMonitor())).getMovementFinishedBlocksCount());
- return ((BlockStorageMovementAttemptedItems<Long>) (sps
- .getAttemptedItemsMonitor()))
- .getMovementFinishedBlocksCount()
+ actualCount);
+ return actualCount
>= expectedMovementFinishedBlocksCount;
}
}, 100, timeout);
@@ -1790,11 +1798,54 @@ public class TestStoragePolicySatisfier {
.numDataNodes(numberOfDatanodes).storagesPerDatanode(storagesPerDn)
.storageTypes(storageTypes).storageCapacities(capacities).build();
cluster.waitActive();
+
+ // Sets external listener for assertion.
+ blkMoveListener.clear();
+ BlockManager blockManager = cluster.getNamesystem().getBlockManager();
+ final StoragePolicySatisfier<Long> sps =
+ (StoragePolicySatisfier<Long>) blockManager
+ .getSPSManager().getInternalSPSService();
+ sps.setBlockMovementListener(blkMoveListener);
return cluster;
}
public void restartNamenode() throws IOException {
hdfsCluster.restartNameNodes();
hdfsCluster.waitActive();
+ BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager();
+ StoragePolicySatisfyManager spsMgr = blockManager.getSPSManager();
+ if (spsMgr != null && spsMgr.isInternalSatisfierRunning()) {
+ // Sets external listener for assertion.
+ blkMoveListener.clear();
+ final StoragePolicySatisfier<Long> sps =
+ (StoragePolicySatisfier<Long>) spsMgr.getInternalSPSService();
+ sps.setBlockMovementListener(blkMoveListener);
+ }
+ }
+
+ /**
+ * Implementation of listener callback, where it collects all the sps move
+ * attempted blocks for assertion.
+ */
+ public static final class ExternalBlockMovementListener
+ implements BlockMovementListener {
+
+ private List<Block> actualBlockMovements = new ArrayList<>();
+
+ @Override
+ public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) {
+ for (Block block : moveAttemptFinishedBlks) {
+ actualBlockMovements.add(block);
+ }
+ LOG.info("Movement attempted blocks:{}", actualBlockMovements);
+ }
+
+ public List<Block> getActualBlockMovements() {
+ return actualBlockMovements;
+ }
+
+ public void clear() {
+ actualBlockMovements.clear();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7c8d488/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java
index e69a833..857bd6c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.namenode.sps.TestStoragePolicySatisfier.ExternalBlockMovementListener;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Before;
@@ -70,6 +71,8 @@ public class TestStoragePolicySatisfierWithStripedFile {
private int cellSize;
private int defaultStripeBlockSize;
private Configuration conf;
+ private ExternalBlockMovementListener blkMoveListener =
+ new ExternalBlockMovementListener();
private ErasureCodingPolicy getEcPolicy() {
return StripedFileTestUtil.getDefaultECPolicy();
@@ -131,6 +134,15 @@ public class TestStoragePolicySatisfierWithStripedFile {
HdfsAdmin hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
try {
cluster.waitActive();
+
+ // Sets external listener for assertion.
+ blkMoveListener.clear();
+ BlockManager blockManager = cluster.getNamesystem().getBlockManager();
+ final StoragePolicySatisfier<Long> sps =
+ (StoragePolicySatisfier<Long>) blockManager
+ .getSPSManager().getInternalSPSService();
+ sps.setBlockMovementListener(blkMoveListener);
+
DistributedFileSystem dfs = cluster.getFileSystem();
dfs.enableErasureCodingPolicy(
StripedFileTestUtil.getDefaultECPolicy().getName());
@@ -240,6 +252,15 @@ public class TestStoragePolicySatisfierWithStripedFile {
HdfsAdmin hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
try {
cluster.waitActive();
+
+ // Sets external listener for assertion.
+ blkMoveListener.clear();
+ BlockManager blockManager = cluster.getNamesystem().getBlockManager();
+ final StoragePolicySatisfier<Long> sps =
+ (StoragePolicySatisfier<Long>) blockManager
+ .getSPSManager().getInternalSPSService();
+ sps.setBlockMovementListener(blkMoveListener);
+
DistributedFileSystem dfs = cluster.getFileSystem();
dfs.enableErasureCodingPolicy(
StripedFileTestUtil.getDefaultECPolicy().getName());
@@ -328,6 +349,9 @@ public class TestStoragePolicySatisfierWithStripedFile {
conf.set(DFSConfigKeys
.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
"3000");
+ conf.set(DFSConfigKeys
+ .DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY,
+ "5000");
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numOfDatanodes)
.storagesPerDatanode(storagesPerDatanode)
@@ -559,22 +583,16 @@ public class TestStoragePolicySatisfierWithStripedFile {
private void waitForBlocksMovementAttemptReport(MiniDFSCluster cluster,
long expectedMoveFinishedBlks, int timeout)
throws TimeoutException, InterruptedException {
- BlockManager blockManager = cluster.getNamesystem().getBlockManager();
- final StoragePolicySatisfier<Long> sps =
- (StoragePolicySatisfier<Long>) blockManager
- .getSPSManager().getInternalSPSService();
- Assert.assertNotNull("Failed to get SPS object reference!", sps);
-
+ Assert.assertNotNull("Didn't set external block move listener",
+ blkMoveListener);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
+ int actualCount = blkMoveListener.getActualBlockMovements().size();
LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
expectedMoveFinishedBlks,
- ((BlockStorageMovementAttemptedItems<Long>) sps
- .getAttemptedItemsMonitor()).getMovementFinishedBlocksCount());
- return ((BlockStorageMovementAttemptedItems<Long>) sps
- .getAttemptedItemsMonitor())
- .getMovementFinishedBlocksCount() >= expectedMoveFinishedBlks;
+ actualCount);
+ return actualCount >= expectedMoveFinishedBlks;
}
}, 100, timeout);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7c8d488/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
index 28e172a..be243cb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
@@ -54,11 +54,9 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener;
import org.apache.hadoop.hdfs.server.namenode.sps.BlockStorageMovementAttemptedItems;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
import org.apache.hadoop.hdfs.server.namenode.sps.TestStoragePolicySatisfier;
@@ -92,6 +90,8 @@ public class TestExternalStoragePolicySatisfier
private File baseDir;
private StoragePolicySatisfier<String> externalSps;
private ExternalSPSContext externalCtxt;
+ private ExternalBlockMovementListener blkMoveListener =
+ new ExternalBlockMovementListener();
@After
public void destroy() throws Exception {
@@ -144,15 +144,12 @@ public class TestExternalStoragePolicySatisfier
nnc = getNameNodeConnector(getConf());
externalSps = new StoragePolicySatisfier<String>(getConf());
- externalCtxt = new ExternalSPSContext(externalSps,
- getNameNodeConnector(conf));
+ externalCtxt = new ExternalSPSContext(externalSps, nnc);
- ExternalBlockMovementListener blkMoveListener =
- new ExternalBlockMovementListener();
+ blkMoveListener.clear();
ExternalSPSBlockMoveTaskHandler externalHandler =
new ExternalSPSBlockMoveTaskHandler(conf, nnc,
externalSps);
- externalHandler.init();
externalSps.init(externalCtxt,
new ExternalSPSFilePathCollector(externalSps), externalHandler,
blkMoveListener);
@@ -169,33 +166,17 @@ public class TestExternalStoragePolicySatisfier
getCluster().waitActive();
externalSps = new StoragePolicySatisfier<>(getConf());
- externalCtxt = new ExternalSPSContext(externalSps,
- getNameNodeConnector(getConf()));
- ExternalBlockMovementListener blkMoveListener =
- new ExternalBlockMovementListener();
+ externalCtxt = new ExternalSPSContext(externalSps, nnc);
+ blkMoveListener.clear();
ExternalSPSBlockMoveTaskHandler externalHandler =
new ExternalSPSBlockMoveTaskHandler(getConf(), nnc,
externalSps);
- externalHandler.init();
externalSps.init(externalCtxt,
new ExternalSPSFilePathCollector(externalSps), externalHandler,
blkMoveListener);
externalSps.start(true, StoragePolicySatisfierMode.EXTERNAL);
}
- private class ExternalBlockMovementListener implements BlockMovementListener {
-
- private List<Block> actualBlockMovements = new ArrayList<>();
-
- @Override
- public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) {
- for (Block block : moveAttemptFinishedBlks) {
- actualBlockMovements.add(block);
- }
- LOG.info("Movement attempted blocks", actualBlockMovements);
- }
- }
-
private NameNodeConnector getNameNodeConnector(Configuration conf)
throws IOException {
final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
@@ -237,16 +218,15 @@ public class TestExternalStoragePolicySatisfier
public void waitForBlocksMovementAttemptReport(
long expectedMovementFinishedBlocksCount, int timeout)
throws TimeoutException, InterruptedException {
+ Assert.assertNotNull("Didn't set external block move listener",
+ blkMoveListener);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
+ int actualCount = blkMoveListener.getActualBlockMovements().size();
LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
- expectedMovementFinishedBlocksCount,
- ((BlockStorageMovementAttemptedItems<String>) (externalSps
- .getAttemptedItemsMonitor())).getMovementFinishedBlocksCount());
- return ((BlockStorageMovementAttemptedItems<String>) (externalSps
- .getAttemptedItemsMonitor()))
- .getMovementFinishedBlocksCount()
+ expectedMovementFinishedBlocksCount, actualCount);
+ return actualCount
>= expectedMovementFinishedBlocksCount;
}
}, 100, timeout);
@@ -352,6 +332,8 @@ public class TestExternalStoragePolicySatisfier
files.add(FILE);
DistributedFileSystem fs = getFS();
+ // stops sps to make the SPS Q with many outstanding requests.
+ externalSps.stopGracefully();
// Creates 4 more files. Send all of them for satisfying the storage
// policy together.
for (int i = 0; i < 3; i++) {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org