You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2016/12/15 09:05:08 UTC
[40/50] [abbrv] hadoop git commit: HDFS-10794. [SPS]: Provide storage
policy satisfy worker at DN for co-ordinating the block storage movement
work. Contributed by Rakesh R
HDFS-10794. [SPS]: Provide storage policy satisfy worker at DN for co-ordinating the block storage movement work. Contributed by Rakesh R
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9ec39c60
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9ec39c60
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9ec39c60
Branch: refs/heads/HDFS-10285
Commit: 9ec39c60e529c3c3cf98675ad7e74fdf82a74c3b
Parents: 64a2d5b
Author: Kai Zheng <ka...@intel.com>
Authored: Wed Sep 14 17:02:11 2016 +0800
Committer: Rakesh Radhakrishnan <ra...@apache.org>
Committed: Thu Dec 15 14:22:48 2016 +0530
----------------------------------------------------------------------
.../datanode/StoragePolicySatisfyWorker.java | 258 +++++++++++++++++++
.../protocol/BlockStorageMovementCommand.java | 101 ++++++++
.../TestStoragePolicySatisfyWorker.java | 160 ++++++++++++
3 files changed, 519 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ec39c60/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
new file mode 100644
index 0000000..6df4e81
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Daemon;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * StoragePolicySatisfyWorker handles the storage policy satisfier commands.
+ * These commands would be issued from NameNode as part of Datanode's heart beat
+ * response. BPOfferService delegates the work to this class for handling
+ * BlockStorageMovement commands.
+ */
+@InterfaceAudience.Private
+public class StoragePolicySatisfyWorker {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(StoragePolicySatisfyWorker.class);
+
+ private final DataNode datanode;
+ private final int ioFileBufferSize;
+
+ private final int moverThreads;
+ private final ExecutorService moveExecutor;
+ private final CompletionService<Void> moverExecutorCompletionService;
+ private final List<Future<Void>> moverTaskFutures;
+
+ public StoragePolicySatisfyWorker(Configuration conf, DataNode datanode) {
+ this.datanode = datanode;
+ this.ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
+
+ moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY,
+ DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT);
+ moveExecutor = initializeBlockMoverThreadPool(moverThreads);
+ moverExecutorCompletionService = new ExecutorCompletionService<>(
+ moveExecutor);
+ moverTaskFutures = new ArrayList<>();
+ // TODO: Needs to manage the number of concurrent moves per DataNode.
+ }
+
+ private ThreadPoolExecutor initializeBlockMoverThreadPool(int num) {
+ LOG.debug("Block mover to satisfy storage policy; pool threads={}", num);
+
+ ThreadPoolExecutor moverThreadPool = new ThreadPoolExecutor(1, num, 60,
+ TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+ new Daemon.DaemonFactory() {
+ private final AtomicInteger threadIndex = new AtomicInteger(0);
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = super.newThread(r);
+ t.setName("BlockMoverTask-" + threadIndex.getAndIncrement());
+ return t;
+ }
+ }, new ThreadPoolExecutor.CallerRunsPolicy() {
+ @Override
+ public void rejectedExecution(Runnable runnable,
+ ThreadPoolExecutor e) {
+ LOG.info("Execution for block movement to satisfy storage policy"
+ + " got rejected, Executing in current thread");
+ // will run in the current thread.
+ super.rejectedExecution(runnable, e);
+ }
+ });
+
+ moverThreadPool.allowCoreThreadTimeOut(true);
+ return moverThreadPool;
+ }
+
+ public void processBlockMovingTasks(long trackID,
+ List<BlockMovingInfo> blockMovingInfos) {
+ Future<Void> moveCallable = null;
+ for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
+ assert blkMovingInfo
+ .getSources().length == blkMovingInfo.getTargets().length;
+
+ for (int i = 0; i < blkMovingInfo.getSources().length; i++) {
+ BlockMovingTask blockMovingTask =
+ new BlockMovingTask(blkMovingInfo.getBlock(),
+ blkMovingInfo.getSources()[i],
+ blkMovingInfo.getTargets()[i],
+ blkMovingInfo.getTargetStorageTypes()[i]);
+ moveCallable = moverExecutorCompletionService
+ .submit(blockMovingTask);
+ moverTaskFutures.add(moveCallable);
+ }
+ }
+
+ // TODO: Presently this function act as a blocking call, this has to be
+ // refined by moving the tracking logic to another tracker thread.
+ for (int i = 0; i < moverTaskFutures.size(); i++) {
+ try {
+ moveCallable = moverExecutorCompletionService.take();
+ moveCallable.get();
+ } catch (InterruptedException | ExecutionException e) {
+ // TODO: Failure retries and report back the error to NameNode.
+ LOG.error("Exception while moving block replica to target storage type",
+ e);
+ }
+ }
+ }
+
+ /**
+ * This class encapsulates the process of moving the block replica to the
+ * given target.
+ */
+ private class BlockMovingTask implements Callable<Void> {
+ private final ExtendedBlock block;
+ private final DatanodeInfo source;
+ private final DatanodeInfo target;
+ private final StorageType targetStorageType;
+
+ BlockMovingTask(ExtendedBlock block, DatanodeInfo source,
+ DatanodeInfo target, StorageType targetStorageType) {
+ this.block = block;
+ this.source = source;
+ this.target = target;
+ this.targetStorageType = targetStorageType;
+ }
+
+ @Override
+ public Void call() {
+ moveBlock();
+ return null;
+ }
+
+ private void moveBlock() {
+ LOG.info("Start moving block {}", block);
+
+ LOG.debug("Start moving block:{} from src:{} to destin:{} to satisfy "
+ + "storageType:{}", block, source, target, targetStorageType);
+ Socket sock = null;
+ DataOutputStream out = null;
+ DataInputStream in = null;
+ try {
+ DNConf dnConf = datanode.getDnConf();
+ String dnAddr = target.getXferAddr(dnConf.getConnectToDnViaHostname());
+ sock = datanode.newSocket();
+ NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr),
+ dnConf.getSocketTimeout());
+ sock.setSoTimeout(2 * dnConf.getSocketTimeout());
+ LOG.debug("Connecting to datanode {}", dnAddr);
+
+ OutputStream unbufOut = sock.getOutputStream();
+ InputStream unbufIn = sock.getInputStream();
+
+ Token<BlockTokenIdentifier> accessToken = datanode.getBlockAccessToken(
+ block, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
+
+ DataEncryptionKeyFactory keyFactory = datanode
+ .getDataEncryptionKeyFactoryForBlock(block);
+ IOStreamPair saslStreams = datanode.getSaslClient().socketSend(sock,
+ unbufOut, unbufIn, keyFactory, accessToken, target);
+ unbufOut = saslStreams.out;
+ unbufIn = saslStreams.in;
+ out = new DataOutputStream(
+ new BufferedOutputStream(unbufOut, ioFileBufferSize));
+ in = new DataInputStream(
+ new BufferedInputStream(unbufIn, ioFileBufferSize));
+ sendRequest(out, block, accessToken, source, targetStorageType);
+ receiveResponse(in);
+
+ LOG.debug(
+ "Successfully moved block:{} from src:{} to destin:{} for"
+ + " satisfying storageType:{}",
+ block, source, target, targetStorageType);
+ } catch (IOException e) {
+ // TODO: handle failure retries
+ LOG.warn(
+ "Failed to move block:{} from src:{} to destin:{} to satisfy "
+ + "storageType:{}",
+ block, source, target, targetStorageType, e);
+ } finally {
+ IOUtils.closeStream(out);
+ IOUtils.closeStream(in);
+ IOUtils.closeSocket(sock);
+ }
+ }
+
+ /** Send a reportedBlock replace request to the output stream. */
+ private void sendRequest(DataOutputStream out, ExtendedBlock eb,
+ Token<BlockTokenIdentifier> accessToken, DatanodeInfo srcDn,
+ StorageType destinStorageType) throws IOException {
+ new Sender(out).replaceBlock(eb, destinStorageType, accessToken,
+ srcDn.getDatanodeUuid(), srcDn);
+ }
+
+ /** Receive a reportedBlock copy response from the input stream. */
+ private void receiveResponse(DataInputStream in) throws IOException {
+ BlockOpResponseProto response = BlockOpResponseProto
+ .parseFrom(vintPrefixed(in));
+ while (response.getStatus() == Status.IN_PROGRESS) {
+ // read intermediate responses
+ response = BlockOpResponseProto.parseFrom(vintPrefixed(in));
+ }
+ String logInfo = "reportedBlock move is failed";
+ DataTransferProtoUtil.checkBlockOpStatus(response, logInfo);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ec39c60/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
new file mode 100644
index 0000000..42ba265
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+
+/**
+ * A BlockStorageMovementCommand is an instruction to a DataNode to move the
+ * given set of blocks to specified target DataNodes to fulfill the block
+ * storage policy.
+ *
+ * Upon receiving this command, this DataNode coordinates all the block movement
+ * by passing the details to
+ * {@link org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker}
+ * service. After the block movement this DataNode sends response back to the
+ * NameNode about the movement status.
+ */
+public class BlockStorageMovementCommand extends DatanodeCommand {
+
+ // TODO: constructor needs to be refined based on the block movement data
+ // structure.
+ BlockStorageMovementCommand(int action) {
+ super(action);
+ }
+
+ /**
+ * Stores block to storage info that can be used for block movement.
+ */
+ public static class BlockMovingInfo {
+ private ExtendedBlock blk;
+ private DatanodeInfo[] sourceNodes;
+ private StorageType[] sourceStorageTypes;
+ private DatanodeInfo[] targetNodes;
+ private StorageType[] targetStorageTypes;
+
+ public BlockMovingInfo(ExtendedBlock block,
+ DatanodeInfo[] sourceDnInfos, DatanodeInfo[] targetDnInfos,
+ StorageType[] srcStorageTypes, StorageType[] targetStorageTypes) {
+ this.blk = block;
+ this.sourceNodes = sourceDnInfos;
+ this.targetNodes = targetDnInfos;
+ this.sourceStorageTypes = srcStorageTypes;
+ this.targetStorageTypes = targetStorageTypes;
+ }
+
+ public void addBlock(ExtendedBlock block) {
+ this.blk = block;
+ }
+
+ public ExtendedBlock getBlock() {
+ return this.blk;
+ }
+
+ public DatanodeInfo[] getSources() {
+ return sourceNodes;
+ }
+
+ public DatanodeInfo[] getTargets() {
+ return targetNodes;
+ }
+
+ public StorageType[] getTargetStorageTypes() {
+ return targetStorageTypes;
+ }
+
+ public StorageType[] getSourceStorageTypes() {
+ return sourceStorageTypes;
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder().append("BlockMovingInfo(\n ")
+ .append("Moving block: ").append(blk).append(" From: ")
+ .append(Arrays.asList(sourceNodes)).append(" To: [")
+ .append(Arrays.asList(targetNodes)).append(")\n")
+ .append(" sourceStorageTypes: ")
+ .append(Arrays.toString(sourceStorageTypes))
+ .append(" targetStorageTypes: ")
+ .append(Arrays.toString(targetStorageTypes)).toString();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ec39c60/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
new file mode 100644
index 0000000..c722306
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Supplier;
+
+/**
+ * This class tests the behavior of moving block replica to the given storage
+ * type to fulfill the storage policy requirement.
+ */
+public class TestStoragePolicySatisfyWorker {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(TestStoragePolicySatisfyWorker.class);
+
+ private static final int DEFAULT_BLOCK_SIZE = 100;
+
+ private static void initConf(Configuration conf) {
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
+ conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE);
+ conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+ conf.setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
+ 1L);
+ conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
+ }
+
+ /**
+ * Tests to verify that the block replica is moving to ARCHIVE storage type to
+ * fulfill the storage policy requirement.
+ */
+ @Test(timeout = 120000)
+ public void testMoveSingleBlockToAnotherDatanode() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ initConf(conf);
+ final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(4)
+ .storageTypes(
+ new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE},
+ {StorageType.DISK, StorageType.ARCHIVE},
+ {StorageType.DISK, StorageType.ARCHIVE},
+ {StorageType.DISK, StorageType.ARCHIVE}})
+ .build();
+ try {
+ cluster.waitActive();
+ final DistributedFileSystem dfs = cluster.getFileSystem();
+ final String file = "/testMoveSingleBlockToAnotherDatanode";
+ // write to DISK
+ final FSDataOutputStream out = dfs.create(new Path(file), (short) 2);
+ out.writeChars("testMoveSingleBlockToAnotherDatanode");
+ out.close();
+
+ // verify before movement
+ LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
+ StorageType[] storageTypes = lb.getStorageTypes();
+ for (StorageType storageType : storageTypes) {
+ Assert.assertTrue(StorageType.DISK == storageType);
+ }
+ // move to ARCHIVE
+ dfs.setStoragePolicy(new Path(file), "COLD");
+
+ lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
+ DataNode src = cluster.getDataNodes().get(3);
+ DatanodeInfo targetDnInfo = DFSTestUtil
+ .getLocalDatanodeInfo(src.getXferPort());
+
+ // TODO: Need to revisit this when NN is implemented to be able to send
+ // block moving commands.
+ StoragePolicySatisfyWorker worker = new StoragePolicySatisfyWorker(conf,
+ src);
+ List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
+ BlockMovingInfo blockMovingInfo = prepareBlockMovingInfo(
+ lb.getBlock(), lb.getLocations()[0], targetDnInfo,
+ lb.getStorageTypes()[0], StorageType.ARCHIVE);
+ blockMovingInfos.add(blockMovingInfo);
+ INode inode = cluster.getNamesystem().getFSDirectory().getINode(file);
+ worker.processBlockMovingTasks(inode.getId(),
+ blockMovingInfos);
+ cluster.triggerHeartbeats();
+
+ // Wait till NameNode notified about the block location details
+ waitForLocatedBlockWithArchiveStorageType(dfs, file, 1, 30000);
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ private void waitForLocatedBlockWithArchiveStorageType(
+ final DistributedFileSystem dfs, final String file,
+ int expectedArchiveCount, int timeout) throws Exception {
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ LocatedBlock lb = null;
+ try {
+ lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
+ } catch (IOException e) {
+ LOG.error("Exception while getting located blocks", e);
+ return false;
+ }
+
+ int archiveCount = 0;
+ for (StorageType storageType : lb.getStorageTypes()) {
+ if (StorageType.ARCHIVE == storageType) {
+ archiveCount++;
+ }
+ }
+ LOG.info("Archive replica count, expected={} and actual={}",
+ expectedArchiveCount, archiveCount);
+ return expectedArchiveCount == archiveCount;
+ }
+ }, 100, timeout);
+ }
+
+ BlockMovingInfo prepareBlockMovingInfo(ExtendedBlock block,
+ DatanodeInfo src, DatanodeInfo destin, StorageType storageType,
+ StorageType targetStorageType) {
+ return new BlockMovingInfo(block, new DatanodeInfo[] {src},
+ new DatanodeInfo[] {destin}, new StorageType[] {storageType},
+ new StorageType[] {targetStorageType});
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org