You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2017/08/27 07:15:58 UTC
[31/50] [abbrv] hadoop git commit: HDFS-11243. [SPS]: Add a protocol
command from NN to DN for dropping the SPS work and queues. Contributed by
Uma Maheswara Rao G
HDFS-11243. [SPS]: Add a protocol command from NN to DN for dropping the SPS work and queues. Contributed by Uma Maheswara Rao G
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/69d8cccf
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/69d8cccf
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/69d8cccf
Branch: refs/heads/HDFS-10285
Commit: 69d8cccf0ed9f2742e9cc96c0ca40cf8b8741c5c
Parents: a27ab4a
Author: Rakesh Radhakrishnan <ra...@apache.org>
Authored: Tue Jan 31 23:44:01 2017 +0530
Committer: Rakesh Radhakrishnan <ra...@apache.org>
Committed: Sun Aug 27 11:55:16 2017 +0530
----------------------------------------------------------------------
.../apache/hadoop/hdfs/protocolPB/PBHelper.java | 12 ++++
.../server/blockmanagement/BlockManager.java | 13 ++---
.../blockmanagement/DatanodeDescriptor.java | 18 ++++++
.../server/blockmanagement/DatanodeManager.java | 19 +++++++
.../hdfs/server/datanode/BPOfferService.java | 4 ++
.../datanode/BlockStorageMovementTracker.java | 12 ++++
.../datanode/StoragePolicySatisfyWorker.java | 22 +++++++-
.../server/namenode/StoragePolicySatisfier.java | 25 +++++++--
.../hdfs/server/protocol/DatanodeProtocol.java | 2 +
.../server/protocol/DropSPSWorkCommand.java | 36 ++++++++++++
.../src/main/proto/DatanodeProtocol.proto | 9 +++
.../TestStoragePolicySatisfyWorker.java | 59 ++++++++++++++++++++
12 files changed, 216 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/69d8cccf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 156c9c2..3b38077 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdComma
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DropSPSWorkCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.FinalizeCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.KeyUpdateCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto;
@@ -108,6 +109,7 @@ import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DropSPSWorkCommand;
import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
@@ -139,6 +141,10 @@ public class PBHelper {
private static final RegisterCommandProto REG_CMD_PROTO =
RegisterCommandProto.newBuilder().build();
private static final RegisterCommand REG_CMD = new RegisterCommand();
+ private static final DropSPSWorkCommandProto DROP_SPS_WORK_CMD_PROTO =
+ DropSPSWorkCommandProto.newBuilder().build();
+ private static final DropSPSWorkCommand DROP_SPS_WORK_CMD =
+ new DropSPSWorkCommand();
private PBHelper() {
/** Hidden constructor */
@@ -474,6 +480,8 @@ public class PBHelper {
return PBHelper.convert(proto.getBlkECReconstructionCmd());
case BlockStorageMovementCommand:
return PBHelper.convert(proto.getBlkStorageMovementCmd());
+ case DropSPSWorkCommand:
+ return DROP_SPS_WORK_CMD;
default:
return null;
}
@@ -613,6 +621,10 @@ public class PBHelper {
.setBlkStorageMovementCmd(
convert((BlockStorageMovementCommand) datanodeCommand));
break;
+ case DatanodeProtocol.DNA_DROP_SPS_WORK_COMMAND:
+ builder.setCmdType(DatanodeCommandProto.Type.DropSPSWorkCommand)
+ .setDropSPSWorkCmd(DROP_SPS_WORK_CMD_PROTO);
+ break;
case DatanodeProtocol.DNA_UNKNOWN: //Not expected
default:
builder.setCmdType(DatanodeCommandProto.Type.NullDatanodeCommand);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/69d8cccf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index f053265..1a809a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -701,13 +701,13 @@ public class BlockManager implements BlockStatsMXBean {
mxBeanName = MBeans.register("NameNode", "BlockStats", this);
bmSafeMode.activate(blockTotal);
if (sps != null && !haEnabled) {
- sps.start();
+ sps.start(false);
}
}
public void close() {
if (sps != null) {
- sps.stop();
+ sps.stop(false);
}
bmSafeMode.close();
try {
@@ -4905,7 +4905,7 @@ public class BlockManager implements BlockStatsMXBean {
return;
}
- sps.start();
+ sps.start(true);
}
/**
@@ -4919,12 +4919,7 @@ public class BlockManager implements BlockStatsMXBean {
LOG.info("Storage policy satisfier is already stopped.");
return;
}
- sps.stop();
- // TODO: add command to DNs for stop in-progress processing SPS commands?
- // to avoid confusions in cluster, I think sending commands from centralized
- // place would be better to drop pending queues at DN. Anyway in progress
- // work will be finished in a while, but this command can void starting
- // fresh movements at DN.
+ sps.stop(true);
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/69d8cccf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 9d3b544..95cb3a5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -213,6 +213,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
*/
private final Queue<BlockStorageMovementInfosBatch> storageMovementBlocks =
new LinkedList<>();
+ private volatile boolean dropSPSWork = false;
/* Variables for maintaining number of blocks scheduled to be written to
* this storage. This count is approximate and might be slightly bigger
@@ -1054,4 +1055,21 @@ public class DatanodeDescriptor extends DatanodeInfo {
return storageMovementBlocks.poll();
}
}
+
+ /**
+ * Set whether to drop SPS related queues at DN side.
+ *
+ * @param dropSPSWork
+ * - true if the SPS queues need to be dropped, otherwise false.
+ */
+ public synchronized void setDropSPSWork(boolean dropSPSWork) {
+ this.dropSPSWork = dropSPSWork;
+ }
+
+ /**
+ * @return true if the SPS queues need to be dropped at the DN.
+ */
+ public synchronized boolean shouldDropSPSWork() {
+ return this.dropSPSWork;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/69d8cccf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 048b68f..51c5aef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -1748,6 +1748,13 @@ public class DatanodeManager {
blkStorageMovementInfosBatch.getBlockMovingInfo()));
}
+ if (nodeinfo.shouldDropSPSWork()) {
+ cmds.add(DropSPSWorkCommand.DNA_DROP_SPS_WORK_COMMAND);
+ // Set back to false to indicate that the new value has been sent to the
+ // datanode.
+ nodeinfo.setDropSPSWork(false);
+ }
+
if (!cmds.isEmpty()) {
return cmds.toArray(new DatanodeCommand[cmds.size()]);
}
@@ -1976,5 +1983,17 @@ public class DatanodeManager {
return slowDiskTracker != null ?
slowDiskTracker.getSlowDiskReportAsJsonString() : null;
}
+
+ /**
+ * Mark all DNs to drop SPS queues. A DNA_DROP_SPS_WORK_COMMAND will be added
+ * to the heartbeat response, which instructs the DN to drop its SPS queues.
+ */
+ public void addDropSPSWorkCommandsToAllDNs() {
+ synchronized (this) {
+ for (DatanodeDescriptor dn : datanodeMap.values()) {
+ dn.setDropSPSWork(true);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/69d8cccf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index c77fe2b..39ff4b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -788,6 +788,10 @@ class BPOfferService {
blkSPSCmd.getTrackID(), blkSPSCmd.getBlockPoolId(),
blkSPSCmd.getBlockMovingTasks());
break;
+ case DatanodeProtocol.DNA_DROP_SPS_WORK_COMMAND:
+ LOG.info("DatanodeCommand action: DNA_DROP_SPS_WORK_COMMAND");
+ dn.getStoragePolicySatisfyWorker().dropSPSWork();
+ break;
default:
LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/69d8cccf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
index bd35b09..e623cef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
@@ -146,4 +146,16 @@ public class BlockStorageMovementTracker implements Runnable {
moverTaskFutures.notify();
}
}
+
+ /**
+ * Clear the pending movement and movement result queues.
+ */
+ void removeAll() {
+ synchronized (moverTaskFutures) {
+ moverTaskFutures.clear();
+ }
+ synchronized (movementResults) {
+ movementResults.clear();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/69d8cccf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
index 10adbfd..a96ac98 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
@@ -115,7 +115,6 @@ public class StoragePolicySatisfyWorker {
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new Daemon.DaemonFactory() {
private final AtomicInteger threadIndex = new AtomicInteger(0);
-
@Override
public Thread newThread(Runnable r) {
Thread t = super.newThread(r);
@@ -421,10 +420,31 @@ public class StoragePolicySatisfyWorker {
}
}
}
+
+ /**
+ * Clear the trackID vs movement status tracking map.
+ */
+ void removeAll() {
+ synchronized (trackIdVsMovementStatus) {
+ trackIdVsMovementStatus.clear();
+ }
+ }
+
}
@VisibleForTesting
BlocksMovementsCompletionHandler getBlocksMovementsCompletionHandler() {
return handler;
}
+
+ /**
+ * Drop the in-progress SPS work queues.
+ */
+ public void dropSPSWork() {
+ LOG.info("Received request to drop StoragePolicySatisfierWorker queues. "
+ + "So, none of the SPS Worker queued block movements will"
+ + " be scheduled.");
+ movementTracker.removeAll();
+ handler.removeAll();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/69d8cccf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
index 1c48910..dc58294 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -99,9 +99,14 @@ public class StoragePolicySatisfier implements Runnable {
* Start storage policy satisfier demon thread. Also start block storage
* movements monitor for retry the attempts if needed.
*/
- public synchronized void start() {
+ public synchronized void start(boolean reconfigStart) {
isRunning = true;
- LOG.info("Starting StoragePolicySatisfier.");
+ if (reconfigStart) {
+ LOG.info("Starting StoragePolicySatisfier, as admin requested to "
+ + "activate it.");
+ } else {
+ LOG.info("Starting StoragePolicySatisfier.");
+ }
storagePolicySatisfierThread = new Daemon(this);
storagePolicySatisfierThread.setName("StoragePolicySatisfier");
storagePolicySatisfierThread.start();
@@ -110,10 +115,17 @@ public class StoragePolicySatisfier implements Runnable {
/**
* Stop storage policy satisfier demon thread.
+ *
+ * @param reconfigStop true if stopping due to an admin deactivation request
*/
- public synchronized void stop() {
+ public synchronized void stop(boolean reconfigStop) {
isRunning = false;
- LOG.info("Stopping StoragePolicySatisfier.");
+ if (reconfigStop) {
+ LOG.info("Stopping StoragePolicySatisfier, as admin requested to "
+ + "deactivate it.");
+ } else {
+ LOG.info("Stopping StoragePolicySatisfier.");
+ }
if (storagePolicySatisfierThread == null) {
return;
}
@@ -123,7 +135,10 @@ public class StoragePolicySatisfier implements Runnable {
} catch (InterruptedException ie) {
}
this.storageMovementsMonitor.stop();
- this.clearQueues();
+ if (reconfigStop) {
+ this.clearQueues();
+ this.blockManager.getDatanodeManager().addDropSPSWorkCommandsToAllDNs();
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/69d8cccf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
index 858f59b..892efb3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
@@ -80,6 +80,8 @@ public interface DatanodeProtocol {
final static int DNA_UNCACHE = 10; // uncache blocks
final static int DNA_ERASURE_CODING_RECONSTRUCTION = 11; // erasure coding reconstruction command
final static int DNA_BLOCK_STORAGE_MOVEMENT = 12; // block storage movement command
+ final static int DNA_DROP_SPS_WORK_COMMAND = 13; // drop SPS work queues
+ // command
/**
* Register Datanode.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/69d8cccf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DropSPSWorkCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DropSPSWorkCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DropSPSWorkCommand.java
new file mode 100644
index 0000000..806f713
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DropSPSWorkCommand.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A DropSPSWorkCommand is an instruction to a datanode to drop the SPSWorker's
+ * pending block storage movement queues.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class DropSPSWorkCommand extends DatanodeCommand {
+ public static final DropSPSWorkCommand DNA_DROP_SPS_WORK_COMMAND =
+ new DropSPSWorkCommand();
+
+ public DropSPSWorkCommand() {
+ super(DatanodeProtocol.DNA_DROP_SPS_WORK_COMMAND);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/69d8cccf/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
index 77b0f86..899dc7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
@@ -61,6 +61,7 @@ message DatanodeCommandProto {
BlockIdCommand = 8;
BlockECReconstructionCommand = 9;
BlockStorageMovementCommand = 10;
+ DropSPSWorkCommand = 11;
}
required Type cmdType = 1; // Type of the command
@@ -76,6 +77,7 @@ message DatanodeCommandProto {
optional BlockIdCommandProto blkIdCmd = 8;
optional BlockECReconstructionCommandProto blkECReconstructionCmd = 9;
optional BlockStorageMovementCommandProto blkStorageMovementCmd = 10;
+ optional DropSPSWorkCommandProto dropSPSWorkCmd = 11;
}
/**
@@ -166,6 +168,13 @@ message BlockStorageMovementCommandProto {
}
/**
+ * Instruct datanode to drop SPS work queues
+ */
+message DropSPSWorkCommandProto {
+ // void
+}
+
+/**
* Block storage movement information
*/
message BlockStorageMovementProto {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/69d8cccf/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
index 8e02d41..86b8b50 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
+import static org.junit.Assert.*;
+
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@@ -186,6 +188,63 @@ public class TestStoragePolicySatisfyWorker {
waitForBlockMovementCompletion(worker, inode.getId(), 1, 30000);
}
+ /**
+ * Tests that drop SPS work method clears all the queues.
+ *
+ * @throws Exception
+ */
+ @Test(timeout = 120000)
+ public void testDropSPSWork() throws Exception {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(20).build();
+
+ cluster.waitActive();
+ final DistributedFileSystem dfs = cluster.getFileSystem();
+ final String file = "/testDropSPSWork";
+ DFSTestUtil.createFile(dfs, new Path(file), false, 1024, 50 * 100,
+ DEFAULT_BLOCK_SIZE, (short) 2, 0, false, null);
+
+ // move to ARCHIVE
+ dfs.setStoragePolicy(new Path(file), "COLD");
+
+ DataNode src = cluster.getDataNodes().get(2);
+ DatanodeInfo targetDnInfo =
+ DFSTestUtil.getLocalDatanodeInfo(src.getXferPort());
+
+ StoragePolicySatisfyWorker worker =
+ new StoragePolicySatisfyWorker(conf, src);
+ List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
+ List<LocatedBlock> locatedBlocks =
+ dfs.getClient().getLocatedBlocks(file, 0).getLocatedBlocks();
+ for (LocatedBlock locatedBlock : locatedBlocks) {
+ BlockMovingInfo blockMovingInfo =
+ prepareBlockMovingInfo(locatedBlock.getBlock().getLocalBlock(),
+ locatedBlock.getLocations()[0], targetDnInfo,
+ locatedBlock.getStorageTypes()[0], StorageType.ARCHIVE);
+ blockMovingInfos.add(blockMovingInfo);
+ }
+ INode inode = cluster.getNamesystem().getFSDirectory().getINode(file);
+ worker.processBlockMovingTasks(inode.getId(),
+ cluster.getNamesystem().getBlockPoolId(), blockMovingInfos);
+ // Wait till results queue build up
+ waitForBlockMovementResult(worker, inode.getId(), 30000);
+ worker.dropSPSWork();
+ assertTrue(worker.getBlocksMovementsCompletionHandler()
+ .getBlksMovementResults().size() == 0);
+ }
+
+ private void waitForBlockMovementResult(
+ final StoragePolicySatisfyWorker worker, final long inodeId, int timeout)
+ throws Exception {
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ List<BlocksStorageMovementResult> completedBlocks = worker
+ .getBlocksMovementsCompletionHandler().getBlksMovementResults();
+ return completedBlocks.size() > 0;
+ }
+ }, 100, timeout);
+ }
+
private void waitForBlockMovementCompletion(
final StoragePolicySatisfyWorker worker, final long inodeId,
int expectedFailedItemsCount, int timeout) throws Exception {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org