You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2016/12/15 09:05:13 UTC
[45/50] [abbrv] hadoop git commit: HDFS-11029. [SPS]:Provide retry
mechanism for the blocks which were failed while moving its storage at DNs.
Contributed by Uma Maheswara Rao G
HDFS-11029. [SPS]:Provide retry mechanism for the blocks which were failed while moving its storage at DNs. Contributed by Uma Maheswara Rao G
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4eee3a18
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4eee3a18
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4eee3a18
Branch: refs/heads/HDFS-10285
Commit: 4eee3a18e30b5a418efbfe9345651c3ab05026fb
Parents: 8152114
Author: Rakesh Radhakrishnan <ra...@apache.org>
Authored: Thu Nov 10 10:09:45 2016 +0530
Committer: Rakesh Radhakrishnan <ra...@apache.org>
Committed: Thu Dec 15 14:24:25 2016 +0530
----------------------------------------------------------------------
.../BlockStorageMovementAttemptedItems.java | 221 +++++++++++++++++++
.../server/namenode/StoragePolicySatisfier.java | 24 +-
.../TestBlockStorageMovementAttemptedItems.java | 101 +++++++++
.../namenode/TestStoragePolicySatisfier.java | 8 +-
4 files changed, 343 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4eee3a18/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
new file mode 100644
index 0000000..580d0d6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.apache.hadoop.util.Time.monotonicNow;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.util.Daemon;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A monitor class for checking whether block storage movements finished or not.
+ * If block storage movement results from datanode indicates about the movement
+ * success, then it will just remove the entries from tracking. If it reports
+ * failure, then it will add back to needed block storage movements list. If no
+ * DN reports about movement for longer time, then such items will be retried
+ * automatically after timeout. The default timeout would be 30mins.
+ */
+public class BlockStorageMovementAttemptedItems {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(BlockStorageMovementAttemptedItems.class);
+ // A map holds the items which are already taken for blocks movements
+ // processing and sent to DNs. Key is the tracking/blockCollection id and
+ // value is the monotonic timestamp at which the attempt was recorded.
+ private final Map<Long, Long> storageMovementAttemptedItems;
+ // Movement results reported back by the co-ordinating datanodes; drained
+ // periodically by the monitor thread.
+ private final List<BlocksStorageMovementResult> storageMovementAttemptedResults;
+ // Run flag polled by the monitor loop; volatile so stop() is visible to
+ // the daemon thread.
+ private volatile boolean spsRunning = true;
+ private Daemon timerThread = null;
+ //
+ // It might take anywhere between 30 to 60 minutes before
+ // a request is timed out.
+ //
+ private long selfRetryTimeout = 30 * 60 * 1000;
+
+ //
+ // It might take anywhere between 5 to 10 minutes before
+ // a request is timed out.
+ //
+ private long checkTimeout = 5 * 60 * 1000; // minimum value
+ private BlockStorageMovementNeeded blockStorageMovementNeeded;
+
+ /**
+ * @param timeoutPeriod
+ * - monitor check interval in millis; when positive, the effective
+ * interval becomes the smaller of this value and the 5min default
+ * @param selfRetryTimeout
+ * - millis after which an attempted item with no reported result is
+ * added back for retry
+ * @param unsatisfiedStorageMovementFiles
+ * - queue to which timed-out or failed items are re-added
+ */
+ public BlockStorageMovementAttemptedItems(long timeoutPeriod,
+ long selfRetryTimeout,
+ BlockStorageMovementNeeded unsatisfiedStorageMovementFiles) {
+ if (timeoutPeriod > 0) {
+ this.checkTimeout = Math.min(checkTimeout, timeoutPeriod);
+ }
+
+ this.selfRetryTimeout = selfRetryTimeout;
+ this.blockStorageMovementNeeded = unsatisfiedStorageMovementFiles;
+ storageMovementAttemptedItems = new HashMap<>();
+ storageMovementAttemptedResults = new ArrayList<>();
+ }
+
+ /**
+ * Add item to block storage movement attempted items map which holds the
+ * tracking/blockCollection id versus time stamp.
+ *
+ * @param blockCollectionID
+ * - tracking id / block collection id
+ */
+ public void add(Long blockCollectionID) {
+ synchronized (storageMovementAttemptedItems) {
+ storageMovementAttemptedItems.put(blockCollectionID, monotonicNow());
+ }
+ }
+
+ /**
+ * Add the trackIDBlocksStorageMovementResults to
+ * storageMovementAttemptedResults.
+ *
+ * @param blksMovementResults
+ * - results reported by a co-ordinating datanode; an empty array is
+ * ignored
+ */
+ public void addResults(BlocksStorageMovementResult[] blksMovementResults) {
+ if (blksMovementResults.length == 0) {
+ return;
+ }
+ synchronized (storageMovementAttemptedResults) {
+ storageMovementAttemptedResults
+ .addAll(Arrays.asList(blksMovementResults));
+ }
+ }
+
+ /**
+ * Starts the monitor thread.
+ */
+ void start() {
+ timerThread = new Daemon(new BlocksStorageMovementAttemptResultMonitor());
+ timerThread.setName("BlocksStorageMovementAttemptResultMonitor");
+ timerThread.start();
+ }
+
+ /**
+ * Stops the monitor thread.
+ * NOTE(review): this only clears the run flag; timerThread is neither
+ * interrupted nor joined, so the monitor may keep sleeping for up to
+ * checkTimeout ms before it actually exits — confirm this shutdown
+ * latency is acceptable.
+ */
+ public void stop() {
+ spsRunning = false;
+ }
+
+ /**
+ * A monitor class for checking block storage movement result and long waiting
+ * items periodically.
+ */
+ private class BlocksStorageMovementAttemptResultMonitor implements Runnable {
+ @Override
+ public void run() {
+ // Loop until stop() clears the flag: process reported results first,
+ // then re-queue items that have waited longer than selfRetryTimeout.
+ while (spsRunning) {
+ try {
+ blockStorageMovementResultCheck();
+ blocksStorageMovementUnReportedItemsCheck();
+ Thread.sleep(checkTimeout);
+ } catch (InterruptedException ie) {
+ // NOTE(review): the interrupt is swallowed — the loop continues
+ // (spsRunning is unchanged) and the interrupt status is not
+ // restored via Thread.currentThread().interrupt(); confirm
+ // intended.
+ LOG.debug("BlocksStorageMovementAttemptResultMonitor thread "
+ + "is interrupted.", ie);
+ }
+ }
+ }
+
+ // Re-queues attempted items older than selfRetryTimeout for which no
+ // result has been reported; items that do have a pending result are only
+ // dropped from tracking here (their result is handled by
+ // blockStorageMovementResultCheck).
+ private void blocksStorageMovementUnReportedItemsCheck() {
+ synchronized (storageMovementAttemptedItems) {
+ Iterator<Entry<Long, Long>> iter =
+ storageMovementAttemptedItems.entrySet().iterator();
+ long now = monotonicNow();
+ while (iter.hasNext()) {
+ Entry<Long, Long> entry = iter.next();
+ if (now > entry.getValue() + selfRetryTimeout) {
+ Long blockCollectionID = entry.getKey();
+ // Nested lock order is items -> results; the result-check path
+ // nests results -> items, but only after this lock is released,
+ // so no deadlock cycle is visible in this file.
+ synchronized (storageMovementAttemptedResults) {
+ boolean exist = isExistInResult(blockCollectionID);
+ if (!exist) {
+ blockStorageMovementNeeded.add(blockCollectionID);
+ } else {
+ LOG.info("Blocks storage movement results for the"
+ + " tracking id : " + blockCollectionID
+ + " is reported from one of the co-ordinating datanode."
+ + " So, the result will be processed soon.");
+ }
+ iter.remove();
+ }
+ }
+ }
+
+ }
+ }
+
+ // Linear scan of the pending results for the given tracking id. Must be
+ // called while holding the storageMovementAttemptedResults lock (the
+ // caller synchronizes on it).
+ // NOTE(review): `getTrackId() == blockCollectionID` relies on getTrackId()
+ // returning a primitive long so the Long operand unboxes; if it returned
+ // Long this would be a reference comparison — confirm the return type.
+ private boolean isExistInResult(Long blockCollectionID) {
+ Iterator<BlocksStorageMovementResult> iter =
+ storageMovementAttemptedResults.iterator();
+ while (iter.hasNext()) {
+ BlocksStorageMovementResult storageMovementAttemptedResult =
+ iter.next();
+ if (storageMovementAttemptedResult.getTrackId() == blockCollectionID) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ // Drains reported results: FAILURE results are re-added to
+ // blockStorageMovementNeeded for retry; SUCCESS results remove the item
+ // from the attempted-items tracking map.
+ // NOTE(review): the FAILURE branch does not remove the entry from
+ // storageMovementAttemptedItems, so after selfRetryTimeout the same
+ // trackId may be re-queued a second time by the unreported-items check —
+ // confirm whether a double retry is intended.
+ private void blockStorageMovementResultCheck() {
+ synchronized (storageMovementAttemptedResults) {
+ Iterator<BlocksStorageMovementResult> iter =
+ storageMovementAttemptedResults.iterator();
+ while (iter.hasNext()) {
+ BlocksStorageMovementResult storageMovementAttemptedResult =
+ iter.next();
+ if (storageMovementAttemptedResult
+ .getStatus() == BlocksStorageMovementResult.Status.FAILURE) {
+ blockStorageMovementNeeded
+ .add(storageMovementAttemptedResult.getTrackId());
+ LOG.warn("Blocks storage movement results for the tracking id : "
+ + storageMovementAttemptedResult.getTrackId()
+ + " is reported from co-ordinating datanode, but result"
+ + " status is FAILURE. So, added for retry");
+ } else {
+ synchronized (storageMovementAttemptedItems) {
+ storageMovementAttemptedItems
+ .remove(storageMovementAttemptedResult.getTrackId());
+ }
+ LOG.info("Blocks storage movement results for the tracking id : "
+ + storageMovementAttemptedResult.getTrackId()
+ + " is reported from co-ordinating datanode. "
+ + "The result status is SUCCESS.");
+ }
+ iter.remove(); // remove from results as processed above
+ }
+ }
+
+ }
+ }
+
+ // NOTE(review): reads the list size without holding its lock; acceptable
+ // for a test-only probe but not a synchronized snapshot.
+ @VisibleForTesting
+ public int resultsCount() {
+ return storageMovementAttemptedResults.size();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4eee3a18/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
index fbe686a..6fa9302 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -69,6 +69,7 @@ public class StoragePolicySatisfier implements Runnable {
private final Namesystem namesystem;
private final BlockManager blockManager;
private final BlockStorageMovementNeeded storageMovementNeeded;
+ private final BlockStorageMovementAttemptedItems storageMovementsMonitor;
public StoragePolicySatisfier(final Namesystem namesystem,
final BlockStorageMovementNeeded storageMovementNeeded,
@@ -76,15 +77,22 @@ public class StoragePolicySatisfier implements Runnable {
this.namesystem = namesystem;
this.storageMovementNeeded = storageMovementNeeded;
this.blockManager = blkManager;
+ // TODO: below selfRetryTimeout and checkTimeout can be configurable later
+ // Now, the default values of selfRetryTimeout and checkTimeout are 30mins
+ // and 5mins respectively
+ this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems(
+ 5 * 60 * 1000, 30 * 60 * 1000, storageMovementNeeded);
}
/**
- * Start storage policy satisfier demon thread.
+ * Start storage policy satisfier demon thread. Also start block storage
+ * movements monitor for retry the attempts if needed.
*/
public void start() {
storagePolicySatisfierThread = new Daemon(this);
storagePolicySatisfierThread.setName("StoragePolicySatisfier");
storagePolicySatisfierThread.start();
+ this.storageMovementsMonitor.start();
}
/**
@@ -99,6 +107,7 @@ public class StoragePolicySatisfier implements Runnable {
storagePolicySatisfierThread.join(3000);
} catch (InterruptedException ie) {
}
+ this.storageMovementsMonitor.stop();
}
@Override
@@ -108,6 +117,7 @@ public class StoragePolicySatisfier implements Runnable {
Long blockCollectionID = storageMovementNeeded.get();
if (blockCollectionID != null) {
computeAndAssignStorageMismatchedBlocksToDNs(blockCollectionID);
+ this.storageMovementsMonitor.add(blockCollectionID);
}
// TODO: We can think to make this as configurable later, how frequently
// we want to check block movements.
@@ -398,11 +408,6 @@ public class StoragePolicySatisfier implements Runnable {
}
}
- // TODO: Temporarily keeping the results for assertion. This has to be
- // revisited as part of HDFS-11029.
- @VisibleForTesting
- List<BlocksStorageMovementResult> results = new ArrayList<>();
-
/**
* Receives the movement results of collection of blocks associated to a
* trackId.
@@ -415,6 +420,11 @@ public class StoragePolicySatisfier implements Runnable {
if (blksMovementResults.length <= 0) {
return;
}
- results.addAll(Arrays.asList(blksMovementResults));
+ storageMovementsMonitor.addResults(blksMovementResults);
+ }
+
+ @VisibleForTesting
+ BlockStorageMovementAttemptedItems getAttemptedItemsMonitor() {
+ return storageMovementsMonitor;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4eee3a18/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
new file mode 100644
index 0000000..8c70d99
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.apache.hadoop.util.Time.monotonicNow;
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests that block storage movement attempt failures are reported from DN and
+ * processed them correctly or not.
+ */
+public class TestBlockStorageMovementAttemptedItems {
+
+ private BlockStorageMovementAttemptedItems bsmAttemptedItems = null;
+ private BlockStorageMovementNeeded unsatisfiedStorageMovementFiles = null;
+
+ @Before
+ public void setup() {
+ // 100ms check interval and 500ms self-retry timeout keep the monitor
+ // cycles short enough for the timed assertions below.
+ unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded();
+ bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100, 500,
+ unsatisfiedStorageMovementFiles);
+ bsmAttemptedItems.start();
+ }
+
+ @After
+ public void teardown() {
+ if (bsmAttemptedItems != null) {
+ bsmAttemptedItems.stop();
+ }
+ }
+
+ /**
+ * Polls the unsatisfied-movement queue for up to 2 x retryTimeout millis
+ * and reports whether the given item was re-queued for retry.
+ * NOTE(review): assumes BlockStorageMovementNeeded#get() is non-blocking
+ * and returns null when the queue is empty — confirm against that class.
+ */
+ private boolean checkItemMovedForRetry(Long item, long retryTimeout)
+ throws InterruptedException {
+ long stopTime = monotonicNow() + (retryTimeout * 2);
+ boolean isItemFound = false;
+ while (monotonicNow() < (stopTime)) {
+ Long ele = null;
+ while ((ele = unsatisfiedStorageMovementFiles.get()) != null) {
+ if (item.longValue() == ele.longValue()) {
+ isItemFound = true;
+ break;
+ }
+ }
+ if (!isItemFound) {
+ Thread.sleep(100);
+ } else {
+ break;
+ }
+ }
+ return isItemFound;
+ }
+
+ // A FAILURE result must cause the item to be re-queued for retry.
+ @Test(timeout = 30000)
+ public void testAddResultWithFailureResult() throws Exception {
+ Long item = new Long(1234);
+ bsmAttemptedItems.add(item);
+ bsmAttemptedItems.addResults(
+ new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
+ item.longValue(), BlocksStorageMovementResult.Status.FAILURE)});
+ assertTrue(checkItemMovedForRetry(item, 200));
+ }
+
+ // A SUCCESS result must remove the item from tracking; it must not be
+ // re-queued for retry.
+ @Test(timeout = 30000)
+ public void testAddResultWithSucessResult() throws Exception {
+ Long item = new Long(1234);
+ bsmAttemptedItems.add(item);
+ bsmAttemptedItems.addResults(
+ new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
+ item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)});
+ assertFalse(checkItemMovedForRetry(item, 200));
+ }
+
+ // With no result ever reported, the item must be re-queued after the
+ // self-retry timeout (500ms configured in setup()).
+ @Test(timeout = 30000)
+ public void testNoResultAdded() throws Exception {
+ Long item = new Long(1234);
+ bsmAttemptedItems.add(item);
+ // After self-retry timeout, it should be added back for retry
+ assertTrue(checkItemMovedForRetry(item, 600));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4eee3a18/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
index cbfdfc6..6f5c717 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
@@ -174,8 +174,6 @@ public class TestStoragePolicySatisfier {
waitExpectedStorageType(file, StorageType.SSD, distributedFS, 1, 30000);
waitExpectedStorageType(file, StorageType.DISK, distributedFS, 2, 30000);
- // TODO: Temporarily using the results from StoragePolicySatisfier class.
- // This has to be revisited as part of HDFS-11029.
waitForBlocksMovementResult(1, 30000);
} finally {
hdfsCluster.shutdown();
@@ -190,8 +188,10 @@ public class TestStoragePolicySatisfier {
@Override
public Boolean get() {
LOG.info("expectedResultsCount={} actualResultsCount={}",
- expectedResultsCount, sps.results.size());
- return expectedResultsCount == sps.results.size();
+ expectedResultsCount,
+ sps.getAttemptedItemsMonitor().resultsCount());
+ return expectedResultsCount == sps.getAttemptedItemsMonitor()
+ .resultsCount();
}
}, 100, timeout);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org