You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2018/02/16 14:39:55 UTC
[04/50] [abbrv] hadoop git commit: HDFS-11336: [SPS]: Remove xAttrs
when movements done or SPS disabled. Contributed by Yuanbo Liu.
HDFS-11336: [SPS]: Remove xAttrs when movements done or SPS disabled. Contributed by Yuanbo Liu.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/21760eb1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/21760eb1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/21760eb1
Branch: refs/heads/HDFS-10285
Commit: 21760eb1cbaa95efe12be3d460164fa08a43bb5f
Parents: b2563ef
Author: Uma Maheswara Rao G <um...@intel.com>
Authored: Tue Mar 14 00:52:24 2017 -0700
Committer: Rakesh Radhakrishnan <ra...@apache.org>
Committed: Fri Feb 16 19:46:20 2018 +0530
----------------------------------------------------------------------
.../BlockStorageMovementAttemptedItems.java | 14 ++-
.../hdfs/server/namenode/FSDirAttrOp.java | 8 ++
.../hdfs/server/namenode/FSDirectory.java | 16 +++
.../server/namenode/StoragePolicySatisfier.java | 45 ++++++--
.../org/apache/hadoop/hdfs/DFSTestUtil.java | 2 +-
.../TestBlockStorageMovementAttemptedItems.java | 6 +-
.../TestPersistentStoragePolicySatisfier.java | 112 ++++++++++++++++++-
7 files changed, 186 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21760eb1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
index 042aca3..f15db73 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.util.Time.monotonicNow;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -54,6 +55,7 @@ public class BlockStorageMovementAttemptedItems {
private final List<BlocksStorageMovementResult> storageMovementAttemptedResults;
private volatile boolean monitorRunning = true;
private Daemon timerThread = null;
+ private final StoragePolicySatisfier sps;
//
// It might take anywhere between 30 to 60 minutes before
// a request is timed out.
@@ -69,7 +71,8 @@ public class BlockStorageMovementAttemptedItems {
public BlockStorageMovementAttemptedItems(long recheckTimeout,
long selfRetryTimeout,
- BlockStorageMovementNeeded unsatisfiedStorageMovementFiles) {
+ BlockStorageMovementNeeded unsatisfiedStorageMovementFiles,
+ StoragePolicySatisfier sps) {
if (recheckTimeout > 0) {
this.minCheckTimeout = Math.min(minCheckTimeout, recheckTimeout);
}
@@ -78,6 +81,7 @@ public class BlockStorageMovementAttemptedItems {
this.blockStorageMovementNeeded = unsatisfiedStorageMovementFiles;
storageMovementAttemptedItems = new HashMap<>();
storageMovementAttemptedResults = new ArrayList<>();
+ this.sps = sps;
}
/**
@@ -200,6 +204,9 @@ public class BlockStorageMovementAttemptedItems {
} catch (InterruptedException ie) {
LOG.info("BlocksStorageMovementAttemptResultMonitor thread "
+ "is interrupted.", ie);
+ } catch (IOException ie) {
+ LOG.warn("BlocksStorageMovementAttemptResultMonitor thread "
+ + "received exception and exiting.", ie);
}
}
}
@@ -248,7 +255,7 @@ public class BlockStorageMovementAttemptedItems {
}
@VisibleForTesting
- void blockStorageMovementResultCheck() {
+ void blockStorageMovementResultCheck() throws IOException {
synchronized (storageMovementAttemptedResults) {
Iterator<BlocksStorageMovementResult> resultsIter =
storageMovementAttemptedResults.iterator();
@@ -296,6 +303,9 @@ public class BlockStorageMovementAttemptedItems {
+ " reported from co-ordinating datanode. But the trackID "
+ "doesn't exists in storageMovementAttemptedItems list",
storageMovementAttemptedResult.getTrackId());
+ // Remove xattr for the track id.
+ this.sps.notifyBlkStorageMovementFinished(
+ storageMovementAttemptedResult.getTrackId());
}
}
// Remove trackID from the attempted list, if any.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21760eb1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
index 518c17e..66d5f3d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
@@ -544,6 +544,14 @@ public class FSDirAttrOp {
return false;
}
+ static void unprotectedRemoveSPSXAttr(INode inode, XAttr spsXAttr)
+ throws IOException{
+ List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
+ existingXAttrs.remove(spsXAttr);
+ XAttrStorage.updateINodeXAttrs(inode, existingXAttrs,
+ INodesInPath.fromINode(inode).getLatestSnapshotId());
+ }
+
private static void setDirStoragePolicy(
FSDirectory fsd, INodesInPath iip, byte policyId) throws IOException {
INode inode = FSDirectory.resolveLastINode(iip);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21760eb1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index c286ec1..f6f2001 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -1423,6 +1423,22 @@ public class FSDirectory implements Closeable {
getBlockManager().satisfyStoragePolicy(inode.getId());
}
+ /**
+ * Remove the SPS xattr from the inode, retrieve the inode from the
+ * block collection id.
+ * @param id
+ * - file block collection id.
+ */
+ public void removeSPSXattr(long id) throws IOException {
+ final INode inode = getInode(id);
+ final XAttrFeature xaf = inode.getXAttrFeature();
+ final XAttr spsXAttr = xaf.getXAttr(XATTR_SATISFY_STORAGE_POLICY);
+
+ if (spsXAttr != null) {
+ FSDirAttrOp.unprotectedRemoveSPSXAttr(inode, spsXAttr);
+ }
+ }
+
private void addEncryptionZone(INodeWithAdditionalFields inode,
XAttrFeature xaf) {
if (xaf == null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21760eb1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
index 29c8a5d..337d5b5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -91,7 +92,8 @@ public class StoragePolicySatisfier implements Runnable {
conf.getLong(
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY,
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT),
- storageMovementNeeded);
+ storageMovementNeeded,
+ this);
}
/**
@@ -119,12 +121,6 @@ public class StoragePolicySatisfier implements Runnable {
*/
public synchronized void stop(boolean reconfigStop) {
isRunning = false;
- if (reconfigStop) {
- LOG.info("Stopping StoragePolicySatisfier, as admin requested to "
- + "deactivate it.");
- } else {
- LOG.info("Stopping StoragePolicySatisfier.");
- }
if (storagePolicySatisfierThread == null) {
return;
}
@@ -135,8 +131,12 @@ public class StoragePolicySatisfier implements Runnable {
}
this.storageMovementsMonitor.stop();
if (reconfigStop) {
- this.clearQueues();
+ LOG.info("Stopping StoragePolicySatisfier, as admin requested to "
+ + "deactivate it.");
+ this.clearQueuesWithNotification();
this.blockManager.getDatanodeManager().addDropSPSWorkCommandsToAllDNs();
+ } else {
+ LOG.info("Stopping StoragePolicySatisfier.");
}
}
@@ -717,4 +717,33 @@ public class StoragePolicySatisfier implements Runnable {
+ "user requests on satisfying block storages would be discarded.");
storageMovementNeeded.clearAll();
}
+
+ /**
+ * Clean all the movements in storageMovementNeeded and notify
+ * to clean up required resources.
+ * @throws IOException
+ */
+ private void clearQueuesWithNotification() {
+ Long id;
+ while ((id = storageMovementNeeded.get()) != null) {
+ try {
+ notifyBlkStorageMovementFinished(id);
+ } catch (IOException ie) {
+ LOG.warn("Failed to remove SPS "
+ + "xattr for collection id " + id, ie);
+ }
+ }
+ }
+
+ /**
+ * When block movement has been finished successfully, some additional
+ * operations should be notified, for example, SPS xattr should be
+ * removed.
+ * @param trackId track id i.e., block collection id.
+ * @throws IOException
+ */
+ public void notifyBlkStorageMovementFinished(long trackId)
+ throws IOException {
+ this.namesystem.getFSDirectory().removeSPSXattr(trackId);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21760eb1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index d04b8e4..1cec9b1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -2389,6 +2389,6 @@ public class DFSTestUtil {
+ expectedStorageCount + " and actual=" + actualStorageCount);
return expectedStorageCount == actualStorageCount;
}
- }, 1000, timeout);
+ }, 500, timeout);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21760eb1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
index 6641134..95142d3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
/**
* Tests that block storage movement attempt failures are reported from DN and
@@ -36,10 +37,11 @@ public class TestBlockStorageMovementAttemptedItems {
private final int selfRetryTimeout = 500;
@Before
- public void setup() {
+ public void setup() throws Exception {
unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded();
+ StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class);
bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100,
- selfRetryTimeout, unsatisfiedStorageMovementFiles);
+ selfRetryTimeout, unsatisfiedStorageMovementFiles, sps);
}
@After
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21760eb1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
index e4b4290..8c3359a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
@@ -20,16 +20,22 @@ package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.XAttrHelper;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.junit.Test;
import java.io.IOException;
+import java.util.List;
+
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
+import static org.junit.Assert.assertFalse;
/**
* Test persistence of satisfying files/directories.
@@ -72,7 +78,16 @@ public class TestPersistentStoragePolicySatisfier {
* @throws IOException
*/
public void clusterSetUp() throws Exception {
- clusterSetUp(false);
+ clusterSetUp(false, new HdfsConfiguration());
+ }
+
+ /**
+ * Setup environment for every test case.
+ * @param hdfsConf hdfs conf.
+ * @throws Exception
+ */
+ public void clusterSetUp(Configuration hdfsConf) throws Exception {
+ clusterSetUp(false, hdfsConf);
}
/**
@@ -80,8 +95,9 @@ public class TestPersistentStoragePolicySatisfier {
* @param isHAEnabled if true, enable simple HA.
* @throws IOException
*/
- private void clusterSetUp(boolean isHAEnabled) throws Exception {
- conf = new HdfsConfiguration();
+ private void clusterSetUp(boolean isHAEnabled, Configuration newConf)
+ throws Exception {
+ conf = newConf;
final int dnNumber = storageTypes.length;
final short replication = 3;
MiniDFSCluster.Builder clusterBuilder = new MiniDFSCluster.Builder(conf)
@@ -188,7 +204,7 @@ public class TestPersistentStoragePolicySatisfier {
public void testWithHA() throws Exception {
try {
// Enable HA env for testing.
- clusterSetUp(true);
+ clusterSetUp(true, new HdfsConfiguration());
fs.setStoragePolicy(testFile, ALL_SSD);
fs.satisfyStoragePolicy(testFile);
@@ -298,6 +314,94 @@ public class TestPersistentStoragePolicySatisfier {
}
/**
+ * Tests to verify SPS xattr will be removed if the satisfy work has
+ * been finished, expect that the method satisfyStoragePolicy can be
+ * invoked on the same file again after the block movement has been
+ * finished:
+ * 1. satisfy storage policy of file1.
+ * 2. wait until storage policy is satisfied.
+ * 3. satisfy storage policy of file1 again
+ * 4. make sure step 3 works as expected.
+ * @throws Exception
+ */
+ @Test(timeout = 300000)
+ public void testMultipleSatisfyStoragePolicy() throws Exception {
+ try {
+ // Lower block movement check for testing.
+ conf = new HdfsConfiguration();
+ final long minCheckTimeout = 500; // minimum value
+ conf.setLong(
+ DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
+ minCheckTimeout);
+ clusterSetUp(conf);
+ fs.setStoragePolicy(testFile, ONE_SSD);
+ fs.satisfyStoragePolicy(testFile);
+ DFSTestUtil.waitExpectedStorageType(
+ testFileName, StorageType.SSD, 1, timeout, fs);
+ DFSTestUtil.waitExpectedStorageType(
+ testFileName, StorageType.DISK, 2, timeout, fs);
+
+ // Make sure that SPS xattr has been removed.
+ int retryTime = 0;
+ while (retryTime < 30) {
+ if (!fileContainsSPSXAttr(testFile)) {
+ break;
+ }
+ Thread.sleep(minCheckTimeout);
+ retryTime += 1;
+ }
+
+ fs.setStoragePolicy(testFile, COLD);
+ fs.satisfyStoragePolicy(testFile);
+ DFSTestUtil.waitExpectedStorageType(
+ testFileName, StorageType.ARCHIVE, 3, timeout, fs);
+ } finally {
+ clusterShutdown();
+ }
+ }
+
+ /**
+ * Tests to verify SPS xattr is removed after SPS is dropped,
+ * expect that if the SPS is disabled/dropped, the SPS
+ * xattr should be removed accordingly:
+ * 1. satisfy storage policy of file1.
+ * 2. drop SPS thread in block manager.
+ * 3. make sure sps xattr is removed.
+ * @throws Exception
+ */
+ @Test(timeout = 300000)
+ public void testDropSPS() throws Exception {
+ try {
+ clusterSetUp();
+ fs.setStoragePolicy(testFile, ONE_SSD);
+ fs.satisfyStoragePolicy(testFile);
+
+ cluster.getNamesystem().getBlockManager().deactivateSPS();
+
+ // Make sure satisfy xattr has been removed.
+ assertFalse(fileContainsSPSXAttr(testFile));
+
+ } finally {
+ clusterShutdown();
+ }
+ }
+
+ /**
+ * Check whether file contains SPS xattr.
+ * @param fileName file name.
+ * @return true if file contains SPS xattr.
+ * @throws IOException
+ */
+ private boolean fileContainsSPSXAttr(Path fileName) throws IOException {
+ final INode inode = cluster.getNamesystem()
+ .getFSDirectory().getINode(fileName.toString());
+ final XAttr satisfyXAttr =
+ XAttrHelper.buildXAttr(XATTR_SATISFY_STORAGE_POLICY);
+ List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
+ return existingXAttrs.contains(satisfyXAttr);
+ }
+
+ /**
* Restart the hole env and trigger the DataNode's heart beats.
* @throws Exception
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org