You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by um...@apache.org on 2018/08/10 03:49:23 UTC
[30/50] [abbrv] hadoop git commit: HDFS-13033: [SPS]: Implement a
mechanism to do file block movements for external SPS. Contributed by Rakesh
R.
HDFS-13033: [SPS]: Implement a mechanism to do file block movements for external SPS. Contributed by Rakesh R.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1db3fb75
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1db3fb75
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1db3fb75
Branch: refs/heads/HDFS-10285
Commit: 1db3fb757bcb3eadc45b56d366a68512e790b50b
Parents: 3506306
Author: Uma Maheswara Rao G <um...@intel.com>
Authored: Tue Jan 23 16:19:46 2018 -0800
Committer: Uma Maheswara Rao Gangumalla <um...@apache.org>
Committed: Thu Aug 9 20:47:22 2018 -0700
----------------------------------------------------------------------
.../hdfs/server/balancer/NameNodeConnector.java | 8 +
.../hdfs/server/common/sps/BlockDispatcher.java | 186 +++++++++++++
.../sps/BlockMovementAttemptFinished.java | 80 ++++++
.../server/common/sps/BlockMovementStatus.java | 53 ++++
.../common/sps/BlockStorageMovementTracker.java | 184 +++++++++++++
.../sps/BlocksMovementsStatusHandler.java | 95 +++++++
.../hdfs/server/common/sps/package-info.java | 27 ++
.../datanode/BlockStorageMovementTracker.java | 186 -------------
.../datanode/StoragePolicySatisfyWorker.java | 271 ++-----------------
.../hdfs/server/namenode/FSNamesystem.java | 4 +-
.../namenode/sps/BlockMoveTaskHandler.java | 3 +-
.../sps/BlockStorageMovementAttemptedItems.java | 12 +-
.../IntraSPSNameNodeBlockMoveTaskHandler.java | 3 +-
.../hdfs/server/namenode/sps/SPSService.java | 14 +-
.../namenode/sps/StoragePolicySatisfier.java | 30 +-
.../sps/ExternalSPSBlockMoveTaskHandler.java | 233 ++++++++++++++++
.../TestBlockStorageMovementAttemptedItems.java | 2 +-
.../sps/TestStoragePolicySatisfier.java | 6 +-
.../sps/TestExternalStoragePolicySatisfier.java | 69 ++++-
19 files changed, 997 insertions(+), 469 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1db3fb75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
index b0dd779..6bfbbb3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
@@ -269,6 +269,14 @@ public class NameNodeConnector implements Closeable {
}
}
+ /**
+ * Returns the fallbackToSimpleAuth flag held by this connector. The flag is
+ * true or false during calls to indicate whether a secure client falls back
+ * to simple auth.
+ *
+ * @return the AtomicBoolean instance itself (not a copy), so updates made
+ * through the returned reference are shared with this connector
+ */
+ public AtomicBoolean getFallbackToSimpleAuth() {
+ return fallbackToSimpleAuth;
+ }
+
@Override
public void close() {
keyManager.close();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1db3fb75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockDispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockDispatcher.java
new file mode 100644
index 0000000..f87fcae
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockDispatcher.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.common.sps;
+
+import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockPinningException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Dispatches block replica moves between datanodes to satisfy the storage
+ * policy. Each call to moveBlock connects to the target datanode, sends a
+ * replace-block request that names the source datanode, and waits for the
+ * final response before reporting a movement status.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlockDispatcher {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(BlockDispatcher.class);
+
+ private final boolean connectToDnViaHostname;
+ private final int socketTimeout;
+ private final int ioFileBufferSize;
+
+ /**
+ * Construct block dispatcher details.
+ *
+ * @param sockTimeout
+ * socket timeout; used as the connect timeout, and multiplied by
+ * five for the read timeout while waiting for the response
+ * @param ioFileBuffSize
+ * buffer size used for the buffered socket input/output streams
+ * @param connectToDatanodeViaHostname
+ * true represents connect via hostname, false otherwise
+ */
+ public BlockDispatcher(int sockTimeout, int ioFileBuffSize,
+ boolean connectToDatanodeViaHostname) {
+ this.socketTimeout = sockTimeout;
+ this.ioFileBufferSize = ioFileBuffSize;
+ this.connectToDnViaHostname = connectToDatanodeViaHostname;
+ }
+
+ /**
+ * Moves the given block replica to the given target node and waits for the
+ * response.
+ *
+ * The given socket is connected to the target's transfer address, wrapped
+ * by the SASL client, and used to send the replace-block request. The
+ * streams and the socket are always closed before this method returns, so
+ * the caller must pass a fresh socket for every call.
+ *
+ * @param blkMovingInfo
+ * block to storage info
+ * @param saslClient
+ * SASL for DataTransferProtocol on behalf of a client
+ * @param eb
+ * extended block info
+ * @param sock
+ * target node's socket; connected and closed by this method
+ * @param km
+ * for creation of an encryption key
+ * @param accessToken
+ * connection block access token
+ * @return DN_BLK_STORAGE_MOVEMENT_SUCCESS when the move completed or when
+ * a BlockPinningException was raised (pinned blocks are skipped
+ * rather than retried); DN_BLK_STORAGE_MOVEMENT_FAILURE when any
+ * other IOException occurred
+ */
+ public BlockMovementStatus moveBlock(BlockMovingInfo blkMovingInfo,
+ SaslDataTransferClient saslClient, ExtendedBlock eb, Socket sock,
+ DataEncryptionKeyFactory km, Token<BlockTokenIdentifier> accessToken) {
+ LOG.info("Start moving block:{} from src:{} to destin:{} to satisfy "
+ + "storageType, sourceStoragetype:{} and destinStoragetype:{}",
+ blkMovingInfo.getBlock(), blkMovingInfo.getSource(),
+ blkMovingInfo.getTarget(), blkMovingInfo.getSourceStorageType(),
+ blkMovingInfo.getTargetStorageType());
+ DataOutputStream out = null;
+ DataInputStream in = null;
+ try {
+ NetUtils.connect(sock,
+ NetUtils.createSocketAddr(
+ blkMovingInfo.getTarget().getXferAddr(connectToDnViaHostname)),
+ socketTimeout);
+ // Set read timeout so that it doesn't hang forever against
+ // unresponsive nodes. Datanode normally sends IN_PROGRESS response
+ // twice within the client read timeout period (every 30 seconds by
+ // default). Here, we make it give up after "socketTimeout * 5" period
+ // of no response.
+ sock.setSoTimeout(socketTimeout * 5);
+ sock.setKeepAlive(true);
+ OutputStream unbufOut = sock.getOutputStream();
+ InputStream unbufIn = sock.getInputStream();
+ LOG.debug("Connecting to datanode {}", blkMovingInfo.getTarget());
+
+ IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
+ unbufIn, km, accessToken, blkMovingInfo.getTarget());
+ unbufOut = saslStreams.out;
+ unbufIn = saslStreams.in;
+ out = new DataOutputStream(
+ new BufferedOutputStream(unbufOut, ioFileBufferSize));
+ in = new DataInputStream(
+ new BufferedInputStream(unbufIn, ioFileBufferSize));
+ sendRequest(out, eb, accessToken, blkMovingInfo.getSource(),
+ blkMovingInfo.getTargetStorageType());
+ receiveResponse(in);
+
+ LOG.info(
+ "Successfully moved block:{} from src:{} to destin:{} for"
+ + " satisfying storageType:{}",
+ blkMovingInfo.getBlock(), blkMovingInfo.getSource(),
+ blkMovingInfo.getTarget(), blkMovingInfo.getTargetStorageType());
+ return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_SUCCESS;
+ } catch (BlockPinningException e) {
+ // Pinned block won't be able to move to a different node. So, it's not
+ // required to do retries, just marked as SUCCESS.
+ LOG.debug("Pinned block can't be moved, so skipping block:{}",
+ blkMovingInfo.getBlock(), e);
+ return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_SUCCESS;
+ } catch (IOException e) {
+ // TODO: handle failure retries
+ LOG.warn(
+ "Failed to move block:{} from src:{} to destin:{} to satisfy "
+ + "storageType:{}",
+ blkMovingInfo.getBlock(), blkMovingInfo.getSource(),
+ blkMovingInfo.getTarget(), blkMovingInfo.getTargetStorageType(), e);
+ return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE;
+ } finally {
+ IOUtils.closeStream(out);
+ IOUtils.closeStream(in);
+ IOUtils.closeSocket(sock);
+ }
+ }
+
+ /**
+ * Send a reportedBlock replace request to the output stream.
+ *
+ * The request is issued through Sender#replaceBlock with the source
+ * datanode's UUID and the source datanode itself; the last argument of
+ * replaceBlock is passed as null.
+ *
+ * @param out
+ * stream connected to the target datanode
+ * @param eb
+ * block to be replaced on the target
+ * @param accessToken
+ * block access token sent with the request
+ * @param source
+ * datanode that currently holds the replica
+ * @param targetStorageType
+ * storage type the replica should be placed on
+ * @throws IOException
+ * if the request cannot be written
+ */
+ private static void sendRequest(DataOutputStream out, ExtendedBlock eb,
+ Token<BlockTokenIdentifier> accessToken, DatanodeInfo source,
+ StorageType targetStorageType) throws IOException {
+ new Sender(out).replaceBlock(eb, targetStorageType, accessToken,
+ source.getDatanodeUuid(), source, null);
+ }
+
+ /**
+ * Receive a reportedBlock copy response from the input stream.
+ *
+ * Reads vint-prefixed BlockOpResponseProto messages, skipping every
+ * response whose status is IN_PROGRESS, and hands the first other response
+ * to DataTransferProtoUtil#checkBlockOpStatus for validation.
+ *
+ * @param in
+ * stream connected to the target datanode
+ * @throws IOException
+ * if a response cannot be read or the status check rejects it
+ */
+ private static void receiveResponse(DataInputStream in) throws IOException {
+ BlockOpResponseProto response = BlockOpResponseProto
+ .parseFrom(vintPrefixed(in));
+ while (response.getStatus() == Status.IN_PROGRESS) {
+ // read intermediate responses
+ response = BlockOpResponseProto.parseFrom(vintPrefixed(in));
+ }
+ String logInfo = "reportedBlock move is failed";
+ DataTransferProtoUtil.checkBlockOpStatus(response, logInfo);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1db3fb75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementAttemptFinished.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementAttemptFinished.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementAttemptFinished.java
new file mode 100644
index 0000000..419d806
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementAttemptFinished.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.common.sps;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+
+/**
+ * Immutable record of one finished block movement attempt: the block, the
+ * source and target datanodes of the attempt, and whether the attempt
+ * succeeded or failed.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlockMovementAttemptFinished {
+ private final BlockMovementStatus movementStatus;
+ private final Block movedBlock;
+ private final DatanodeInfo sourceNode;
+ private final DatanodeInfo targetNode;
+
+ /**
+ * Construct movement attempt finished info.
+ *
+ * @param block
+ * block whose movement was attempted
+ * @param src
+ * datanode the block was to be moved from
+ * @param target
+ * datanode the block was to be moved to
+ * @param status
+ * outcome of the movement attempt
+ */
+ public BlockMovementAttemptFinished(Block block, DatanodeInfo src,
+ DatanodeInfo target, BlockMovementStatus status) {
+ this.movedBlock = block;
+ this.sourceNode = src;
+ this.targetNode = target;
+ this.movementStatus = status;
+ }
+
+ /**
+ * @return the block that was attempted to be moved from the src node to the
+ * target node.
+ */
+ public Block getBlock() {
+ return movedBlock;
+ }
+
+ /**
+ * @return the status code of the block movement attempt.
+ */
+ public BlockMovementStatus getStatus() {
+ return movementStatus;
+ }
+
+ @Override
+ public String toString() {
+ // Plain concatenation renders null members as "null", exactly like
+ // StringBuilder#append(Object) does.
+ return "Block movement attempt finished(\n " + " block : " + movedBlock +
+ " src node: " + sourceNode + " target node: " + targetNode +
+ " movement status: " + movementStatus + ")";
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1db3fb75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementStatus.java
new file mode 100644
index 0000000..f70d84f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementStatus.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.common.sps;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Outcome of a block storage movement attempt, each with a numeric status
+ * code.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public enum BlockMovementStatus {
+ /** The movement attempt succeeded; status code 0. */
+ DN_BLK_STORAGE_MOVEMENT_SUCCESS(0),
+ /**
+ * The movement attempt failed, for example due to generation time stamp
+ * mismatches, network errors or no available space; status code -1.
+ */
+ DN_BLK_STORAGE_MOVEMENT_FAILURE(-1);
+
+ // TODO: need to support different type of failures. Failure due to network
+ // errors, block pinned, no space available etc.
+
+ private final int statusCode;
+
+ BlockMovementStatus(int statusCode) {
+ this.statusCode = statusCode;
+ }
+
+ /**
+ * @return the numeric status code of this constant.
+ */
+ int getStatusCode() {
+ return this.statusCode;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1db3fb75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockStorageMovementTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockStorageMovementTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockStorageMovementTracker.java
new file mode 100644
index 0000000..b20d6cf
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockStorageMovementTracker.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.common.sps;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is used to track the completion of block movement future tasks.
+ *
+ * The run loop waits (up to two seconds at a time) while no futures are
+ * tracked, then takes the next completed future from the completion service,
+ * removes it from the futures tracked for its block and records its result.
+ * Once no futures remain for a block, the results collected for that block
+ * are passed to the blocks movements status handler and then discarded.
+ *
+ * NOTE(review): run() reads moverTaskFutures (size/get) and removes from
+ * movementResults without holding the monitors that addBlock() and
+ * removeAll() synchronize on - confirm whether callers guarantee that this
+ * is safe.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlockStorageMovementTracker implements Runnable {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(BlockStorageMovementTracker.class);
+ private final CompletionService<BlockMovementAttemptFinished> moverCompletionService;
+ private final BlocksMovementsStatusHandler blksMovementsStatusHandler;
+
+ // Keeps the information - block vs its list of future move tasks
+ private final Map<Block, List<Future<BlockMovementAttemptFinished>>> moverTaskFutures;
+ private final Map<Block, List<BlockMovementAttemptFinished>> movementResults;
+
+ private volatile boolean running = true;
+
+ /**
+ * BlockStorageMovementTracker constructor.
+ *
+ * @param moverCompletionService
+ * completion service from which finished movement tasks are taken
+ * @param handler
+ * blocks movements status handler that receives the results of a
+ * block once all of its tracked tasks have completed
+ */
+ public BlockStorageMovementTracker(
+ CompletionService<BlockMovementAttemptFinished> moverCompletionService,
+ BlocksMovementsStatusHandler handler) {
+ this.moverCompletionService = moverCompletionService;
+ this.moverTaskFutures = new HashMap<>();
+ this.blksMovementsStatusHandler = handler;
+ this.movementResults = new HashMap<>();
+ }
+
+ @Override
+ public void run() {
+ while (running) {
+ if (moverTaskFutures.size() <= 0) {
+ try {
+ synchronized (moverTaskFutures) {
+ // Waiting for mover tasks.
+ moverTaskFutures.wait(2000);
+ }
+ } catch (InterruptedException ignore) {
+ // Sets interrupt flag of this thread.
+ Thread.currentThread().interrupt();
+ }
+ }
+ try {
+ Future<BlockMovementAttemptFinished> future =
+ moverCompletionService.take();
+ if (future != null) {
+ BlockMovementAttemptFinished result = future.get();
+ LOG.debug("Completed block movement. {}", result);
+ Block block = result.getBlock();
+ List<Future<BlockMovementAttemptFinished>> blocksMoving =
+ moverTaskFutures.get(block);
+ if (blocksMoving == null) {
+ LOG.warn("Future task doesn't exist for block : {} ", block);
+ continue;
+ }
+ blocksMoving.remove(future);
+
+ List<BlockMovementAttemptFinished> resultPerTrackIdList =
+ addMovementResultToBlockIdList(result);
+
+ // Completed all the scheduled movement tasks tracked for this block.
+ if (blocksMoving.isEmpty() || moverTaskFutures.get(block) == null) {
+ synchronized (moverTaskFutures) {
+ moverTaskFutures.remove(block);
+ }
+ if (running) {
+ // Hand the results collected for this block to the status handler.
+ blksMovementsStatusHandler.handle(resultPerTrackIdList);
+ }
+ movementResults.remove(block);
+ }
+ }
+ } catch (InterruptedException e) {
+ if (running) {
+ LOG.error("Exception while moving block replica to target storage"
+ + " type", e);
+ }
+ } catch (ExecutionException e) {
+ // TODO: Do we need failure retries and implement the same if required.
+ LOG.error("Exception while moving block replica to target storage type",
+ e);
+ }
+ }
+ }
+
+ private List<BlockMovementAttemptFinished> addMovementResultToBlockIdList(
+ BlockMovementAttemptFinished result) {
+ Block block = result.getBlock();
+ List<BlockMovementAttemptFinished> perBlockIdList;
+ synchronized (movementResults) {
+ perBlockIdList = movementResults.get(block);
+ if (perBlockIdList == null) {
+ perBlockIdList = new ArrayList<>();
+ movementResults.put(block, perBlockIdList);
+ }
+ perBlockIdList.add(result);
+ }
+ return perBlockIdList;
+ }
+
+ /**
+ * Add future task to the tracking list to check the completion status of the
+ * block movement, and wake up the tracker thread if it is waiting for tasks.
+ *
+ * @param block
+ * block whose movement the given task performs
+ * @param futureTask
+ * future task used for moving the respective block
+ */
+ public void addBlock(Block block,
+ Future<BlockMovementAttemptFinished> futureTask) {
+ synchronized (moverTaskFutures) {
+ List<Future<BlockMovementAttemptFinished>> futures =
+ moverTaskFutures.get(block);
+ // null for the first task
+ if (futures == null) {
+ futures = new ArrayList<>();
+ moverTaskFutures.put(block, futures);
+ }
+ futures.add(futureTask);
+ // Notify waiting tracker thread about the newly added tasks.
+ moverTaskFutures.notify();
+ }
+ }
+
+ /**
+ * Clear the tracked movement futures and the collected movement results.
+ */
+ public void removeAll() {
+ synchronized (moverTaskFutures) {
+ moverTaskFutures.clear();
+ }
+ synchronized (movementResults) {
+ movementResults.clear();
+ }
+ }
+
+ /**
+ * Sets running flag to false, so that the run loop exits after its current
+ * iteration, and clears the tracked futures and collected results.
+ */
+ public void stopTracking() {
+ running = false;
+ removeAll();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1db3fb75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlocksMovementsStatusHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlocksMovementsStatusHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlocksMovementsStatusHandler.java
new file mode 100644
index 0000000..f9f3954
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlocksMovementsStatusHandler.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.common.sps;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.Block;
+
+/**
+ * Blocks movements status handler, which is used to collect the blocks whose
+ * movement attempt has finished (with success or failure), so that they can
+ * later be read and used to notify the respective listeners, if any.
+ *
+ * Every access to the internal tracking list is made while holding the
+ * list's own monitor, so one instance can be shared between threads.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlocksMovementsStatusHandler {
+ private final List<Block> blockIdVsMovementStatus = new ArrayList<>();
+
+ /**
+ * Collect all the storage movement attempt finished blocks. Later this will
+ * be sent to namenode via heart beat.
+ *
+ * @param moveAttemptFinishedBlks
+ * set of storage movement attempt finished blocks; a null list is
+ * ignored
+ */
+ public void handle(
+ List<BlockMovementAttemptFinished> moveAttemptFinishedBlks) {
+ if (moveAttemptFinishedBlks == null) {
+ return;
+ }
+ List<Block> blocks = new ArrayList<>(moveAttemptFinishedBlks.size());
+
+ for (BlockMovementAttemptFinished item : moveAttemptFinishedBlks) {
+ blocks.add(item.getBlock());
+ }
+ // Adding to the tracking report list. Later this can be accessed to know
+ // the attempted block movements.
+ synchronized (blockIdVsMovementStatus) {
+ blockIdVsMovementStatus.addAll(blocks);
+ }
+ }
+
+ /**
+ * Returns a snapshot of the blocks collected so far. The snapshot is not
+ * affected by later calls to handle(), remove() or removeAll(), so it can
+ * be read without further locking and then passed to remove() to drop
+ * exactly the blocks it contains.
+ *
+ * @return unmodifiable list of storage movement attempt finished blocks;
+ * empty if none are tracked.
+ */
+ public List<Block> getMoveAttemptFinishedBlocks() {
+ synchronized (blockIdVsMovementStatus) {
+ // Copy under the lock. An unmodifiable view of the live list would be
+ // read outside the lock, and handing that view back to remove() would
+ // also drop blocks added after the caller obtained it.
+ return Collections
+ .unmodifiableList(new ArrayList<>(blockIdVsMovementStatus));
+ }
+ }
+
+ /**
+ * Remove the storage movement attempt finished blocks from the tracking list.
+ *
+ * @param moveAttemptFinishedBlks
+ * set of storage movement attempt finished blocks; a null list is
+ * ignored
+ */
+ public void remove(List<Block> moveAttemptFinishedBlks) {
+ if (moveAttemptFinishedBlks != null) {
+ // Same monitor as handle()/removeAll(); the list is a plain ArrayList.
+ synchronized (blockIdVsMovementStatus) {
+ blockIdVsMovementStatus.removeAll(moveAttemptFinishedBlks);
+ }
+ }
+ }
+
+ /**
+ * Clear the list of tracked storage movement attempt finished blocks.
+ */
+ public void removeAll() {
+ synchronized (blockIdVsMovementStatus) {
+ blockIdVsMovementStatus.clear();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1db3fb75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/package-info.java
new file mode 100644
index 0000000..fcffbe9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/package-info.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package provides commonly used classes for the block movement.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.hdfs.server.common.sps;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1db3fb75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
deleted file mode 100644
index b3b9fd9..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.datanode;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementAttemptFinished;
-import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlocksMovementsStatusHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class is used to track the completion of block movement future tasks.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class BlockStorageMovementTracker implements Runnable {
- private static final Logger LOG = LoggerFactory
- .getLogger(BlockStorageMovementTracker.class);
- private final CompletionService<BlockMovementAttemptFinished> moverCompletionService;
- private final BlocksMovementsStatusHandler blksMovementsStatusHandler;
-
- // Keeps the information - block vs its list of future move tasks
- private final Map<Block, List<Future<BlockMovementAttemptFinished>>> moverTaskFutures;
- private final Map<Block, List<BlockMovementAttemptFinished>> movementResults;
-
- private volatile boolean running = true;
-
- /**
- * BlockStorageMovementTracker constructor.
- *
- * @param moverCompletionService
- * completion service.
- * @param handler
- * blocks movements status handler
- */
- public BlockStorageMovementTracker(
- CompletionService<BlockMovementAttemptFinished> moverCompletionService,
- BlocksMovementsStatusHandler handler) {
- this.moverCompletionService = moverCompletionService;
- this.moverTaskFutures = new HashMap<>();
- this.blksMovementsStatusHandler = handler;
- this.movementResults = new HashMap<>();
- }
-
- @Override
- public void run() {
- while (running) {
- if (moverTaskFutures.size() <= 0) {
- try {
- synchronized (moverTaskFutures) {
- // Waiting for mover tasks.
- moverTaskFutures.wait(2000);
- }
- } catch (InterruptedException ignore) {
- // Sets interrupt flag of this thread.
- Thread.currentThread().interrupt();
- }
- }
- try {
- Future<BlockMovementAttemptFinished> future =
- moverCompletionService.take();
- if (future != null) {
- BlockMovementAttemptFinished result = future.get();
- LOG.debug("Completed block movement. {}", result);
- Block block = result.getBlock();
- List<Future<BlockMovementAttemptFinished>> blocksMoving =
- moverTaskFutures.get(block);
- if (blocksMoving == null) {
- LOG.warn("Future task doesn't exist for block : {} ", block);
- continue;
- }
- blocksMoving.remove(future);
-
- List<BlockMovementAttemptFinished> resultPerTrackIdList =
- addMovementResultToBlockIdList(result);
-
- // Completed all the scheduled blocks movement under this 'trackId'.
- if (blocksMoving.isEmpty() || moverTaskFutures.get(block) == null) {
- synchronized (moverTaskFutures) {
- moverTaskFutures.remove(block);
- }
- if (running) {
- // handle completed or inprogress blocks movements per trackId.
- blksMovementsStatusHandler.handle(resultPerTrackIdList);
- }
- movementResults.remove(block);
- }
- }
- } catch (InterruptedException e) {
- if (running) {
- LOG.error("Exception while moving block replica to target storage"
- + " type", e);
- }
- } catch (ExecutionException e) {
- // TODO: Do we need failure retries and implement the same if required.
- LOG.error("Exception while moving block replica to target storage type",
- e);
- }
- }
- }
-
- private List<BlockMovementAttemptFinished> addMovementResultToBlockIdList(
- BlockMovementAttemptFinished result) {
- Block block = result.getBlock();
- List<BlockMovementAttemptFinished> perBlockIdList;
- synchronized (movementResults) {
- perBlockIdList = movementResults.get(block);
- if (perBlockIdList == null) {
- perBlockIdList = new ArrayList<>();
- movementResults.put(block, perBlockIdList);
- }
- perBlockIdList.add(result);
- }
- return perBlockIdList;
- }
-
- /**
- * Add future task to the tracking list to check the completion status of the
- * block movement.
- *
- * @param blockID
- * block identifier
- * @param futureTask
- * future task used for moving the respective block
- */
- void addBlock(Block block,
- Future<BlockMovementAttemptFinished> futureTask) {
- synchronized (moverTaskFutures) {
- List<Future<BlockMovementAttemptFinished>> futures =
- moverTaskFutures.get(block);
- // null for the first task
- if (futures == null) {
- futures = new ArrayList<>();
- moverTaskFutures.put(block, futures);
- }
- futures.add(futureTask);
- // Notify waiting tracker thread about the newly added tasks.
- moverTaskFutures.notify();
- }
- }
-
- /**
- * Clear the pending movement and movement result queues.
- */
- void removeAll() {
- synchronized (moverTaskFutures) {
- moverTaskFutures.clear();
- }
- synchronized (movementResults) {
- movementResults.clear();
- }
- }
-
- /**
- * Sets running flag to false and clear the pending movement result queues.
- */
- public void stopTracking() {
- running = false;
- removeAll();
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1db3fb75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
index 9a9c7e0..42f2e93 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
@@ -17,21 +17,9 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
-import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.EnumSet;
-import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
@@ -47,20 +35,15 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.datatransfer.BlockPinningException;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.common.sps.BlockMovementAttemptFinished;
+import org.apache.hadoop.hdfs.server.common.sps.BlockMovementStatus;
+import org.apache.hadoop.hdfs.server.common.sps.BlockStorageMovementTracker;
+import org.apache.hadoop.hdfs.server.common.sps.BlocksMovementsStatusHandler;
+import org.apache.hadoop.hdfs.server.common.sps.BlockDispatcher;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger;
@@ -81,7 +64,6 @@ public class StoragePolicySatisfyWorker {
.getLogger(StoragePolicySatisfyWorker.class);
private final DataNode datanode;
- private final int ioFileBufferSize;
private final int moverThreads;
private final ExecutorService moveExecutor;
@@ -89,10 +71,10 @@ public class StoragePolicySatisfyWorker {
private final BlocksMovementsStatusHandler handler;
private final BlockStorageMovementTracker movementTracker;
private Daemon movementTrackerThread;
+ private final BlockDispatcher blkDispatcher;
public StoragePolicySatisfyWorker(Configuration conf, DataNode datanode) {
this.datanode = datanode;
- this.ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY,
DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT);
@@ -103,7 +85,10 @@ public class StoragePolicySatisfyWorker {
handler);
movementTrackerThread = new Daemon(movementTracker);
movementTrackerThread.setName("BlockStorageMovementTracker");
-
+ DNConf dnConf = datanode.getDnConf();
+ int ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
+ blkDispatcher = new BlockDispatcher(dnConf.getSocketTimeout(),
+ ioFileBufferSize, dnConf.getConnectToDnViaHostname());
// TODO: Needs to manage the number of concurrent moves per DataNode.
}
@@ -183,8 +168,7 @@ public class StoragePolicySatisfyWorker {
assert sourceStorageType != targetStorageType
: "Source and Target storage type shouldn't be same!";
BlockMovingTask blockMovingTask = new BlockMovingTask(blockPoolID,
- blkMovingInfo.getBlock(), blkMovingInfo.getSource(),
- blkMovingInfo.getTarget(), sourceStorageType, targetStorageType);
+ blkMovingInfo);
Future<BlockMovementAttemptFinished> moveCallable = moverCompletionService
.submit(blockMovingTask);
movementTracker.addBlock(blkMovingInfo.getBlock(),
@@ -199,244 +183,45 @@ public class StoragePolicySatisfyWorker {
private class BlockMovingTask implements
Callable<BlockMovementAttemptFinished> {
private final String blockPoolID;
- private final Block block;
- private final DatanodeInfo source;
- private final DatanodeInfo target;
- private final StorageType srcStorageType;
- private final StorageType targetStorageType;
+ private final BlockMovingInfo blkMovingInfo;
- BlockMovingTask(String blockPoolID, Block block,
- DatanodeInfo source, DatanodeInfo target,
- StorageType srcStorageType, StorageType targetStorageType) {
+ BlockMovingTask(String blockPoolID, BlockMovingInfo blkMovInfo) {
this.blockPoolID = blockPoolID;
- this.block = block;
- this.source = source;
- this.target = target;
- this.srcStorageType = srcStorageType;
- this.targetStorageType = targetStorageType;
+ this.blkMovingInfo = blkMovInfo;
}
@Override
public BlockMovementAttemptFinished call() {
BlockMovementStatus status = moveBlock();
- return new BlockMovementAttemptFinished(block, source, target, status);
+ return new BlockMovementAttemptFinished(blkMovingInfo.getBlock(),
+ blkMovingInfo.getSource(), blkMovingInfo.getTarget(), status);
}
private BlockMovementStatus moveBlock() {
- LOG.info("Start moving block:{} from src:{} to destin:{} to satisfy "
- + "storageType, sourceStoragetype:{} and destinStoragetype:{}",
- block, source, target, srcStorageType, targetStorageType);
- Socket sock = null;
- DataOutputStream out = null;
- DataInputStream in = null;
+ datanode.incrementXmitsInProgress();
+ ExtendedBlock eb = new ExtendedBlock(blockPoolID,
+ blkMovingInfo.getBlock());
try {
- datanode.incrementXmitsInProgress();
-
- ExtendedBlock extendedBlock = new ExtendedBlock(blockPoolID, block);
- DNConf dnConf = datanode.getDnConf();
-
- String dnAddr = datanode.getDatanodeId()
- .getXferAddr(dnConf.getConnectToDnViaHostname());
- sock = datanode.newSocket();
- NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr),
- dnConf.getSocketTimeout());
- sock.setSoTimeout(2 * dnConf.getSocketTimeout());
- LOG.debug("Connecting to datanode {}", dnAddr);
-
- OutputStream unbufOut = sock.getOutputStream();
- InputStream unbufIn = sock.getInputStream();
Token<BlockTokenIdentifier> accessToken = datanode.getBlockAccessToken(
- extendedBlock, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
- new StorageType[]{targetStorageType}, new String[0]);
-
+ eb, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
+ new StorageType[]{blkMovingInfo.getTargetStorageType()},
+ new String[0]);
DataEncryptionKeyFactory keyFactory = datanode
- .getDataEncryptionKeyFactoryForBlock(extendedBlock);
- IOStreamPair saslStreams = datanode.getSaslClient().socketSend(sock,
- unbufOut, unbufIn, keyFactory, accessToken, target);
- unbufOut = saslStreams.out;
- unbufIn = saslStreams.in;
- out = new DataOutputStream(
- new BufferedOutputStream(unbufOut, ioFileBufferSize));
- in = new DataInputStream(
- new BufferedInputStream(unbufIn, ioFileBufferSize));
- sendRequest(out, extendedBlock, accessToken, source, targetStorageType);
- receiveResponse(in);
+ .getDataEncryptionKeyFactoryForBlock(eb);
- LOG.info(
- "Successfully moved block:{} from src:{} to destin:{} for"
- + " satisfying storageType:{}",
- block, source, target, targetStorageType);
- return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_SUCCESS;
- } catch (BlockPinningException e) {
- // Pinned block won't be able to move to a different node. So, its not
- // required to do retries, just marked as SUCCESS.
- LOG.debug("Pinned block can't be moved, so skipping block:{}", block,
- e);
- return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_SUCCESS;
+ return blkDispatcher.moveBlock(blkMovingInfo,
+ datanode.getSaslClient(), eb, datanode.newSocket(),
+ keyFactory, accessToken);
} catch (IOException e) {
// TODO: handle failure retries
LOG.warn(
"Failed to move block:{} from src:{} to destin:{} to satisfy "
+ "storageType:{}",
- block, source, target, targetStorageType, e);
+ blkMovingInfo.getBlock(), blkMovingInfo.getSource(),
+ blkMovingInfo.getTarget(), blkMovingInfo.getTargetStorageType(), e);
return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE;
} finally {
datanode.decrementXmitsInProgress();
- IOUtils.closeStream(out);
- IOUtils.closeStream(in);
- IOUtils.closeSocket(sock);
- }
- }
-
- /** Send a reportedBlock replace request to the output stream. */
- private void sendRequest(DataOutputStream out, ExtendedBlock eb,
- Token<BlockTokenIdentifier> accessToken, DatanodeInfo srcDn,
- StorageType destinStorageType) throws IOException {
- new Sender(out).replaceBlock(eb, destinStorageType, accessToken,
- srcDn.getDatanodeUuid(), srcDn, null);
- }
-
- /** Receive a reportedBlock copy response from the input stream. */
- private void receiveResponse(DataInputStream in) throws IOException {
- BlockOpResponseProto response = BlockOpResponseProto
- .parseFrom(vintPrefixed(in));
- while (response.getStatus() == Status.IN_PROGRESS) {
- // read intermediate responses
- response = BlockOpResponseProto.parseFrom(vintPrefixed(in));
- }
- String logInfo = "reportedBlock move is failed";
- DataTransferProtoUtil.checkBlockOpStatus(response, logInfo, true);
- }
- }
-
- /**
- * Block movement status code.
- */
- public enum BlockMovementStatus {
- /** Success. */
- DN_BLK_STORAGE_MOVEMENT_SUCCESS(0),
- /**
- * Failure due to generation time stamp mismatches or network errors
- * or no available space.
- */
- DN_BLK_STORAGE_MOVEMENT_FAILURE(-1);
-
- // TODO: need to support different type of failures. Failure due to network
- // errors, block pinned, no space available etc.
-
- private final int code;
-
- BlockMovementStatus(int code) {
- this.code = code;
- }
-
- /**
- * @return the status code.
- */
- int getStatusCode() {
- return code;
- }
- }
-
- /**
- * This class represents status from a block movement task. This will have the
- * information of the task which was successful or failed due to errors.
- */
- static class BlockMovementAttemptFinished {
- private final Block block;
- private final DatanodeInfo src;
- private final DatanodeInfo target;
- private final BlockMovementStatus status;
-
- BlockMovementAttemptFinished(Block block, DatanodeInfo src,
- DatanodeInfo target, BlockMovementStatus status) {
- this.block = block;
- this.src = src;
- this.target = target;
- this.status = status;
- }
-
- Block getBlock() {
- return block;
- }
-
- BlockMovementStatus getStatus() {
- return status;
- }
-
- @Override
- public String toString() {
- return new StringBuilder().append("Block movement attempt finished(\n ")
- .append(" block : ")
- .append(block).append(" src node: ").append(src)
- .append(" target node: ").append(target)
- .append(" movement status: ").append(status).append(")").toString();
- }
- }
-
- /**
- * Blocks movements status handler, which is used to collect details of the
- * completed block movements and it will send these attempted finished(with
- * success or failure) blocks to the namenode via heartbeat.
- */
- public static class BlocksMovementsStatusHandler {
- private final List<Block> blockIdVsMovementStatus =
- new ArrayList<>();
-
- /**
- * Collect all the storage movement attempt finished blocks. Later this will
- * be send to namenode via heart beat.
- *
- * @param moveAttemptFinishedBlks
- * set of storage movement attempt finished blocks
- */
- void handle(List<BlockMovementAttemptFinished> moveAttemptFinishedBlks) {
- List<Block> blocks = new ArrayList<>();
-
- for (BlockMovementAttemptFinished item : moveAttemptFinishedBlks) {
- blocks.add(item.getBlock());
- }
- // Adding to the tracking report list. Later this will be send to
- // namenode via datanode heartbeat.
- synchronized (blockIdVsMovementStatus) {
- blockIdVsMovementStatus.addAll(blocks);
- }
- }
-
- /**
- * @return unmodifiable list of storage movement attempt finished blocks.
- */
- List<Block> getMoveAttemptFinishedBlocks() {
- List<Block> moveAttemptFinishedBlks = new ArrayList<>();
- // 1. Adding all the completed block ids.
- synchronized (blockIdVsMovementStatus) {
- if (blockIdVsMovementStatus.size() > 0) {
- moveAttemptFinishedBlks = Collections
- .unmodifiableList(blockIdVsMovementStatus);
- }
- }
- return moveAttemptFinishedBlks;
- }
-
- /**
- * Remove the storage movement attempt finished blocks from the tracking
- * list.
- *
- * @param moveAttemptFinishedBlks
- * set of storage movement attempt finished blocks
- */
- void remove(List<Block> moveAttemptFinishedBlks) {
- if (moveAttemptFinishedBlks != null) {
- blockIdVsMovementStatus.removeAll(moveAttemptFinishedBlks);
- }
- }
-
- /**
- * Clear the blockID vs movement status tracking map.
- */
- void removeAll() {
- synchronized (blockIdVsMovementStatus) {
- blockIdVsMovementStatus.clear();
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1db3fb75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index ed1c823..37322e7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -1299,7 +1299,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
blockManager.getSPSService()),
new IntraSPSNameNodeFileIdCollector(getFSDirectory(),
blockManager.getSPSService()),
- new IntraSPSNameNodeBlockMoveTaskHandler(blockManager, this));
+ new IntraSPSNameNodeBlockMoveTaskHandler(blockManager, this), null);
blockManager.startSPS();
} finally {
startingActiveService = false;
@@ -3996,7 +3996,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
+ " movement attempt finished block info sent by DN");
}
} else {
- sps.handleStorageMovementAttemptFinishedBlks(blksMovementsFinished);
+ sps.notifyStorageMovementAttemptFinishedBlks(blksMovementsFinished);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1db3fb75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockMoveTaskHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockMoveTaskHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockMoveTaskHandler.java
index e6f78e1..1b11d01 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockMoveTaskHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockMoveTaskHandler.java
@@ -38,7 +38,6 @@ public interface BlockMoveTaskHandler {
* contain the required info to move the block, that source location,
* destination location and storage types.
*/
- void submitMoveTask(BlockMovingInfo blkMovingInfo,
- BlockMovementListener blockMoveCompletionListener) throws IOException;
+ void submitMoveTask(BlockMovingInfo blkMovingInfo) throws IOException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1db3fb75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
index 3f0155d..ea7a093 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
@@ -46,8 +46,7 @@ import com.google.common.annotations.VisibleForTesting;
* finished for a longer time period, then such items will retries automatically
* after timeout. The default timeout would be 5 minutes.
*/
-public class BlockStorageMovementAttemptedItems
- implements BlockMovementListener {
+public class BlockStorageMovementAttemptedItems{
private static final Logger LOG =
LoggerFactory.getLogger(BlockStorageMovementAttemptedItems.class);
@@ -59,6 +58,7 @@ public class BlockStorageMovementAttemptedItems
private final List<Block> movementFinishedBlocks;
private volatile boolean monitorRunning = true;
private Daemon timerThread = null;
+ private final BlockMovementListener blkMovementListener;
//
// It might take anywhere between 5 to 10 minutes before
// a request is timed out.
@@ -74,7 +74,8 @@ public class BlockStorageMovementAttemptedItems
private final SPSService service;
public BlockStorageMovementAttemptedItems(SPSService service,
- BlockStorageMovementNeeded unsatisfiedStorageMovementFiles) {
+ BlockStorageMovementNeeded unsatisfiedStorageMovementFiles,
+ BlockMovementListener blockMovementListener) {
this.service = service;
long recheckTimeout = this.service.getConf().getLong(
DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
@@ -89,6 +90,7 @@ public class BlockStorageMovementAttemptedItems
this.blockStorageMovementNeeded = unsatisfiedStorageMovementFiles;
storageMovementAttemptedItems = new ArrayList<>();
movementFinishedBlocks = new ArrayList<>();
+ this.blkMovementListener = blockMovementListener;
}
/**
@@ -118,6 +120,10 @@ public class BlockStorageMovementAttemptedItems
synchronized (movementFinishedBlocks) {
movementFinishedBlocks.addAll(Arrays.asList(moveAttemptFinishedBlks));
}
+ // Notify the external listener, if one is plugged in.
+ if (blkMovementListener != null) {
+ blkMovementListener.notifyMovementTriedBlocks(moveAttemptFinishedBlks);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1db3fb75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java
index b27e8c9..d6e92d2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java
@@ -44,8 +44,7 @@ public class IntraSPSNameNodeBlockMoveTaskHandler
}
@Override
- public void submitMoveTask(BlockMovingInfo blkMovingInfo,
- BlockMovementListener blockMoveCompletionListener) throws IOException {
+ public void submitMoveTask(BlockMovingInfo blkMovingInfo) throws IOException {
namesystem.readLock();
try {
DatanodeDescriptor dn = blockManager.getDatanodeManager()
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1db3fb75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
index d74e391..ecc6ceb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
/**
* An interface for SPSService, which exposes life cycle and processing APIs.
@@ -41,9 +42,11 @@ public interface SPSService {
* id
* @param handler
* - a helper service for moving the blocks
+ * @param blkMovementListener
+ * - listener to know about block movement attempt completion
*/
void init(Context ctxt, FileIdCollector fileIDCollector,
- BlockMoveTaskHandler handler);
+ BlockMoveTaskHandler handler, BlockMovementListener blkMovementListener);
/**
* Starts the SPS service. Make sure to initialize the helper services before
@@ -112,4 +115,13 @@ public interface SPSService {
* - directory inode id.
*/
void markScanCompletedForPath(Long inodeId);
+
+ /**
+ * Notify the details of storage movement attempt finished blocks.
+ *
+ * @param moveAttemptFinishedBlks
+ * - holds all the blocks whose movement was attempted
+ */
+ void notifyStorageMovementAttemptFinishedBlks(
+ BlocksStorageMoveAttemptFinished moveAttemptFinishedBlks);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1db3fb75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
index aafdc65..9ba8af7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
@@ -132,13 +132,14 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
}
public void init(final Context context, final FileIdCollector fileIDCollector,
- final BlockMoveTaskHandler blockMovementTaskHandler) {
+ final BlockMoveTaskHandler blockMovementTaskHandler,
+ final BlockMovementListener blockMovementListener) {
this.ctxt = context;
this.storageMovementNeeded =
new BlockStorageMovementNeeded(context, fileIDCollector);
this.storageMovementsMonitor =
new BlockStorageMovementAttemptedItems(this,
- storageMovementNeeded);
+ storageMovementNeeded, blockMovementListener);
this.blockMoveTaskHandler = blockMovementTaskHandler;
this.spsWorkMultiplier = DFSUtil.getSPSWorkMultiplier(getConf());
this.blockMovementMaxRetry = getConf().getInt(
@@ -291,6 +292,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
+ " back to retry queue as some of the blocks"
+ " are low redundant.");
}
+ itemInfo.increRetryCount();
this.storageMovementNeeded.add(itemInfo);
break;
case BLOCKS_FAILED_TO_MOVE:
@@ -410,15 +412,18 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
liveDns, ecPolicy);
if (blocksPaired) {
status = BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED;
- } else {
- // none of the blocks found its eligible targets for satisfying the
- // storage policy.
+ } else
+ if (status != BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED) {
+ // Check if the previous block was successfully paired. The status
+ // is set to NO_BLOCKS_TARGETS_PAIRED only when none of the blocks
+ // of a file found an eligible target to satisfy the storage
+ // policy.
status = BlocksMovingAnalysis.Status.NO_BLOCKS_TARGETS_PAIRED;
}
- } else {
- if (hasLowRedundancyBlocks) {
- status = BlocksMovingAnalysis.Status.FEW_LOW_REDUNDANCY_BLOCKS;
- }
+ } else if (hasLowRedundancyBlocks
+ && status != BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED) {
+ // Check if the previous block was successfully paired.
+ status = BlocksMovingAnalysis.Status.FEW_LOW_REDUNDANCY_BLOCKS;
}
}
@@ -426,8 +431,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
// Check for at least one block storage movement has been chosen
try {
- blockMoveTaskHandler.submitMoveTask(blkMovingInfo,
- storageMovementsMonitor);
+ blockMoveTaskHandler.submitMoveTask(blkMovingInfo);
LOG.debug("BlockMovingInfo: {}", blkMovingInfo);
assignedBlockIds.add(blkMovingInfo.getBlock());
blockCount++;
@@ -823,7 +827,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
* @param moveAttemptFinishedBlks
* set of storage movement attempt finished blocks.
*/
- public void handleStorageMovementAttemptFinishedBlks(
+ public void notifyStorageMovementAttemptFinishedBlks(
BlocksStorageMoveAttemptFinished moveAttemptFinishedBlks) {
if (moveAttemptFinishedBlks.getBlocks().length <= 0) {
return;
@@ -833,7 +837,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
}
@VisibleForTesting
- BlockMovementListener getAttemptedItemsMonitor() {
+ BlockStorageMovementAttemptedItems getAttemptedItemsMonitor() {
return storageMovementsMonitor;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1db3fb75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
new file mode 100644
index 0000000..a1c8eec
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.sps;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.balancer.KeyManager;
+import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
+import org.apache.hadoop.hdfs.server.common.sps.BlockMovementAttemptFinished;
+import org.apache.hadoop.hdfs.server.common.sps.BlockMovementStatus;
+import org.apache.hadoop.hdfs.server.common.sps.BlockStorageMovementTracker;
+import org.apache.hadoop.hdfs.server.common.sps.BlocksMovementsStatusHandler;
+import org.apache.hadoop.hdfs.server.common.sps.BlockDispatcher;
+import org.apache.hadoop.hdfs.server.namenode.sps.BlockMoveTaskHandler;
+import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Daemon;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class handles the external SPS block movements. It moves the
+ * given block to a target datanode by directly establishing a socket
+ * connection to it and invoking the function
+ * {@code Sender#replaceBlock(ExtendedBlock, StorageType, Token, String,
+ * DatanodeInfo, String)}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(ExternalSPSBlockMoveTaskHandler.class);
+
+ private final ExecutorService moveExecutor;
+ private final CompletionService<BlockMovementAttemptFinished> mCompletionServ;
+ private final NameNodeConnector nnc;
+ private final SaslDataTransferClient saslClient;
+ private final BlockStorageMovementTracker blkMovementTracker;
+ private Daemon movementTrackerThread;
+ private final SPSService service;
+ private final BlockDispatcher blkDispatcher;
+
+ public ExternalSPSBlockMoveTaskHandler(Configuration conf,
+ NameNodeConnector nnc, SPSService spsService) {
+ int moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY,
+ DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT);
+ moveExecutor = initializeBlockMoverThreadPool(moverThreads);
+ mCompletionServ = new ExecutorCompletionService<>(moveExecutor);
+ this.nnc = nnc;
+ this.saslClient = new SaslDataTransferClient(conf,
+ DataTransferSaslUtil.getSaslPropertiesResolver(conf),
+ TrustedChannelResolver.getInstance(conf),
+ nnc.getFallbackToSimpleAuth());
+ this.blkMovementTracker = new BlockStorageMovementTracker(
+ mCompletionServ, new ExternalBlocksMovementsStatusHandler());
+ this.service = spsService;
+
+ boolean connectToDnViaHostname = conf.getBoolean(
+ HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME,
+ HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
+ int ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
+ blkDispatcher = new BlockDispatcher(HdfsConstants.READ_TIMEOUT,
+ ioFileBufferSize, connectToDnViaHostname);
+ }
+
+ /**
+ * Initializes block movement tracker daemon and starts the thread.
+ */
+ void init() {
+ movementTrackerThread = new Daemon(this.blkMovementTracker);
+ movementTrackerThread.setName("BlockStorageMovementTracker");
+ movementTrackerThread.start();
+ }
+
+ private ThreadPoolExecutor initializeBlockMoverThreadPool(int num) {
+ LOG.debug("Block mover to satisfy storage policy; pool threads={}", num);
+
+ ThreadPoolExecutor moverThreadPool = new ThreadPoolExecutor(1, num, 60,
+ TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+ new Daemon.DaemonFactory() {
+ private final AtomicInteger threadIndex = new AtomicInteger(0);
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = super.newThread(r);
+ t.setName("BlockMoverTask-" + threadIndex.getAndIncrement());
+ return t;
+ }
+ }, new ThreadPoolExecutor.CallerRunsPolicy() {
+ @Override
+ public void rejectedExecution(Runnable runnable,
+ ThreadPoolExecutor e) {
+ LOG.info("Execution for block movement to satisfy storage policy"
+ + " got rejected, Executing in current thread");
+ // will run in the current thread.
+ super.rejectedExecution(runnable, e);
+ }
+ });
+
+ moverThreadPool.allowCoreThreadTimeOut(true);
+ return moverThreadPool;
+ }
+
+ @Override
+ public void submitMoveTask(BlockMovingInfo blkMovingInfo) throws IOException {
+ // TODO: Need to increment scheduled block size on the target node. This
+ // count will be used to calculate the remaining space of target datanode
+ // during block movement assignment logic. In the internal movement,
+ // remaining space is tracked at the DatanodeDescriptor; please refer to
+ // IntraSPSNameNodeBlockMoveTaskHandler#submitMoveTask implementation,
+ // which updates it via the function call -
+ // dn.incrementBlocksScheduled(blkMovingInfo.getTargetStorageType());
+ LOG.debug("Received BlockMovingTask {}", blkMovingInfo);
+ BlockMovingTask blockMovingTask = new BlockMovingTask(blkMovingInfo);
+ Future<BlockMovementAttemptFinished> moveCallable = mCompletionServ
+ .submit(blockMovingTask);
+ blkMovementTracker.addBlock(blkMovingInfo.getBlock(), moveCallable);
+ }
+
+ private class ExternalBlocksMovementsStatusHandler
+ extends BlocksMovementsStatusHandler {
+ @Override
+ public void handle(
+ List<BlockMovementAttemptFinished> moveAttemptFinishedBlks) {
+ List<Block> blocks = new ArrayList<>();
+ for (BlockMovementAttemptFinished item : moveAttemptFinishedBlks) {
+ blocks.add(item.getBlock());
+ }
+ BlocksStorageMoveAttemptFinished blkAttempted =
+ new BlocksStorageMoveAttemptFinished(
+ blocks.toArray(new Block[blocks.size()]));
+ service.notifyStorageMovementAttemptFinishedBlks(blkAttempted);
+ }
+ }
+
+ /**
+ * This class encapsulates the process of moving the block replica to the
+ * given target.
+ */
+ private class BlockMovingTask
+ implements Callable<BlockMovementAttemptFinished> {
+ private final BlockMovingInfo blkMovingInfo;
+
+ BlockMovingTask(BlockMovingInfo blkMovingInfo) {
+ this.blkMovingInfo = blkMovingInfo;
+ }
+
+ @Override
+ public BlockMovementAttemptFinished call() {
+ BlockMovementStatus blkMovementStatus = moveBlock();
+ return new BlockMovementAttemptFinished(blkMovingInfo.getBlock(),
+ blkMovingInfo.getSource(), blkMovingInfo.getTarget(),
+ blkMovementStatus);
+ }
+
+ private BlockMovementStatus moveBlock() {
+ ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(),
+ blkMovingInfo.getBlock());
+
+ final KeyManager km = nnc.getKeyManager();
+ Token<BlockTokenIdentifier> accessToken;
+ try {
+ accessToken = km.getAccessToken(eb,
+ new StorageType[]{blkMovingInfo.getTargetStorageType()},
+ new String[0]);
+ } catch (IOException e) {
+ // TODO: handle failure retries
+ LOG.warn(
+ "Failed to move block:{} from src:{} to destin:{} to satisfy "
+ + "storageType:{}",
+ blkMovingInfo.getBlock(), blkMovingInfo.getSource(),
+ blkMovingInfo.getTarget(), blkMovingInfo.getTargetStorageType(), e);
+ return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE;
+ }
+ return blkDispatcher.moveBlock(blkMovingInfo, saslClient, eb,
+ new Socket(), km, accessToken);
+ }
+ }
+
+ /**
+ * Cleanup the resources.
+ */
+ void cleanUp() {
+ blkMovementTracker.stopTracking();
+ if (movementTrackerThread != null) {
+ movementTrackerThread.interrupt();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1db3fb75/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
index 3e2c324..4097339 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
@@ -56,7 +56,7 @@ public class TestBlockStorageMovementAttemptedItems {
unsatisfiedStorageMovementFiles =
new BlockStorageMovementNeeded(ctxt, null);
bsmAttemptedItems = new BlockStorageMovementAttemptedItems(sps,
- unsatisfiedStorageMovementFiles);
+ unsatisfiedStorageMovementFiles, null);
}
@After
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1db3fb75/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
index e0bf410..8115661 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
@@ -92,7 +92,7 @@ public class TestStoragePolicySatisfier {
private static final String ONE_SSD = "ONE_SSD";
private static final String COLD = "COLD";
- private static final Logger LOG =
+ protected static final Logger LOG =
LoggerFactory.getLogger(TestStoragePolicySatisfier.class);
private Configuration config = null;
private StorageType[][] allDiskTypes =
@@ -1337,7 +1337,7 @@ public class TestStoragePolicySatisfier {
};
FileIdCollector fileIDCollector = createFileIdCollector(sps, ctxt);
- sps.init(ctxt, fileIDCollector, null);
+ sps.init(ctxt, fileIDCollector, null, null);
sps.getStorageMovementQueue().activate();
INode rootINode = fsDir.getINode("/root");
@@ -1404,7 +1404,7 @@ public class TestStoragePolicySatisfier {
}
};
FileIdCollector fileIDCollector = createFileIdCollector(sps, ctxt);
- sps.init(ctxt, fileIDCollector, null);
+ sps.init(ctxt, fileIDCollector, null, null);
sps.getStorageMovementQueue().activate();
INode rootINode = fsDir.getINode("/root");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1db3fb75/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
index 3ced34e..9a401bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
@@ -18,20 +18,33 @@
package org.apache.hadoop.hdfs.server.sps;
import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener;
import org.apache.hadoop.hdfs.server.namenode.sps.Context;
import org.apache.hadoop.hdfs.server.namenode.sps.FileIdCollector;
-import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeBlockMoveTaskHandler;
import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeContext;
import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
import org.apache.hadoop.hdfs.server.namenode.sps.TestStoragePolicySatisfier;
+import org.junit.Assert;
import org.junit.Ignore;
+import com.google.common.collect.Maps;
+
/**
* Tests the external sps service plugins.
*/
@@ -69,23 +82,24 @@ public class TestExternalStoragePolicySatisfier
cluster.waitActive();
if (conf.getBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
false)) {
- SPSService spsService = cluster.getNameNode().getNamesystem()
- .getBlockManager().getSPSService();
+ BlockManager blkMgr = cluster.getNameNode().getNamesystem()
+ .getBlockManager();
+ SPSService spsService = blkMgr.getSPSService();
spsService.stopGracefully();
IntraSPSNameNodeContext context = new IntraSPSNameNodeContext(
cluster.getNameNode().getNamesystem(),
- cluster.getNameNode().getNamesystem().getBlockManager(), cluster
- .getNameNode().getNamesystem().getBlockManager().getSPSService());
-
+ blkMgr, blkMgr.getSPSService());
+ ExternalBlockMovementListener blkMoveListener =
+ new ExternalBlockMovementListener();
+ ExternalSPSBlockMoveTaskHandler externalHandler =
+ new ExternalSPSBlockMoveTaskHandler(conf, getNameNodeConnector(conf),
+ blkMgr.getSPSService());
+ externalHandler.init();
spsService.init(context,
- new ExternalSPSFileIDCollector(context,
- cluster.getNameNode().getNamesystem().getBlockManager()
- .getSPSService(),
- 5),
- new IntraSPSNameNodeBlockMoveTaskHandler(
- cluster.getNameNode().getNamesystem().getBlockManager(),
- cluster.getNameNode().getNamesystem()));
+ new ExternalSPSFileIDCollector(context, blkMgr.getSPSService(), 5),
+ externalHandler,
+ blkMoveListener);
spsService.start(true);
}
return cluster;
@@ -97,6 +111,35 @@ public class TestExternalStoragePolicySatisfier
return new ExternalSPSFileIDCollector(ctxt, sps, 5);
}
+ private class ExternalBlockMovementListener implements BlockMovementListener {
+ // Blocks reported so far through notifyMovementTriedBlocks.
+ private List<Block> actualBlockMovements = new ArrayList<>();
+
+ @Override
+ public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) {
+ for (Block block : moveAttemptFinishedBlks) {
+ actualBlockMovements.add(block);
+ }
+ LOG.info("Movement attempted blocks: {}", actualBlockMovements);
+ }
+ }
+
+ private NameNodeConnector getNameNodeConnector(Configuration conf)
+ throws IOException {
+ final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
+ Assert.assertEquals(1, namenodes.size());
+ Map<URI, List<Path>> nnMap = Maps.newHashMap();
+ for (URI nn : namenodes) {
+ nnMap.put(nn, null);
+ }
+ final Path externalSPSPathId = new Path("/system/externalSPS.id");
+ final List<NameNodeConnector> nncs = NameNodeConnector
+ .newNameNodeConnectors(nnMap,
+ StoragePolicySatisfier.class.getSimpleName(), externalSPSPathId,
+ conf, NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
+ return nncs.get(0);
+ }
+
/**
* This test need not run as external scan is not a batch based scanning right
* now.
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org