You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2017/08/27 07:16:03 UTC

[36/50] [abbrv] hadoop git commit: HDFS-11572. [SPS]: SPS should clean Xattrs when no blocks required to satisfy for a file. Contributed by Uma Maheswara Rao G

HDFS-11572. [SPS]: SPS should clean Xattrs when no blocks required to satisfy for a file. Contributed by Uma Maheswara Rao G


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0b03ec6e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0b03ec6e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0b03ec6e

Branch: refs/heads/HDFS-10285
Commit: 0b03ec6ee7e6afe2776e99d5e6dc2a00bc72cff4
Parents: ade0d04
Author: Rakesh Radhakrishnan <ra...@apache.org>
Authored: Thu Apr 20 23:14:36 2017 +0530
Committer: Rakesh Radhakrishnan <ra...@apache.org>
Committed: Sun Aug 27 11:55:41 2017 +0530

----------------------------------------------------------------------
 .../BlockStorageMovementAttemptedItems.java     |   2 +-
 .../server/namenode/StoragePolicySatisfier.java | 116 ++++++++++++++-----
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |  35 ++++++
 .../TestPersistentStoragePolicySatisfier.java   |  52 +++++----
 .../namenode/TestStoragePolicySatisfier.java    |  76 ++++++++++++
 5 files changed, 225 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b03ec6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
index f2406da..bf7859c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
@@ -333,7 +333,7 @@ public class BlockStorageMovementAttemptedItems {
                   + "doesn't exists in storageMovementAttemptedItems list",
                   storageMovementAttemptedResult.getTrackId());
               // Remove xattr for the track id.
-              this.sps.notifyBlkStorageMovementFinished(
+              this.sps.postBlkStorageMovementCleanup(
                   storageMovementAttemptedResult.getTrackId());
             }
             break;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b03ec6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
index 8be0a2a..3b20314 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -79,6 +79,27 @@ public class StoragePolicySatisfier implements Runnable {
   private final BlockStorageMovementAttemptedItems storageMovementsMonitor;
   private volatile boolean isRunning = false;
 
+  /**
+   * Represents the collective analysis status for all blocks.
+   */
+  private enum BlocksMovingAnalysisStatus {
+    // Represents that, the analysis skipped due to some conditions. A such
+    // condition is if block collection is in incomplete state.
+    ANALYSIS_SKIPPED_FOR_RETRY,
+    // Represents that, all block storage movement needed blocks found its
+    // targets.
+    ALL_BLOCKS_TARGETS_PAIRED,
+    // Represents that, only fewer or none of the block storage movement needed
+    // block found its eligible targets.
+    FEW_BLOCKS_TARGETS_PAIRED,
+    // Represents that, none of the blocks found for block storage movements.
+    BLOCKS_ALREADY_SATISFIED,
+    // Represents that, the analysis skipped due to some conditions.
+    // Example conditions are if no blocks really exists in block collection or
+    // if analysis is not required on ec files with unsuitable storage policies
+    BLOCKS_TARGET_PAIRING_SKIPPED;
+  }
+
   public StoragePolicySatisfier(final Namesystem namesystem,
       final BlockStorageMovementNeeded storageMovementNeeded,
       final BlockManager blkManager, Configuration conf) {
@@ -208,10 +229,31 @@ public class StoragePolicySatisfier implements Runnable {
                 namesystem.getBlockCollection(blockCollectionID);
             // Check blockCollectionId existence.
             if (blockCollection != null) {
-              boolean allBlockLocsAttemptedToSatisfy =
-                  computeAndAssignStorageMismatchedBlocksToDNs(blockCollection);
-              this.storageMovementsMonitor
-                  .add(blockCollectionID, allBlockLocsAttemptedToSatisfy);
+              BlocksMovingAnalysisStatus status =
+                  analyseBlocksStorageMovementsAndAssignToDN(blockCollection);
+              switch (status) {
+              // Just add to monitor, so it will be retried after timeout
+              case ANALYSIS_SKIPPED_FOR_RETRY:
+                // Just add to monitor, so it will be tracked for result and
+                // be removed on successful storage movement result.
+              case ALL_BLOCKS_TARGETS_PAIRED:
+                this.storageMovementsMonitor.add(blockCollectionID, true);
+                break;
+              // Add to monitor with allBlcoksAttemptedToSatisfy flag false, so
+              // that it will be tracked and still it will be consider for retry
+              // as analysis was not found targets for storage movement blocks.
+              case FEW_BLOCKS_TARGETS_PAIRED:
+                this.storageMovementsMonitor.add(blockCollectionID, false);
+                break;
+              // Just clean Xattrs
+              case BLOCKS_TARGET_PAIRING_SKIPPED:
+              case BLOCKS_ALREADY_SATISFIED:
+              default:
+                LOG.info("Block analysis skipped or blocks already satisfied"
+                    + " with storages. So, Cleaning up the Xattrs.");
+                postBlkStorageMovementCleanup(blockCollectionID);
+                break;
+              }
             }
           }
         }
@@ -235,15 +277,15 @@ public class StoragePolicySatisfier implements Runnable {
         }
         LOG.error("StoragePolicySatisfier thread received runtime exception. "
             + "Stopping Storage policy satisfier work", t);
-        // TODO: Just break for now. Once we implement dynamic start/stop
-        // option, we can add conditions here when to break/terminate.
         break;
       }
     }
   }
 
-  private boolean computeAndAssignStorageMismatchedBlocksToDNs(
+  private BlocksMovingAnalysisStatus analyseBlocksStorageMovementsAndAssignToDN(
       BlockCollection blockCollection) {
+    BlocksMovingAnalysisStatus status =
+        BlocksMovingAnalysisStatus.BLOCKS_ALREADY_SATISFIED;
     byte existingStoragePolicyID = blockCollection.getStoragePolicyID();
     BlockStoragePolicy existingStoragePolicy =
         blockManager.getStoragePolicy(existingStoragePolicyID);
@@ -252,21 +294,20 @@ public class StoragePolicySatisfier implements Runnable {
       // So, should we add back? or leave it to user
       LOG.info("BlockCollectionID: {} file is under construction. So, postpone"
           + " this to the next retry iteration", blockCollection.getId());
-      return true;
+      return BlocksMovingAnalysisStatus.ANALYSIS_SKIPPED_FOR_RETRY;
     }
 
     // First datanode will be chosen as the co-ordinator node for storage
     // movements. Later this can be optimized if needed.
     DatanodeDescriptor coordinatorNode = null;
     BlockInfo[] blocks = blockCollection.getBlocks();
+    if (blocks.length == 0) {
+      LOG.info("BlockCollectionID: {} file is not having any blocks."
+          + " So, skipping the analysis.", blockCollection.getId());
+      return BlocksMovingAnalysisStatus.BLOCKS_TARGET_PAIRING_SKIPPED;
+    }
     List<BlockMovingInfo> blockMovingInfos = new ArrayList<BlockMovingInfo>();
 
-    // True value represents that, SPS is able to find matching target nodes
-    // to satisfy storage type for all the blocks locations of the given
-    // blockCollection. A false value represents that, blockCollection needed
-    // retries to satisfy the storage policy for some of the block locations.
-    boolean foundMatchingTargetNodesForAllBlocks = true;
-
     for (int i = 0; i < blocks.length; i++) {
       BlockInfo blockInfo = blocks[i];
       List<StorageType> expectedStorageTypes;
@@ -283,19 +324,38 @@ public class StoragePolicySatisfier implements Runnable {
           LOG.warn("The storage policy " + existingStoragePolicy.getName()
               + " is not suitable for Striped EC files. "
               + "So, ignoring to move the blocks");
-          return false;
+          return BlocksMovingAnalysisStatus.BLOCKS_TARGET_PAIRING_SKIPPED;
         }
       } else {
         expectedStorageTypes = existingStoragePolicy
             .chooseStorageTypes(blockInfo.getReplication());
       }
-      foundMatchingTargetNodesForAllBlocks |= computeBlockMovingInfos(
-          blockMovingInfos, blockInfo, expectedStorageTypes);
+
+      DatanodeStorageInfo[] storages = blockManager.getStorages(blockInfo);
+      StorageType[] storageTypes = new StorageType[storages.length];
+      for (int j = 0; j < storages.length; j++) {
+        DatanodeStorageInfo datanodeStorageInfo = storages[j];
+        StorageType storageType = datanodeStorageInfo.getStorageType();
+        storageTypes[j] = storageType;
+      }
+      List<StorageType> existing =
+          new LinkedList<StorageType>(Arrays.asList(storageTypes));
+      if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes,
+          existing, true)) {
+        boolean computeStatus = computeBlockMovingInfos(blockMovingInfos,
+            blockInfo, expectedStorageTypes, existing, storages);
+        if (computeStatus
+            && status != BlocksMovingAnalysisStatus.FEW_BLOCKS_TARGETS_PAIRED) {
+          status = BlocksMovingAnalysisStatus.ALL_BLOCKS_TARGETS_PAIRED;
+        } else {
+          status = BlocksMovingAnalysisStatus.FEW_BLOCKS_TARGETS_PAIRED;
+        }
+      }
     }
 
     assignBlockMovingInfosToCoordinatorDn(blockCollection.getId(),
         blockMovingInfos, coordinatorNode);
-    return foundMatchingTargetNodesForAllBlocks;
+    return status;
   }
 
   /**
@@ -311,22 +371,18 @@ public class StoragePolicySatisfier implements Runnable {
    *          - block details
    * @param expectedStorageTypes
    *          - list of expected storage type to satisfy the storage policy
+   * @param existing
+   *          - list to get existing storage types
+   * @param storages
+   *          - available storages
    * @return false if some of the block locations failed to find target node to
    *         satisfy the storage policy, true otherwise
    */
   private boolean computeBlockMovingInfos(
       List<BlockMovingInfo> blockMovingInfos, BlockInfo blockInfo,
-      List<StorageType> expectedStorageTypes) {
+      List<StorageType> expectedStorageTypes, List<StorageType> existing,
+      DatanodeStorageInfo[] storages) {
     boolean foundMatchingTargetNodesForBlock = true;
-    DatanodeStorageInfo[] storages = blockManager.getStorages(blockInfo);
-    StorageType[] storageTypes = new StorageType[storages.length];
-    for (int j = 0; j < storages.length; j++) {
-      DatanodeStorageInfo datanodeStorageInfo = storages[j];
-      StorageType storageType = datanodeStorageInfo.getStorageType();
-      storageTypes[j] = storageType;
-    }
-    List<StorageType> existing =
-        new LinkedList<StorageType>(Arrays.asList(storageTypes));
     if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes,
         existing, true)) {
       List<StorageTypeNodePair> sourceWithStorageMap =
@@ -756,7 +812,7 @@ public class StoragePolicySatisfier implements Runnable {
     Long id;
     while ((id = storageMovementNeeded.get()) != null) {
       try {
-        notifyBlkStorageMovementFinished(id);
+        postBlkStorageMovementCleanup(id);
       } catch (IOException ie) {
         LOG.warn("Failed to remove SPS "
             + "xattr for collection id " + id, ie);
@@ -771,7 +827,7 @@ public class StoragePolicySatisfier implements Runnable {
    * @param trackId track id i.e., block collection id.
    * @throws IOException
    */
-  public void notifyBlkStorageMovementFinished(long trackId)
+  public void postBlkStorageMovementCleanup(long trackId)
       throws IOException {
     this.namesystem.getFSDirectory().removeSPSXattr(trackId);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b03ec6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index aea4dac..93b7169 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -99,8 +99,11 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclEntryScope;
 import org.apache.hadoop.fs.permission.AclEntryType;
@@ -150,9 +153,12 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.namenode.XAttrStorage;
 import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -168,6 +174,7 @@ import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.RefreshUserMappingsProtocol;
 import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -2322,4 +2329,32 @@ public class DFSTestUtil {
       }
     }, 500, timeout);
   }
+
+  /**
+   * Waits for removal of a specified Xattr on a specified file.
+   *
+   * @param srcPath
+   *          file name.
+   * @param xattr
+   *          name of the extended attribute.
+   * @param ns
+   *          Namesystem
+   * @param timeout
+   *          max wait time
+   * @throws Exception
+   */
+  public static void waitForXattrRemoved(String srcPath, String xattr,
+      Namesystem ns, int timeout) throws TimeoutException, InterruptedException,
+          UnresolvedLinkException, AccessControlException,
+          ParentNotDirectoryException {
+    final INode inode = ns.getFSDirectory().getINode(srcPath);
+    final XAttr satisfyXAttr = XAttrHelper.buildXAttr(xattr);
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
+        return !existingXAttrs.contains(satisfyXAttr);
+      }
+    }, 100, timeout);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b03ec6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
index 8c3359a..41c272c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
@@ -20,22 +20,18 @@ package org.apache.hadoop.hdfs.server.namenode;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
-import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.util.List;
 
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
-import static org.junit.Assert.assertFalse;
 
 /**
  * Test persistence of satisfying files/directories.
@@ -341,15 +337,9 @@ public class TestPersistentStoragePolicySatisfier {
       DFSTestUtil.waitExpectedStorageType(
           testFileName, StorageType.DISK, 2, timeout, fs);
 
-      // Make sure that SPS xattr has been removed.
-      int retryTime = 0;
-      while (retryTime < 30) {
-        if (!fileContainsSPSXAttr(testFile)) {
-          break;
-        }
-        Thread.sleep(minCheckTimeout);
-        retryTime += 1;
-      }
+      // Make sure satisfy xattr has been removed.
+      DFSTestUtil.waitForXattrRemoved(testFileName,
+          XATTR_SATISFY_STORAGE_POLICY, cluster.getNamesystem(), 30000);
 
       fs.setStoragePolicy(testFile, COLD);
       fs.satisfyStoragePolicy(testFile);
@@ -379,7 +369,8 @@ public class TestPersistentStoragePolicySatisfier {
       cluster.getNamesystem().getBlockManager().deactivateSPS();
 
       // Make sure satisfy xattr has been removed.
-      assertFalse(fileContainsSPSXAttr(testFile));
+      DFSTestUtil.waitForXattrRemoved(testFileName,
+          XATTR_SATISFY_STORAGE_POLICY, cluster.getNamesystem(), 30000);
 
     } finally {
       clusterShutdown();
@@ -387,18 +378,29 @@ public class TestPersistentStoragePolicySatisfier {
   }
 
   /**
-   * Check whether file contains SPS xattr.
-   * @param fileName file name.
-   * @return true if file contains SPS xattr.
-   * @throws IOException
+   * Tests that Xattrs should be cleaned if all blocks already satisfied.
+   *
+   * @throws Exception
    */
-  private boolean fileContainsSPSXAttr(Path fileName) throws IOException {
-    final INode inode = cluster.getNamesystem()
-        .getFSDirectory().getINode(fileName.toString());
-    final XAttr satisfyXAttr =
-        XAttrHelper.buildXAttr(XATTR_SATISFY_STORAGE_POLICY);
-    List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
-    return existingXAttrs.contains(satisfyXAttr);
+  @Test(timeout = 300000)
+  public void testSPSShouldNotLeakXattrIfStorageAlreadySatisfied()
+      throws Exception {
+    try {
+      clusterSetUp();
+      DFSTestUtil.waitExpectedStorageType(testFileName, StorageType.DISK, 3,
+          timeout, fs);
+      fs.satisfyStoragePolicy(testFile);
+
+      DFSTestUtil.waitExpectedStorageType(testFileName, StorageType.DISK, 3,
+          timeout, fs);
+
+      // Make sure satisfy xattr has been removed.
+      DFSTestUtil.waitForXattrRemoved(testFileName,
+          XATTR_SATISFY_STORAGE_POLICY, cluster.getNamesystem(), 30000);
+
+    } finally {
+      clusterShutdown();
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b03ec6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
index 2a33455..8457e5b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
 import static org.junit.Assert.assertNull;
 
 import java.io.FileNotFoundException;
@@ -34,13 +35,17 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
@@ -828,6 +833,77 @@ public class TestStoragePolicySatisfier {
     }
   }
 
+  /**
+   * Tests that Xattrs should be cleaned if satisfy storage policy called on EC
+   * file with unsuitable storage policy set.
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 300000)
+  public void testSPSShouldNotLeakXattrIfSatisfyStoragePolicyCallOnECFiles()
+      throws Exception {
+    StorageType[][] diskTypes =
+        new StorageType[][]{{StorageType.SSD, StorageType.DISK},
+            {StorageType.SSD, StorageType.DISK},
+            {StorageType.SSD, StorageType.DISK},
+            {StorageType.SSD, StorageType.DISK},
+            {StorageType.SSD, StorageType.DISK},
+            {StorageType.DISK, StorageType.SSD},
+            {StorageType.DISK, StorageType.SSD},
+            {StorageType.DISK, StorageType.SSD},
+            {StorageType.DISK, StorageType.SSD},
+            {StorageType.DISK, StorageType.SSD}};
+
+    int defaultStripedBlockSize =
+        ErasureCodingPolicyManager.getSystemPolicies()[0].getCellSize() * 4;
+    config.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, defaultStripedBlockSize);
+    config.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+    config.setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
+        1L);
+    config.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY,
+        false);
+
+    try {
+      hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
+          storagesPerDatanode, capacity);
+
+      // set "/foo" directory with ONE_SSD storage policy.
+      ClientProtocol client = NameNodeProxies.createProxy(config,
+          hdfsCluster.getFileSystem(0).getUri(), ClientProtocol.class)
+          .getProxy();
+      String fooDir = "/foo";
+      client.mkdirs(fooDir, new FsPermission((short) 777), true);
+      // set an EC policy on "/foo" directory
+      client.setErasureCodingPolicy(fooDir, null);
+
+      // write file to fooDir
+      final String testFile = "/foo/bar";
+      long fileLen = 20 * defaultStripedBlockSize;
+      dfs = hdfsCluster.getFileSystem();
+      DFSTestUtil.createFile(dfs, new Path(testFile), fileLen, (short) 3, 0);
+
+      // ONESSD is unsuitable storage policy on EC files
+      client.setStoragePolicy(fooDir, HdfsConstants.ONESSD_STORAGE_POLICY_NAME);
+      dfs.satisfyStoragePolicy(new Path(testFile));
+
+      // Thread.sleep(9000); // To make sure SPS triggered
+      // verify storage types and locations
+      LocatedBlocks locatedBlocks =
+          client.getBlockLocations(testFile, 0, fileLen);
+      for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
+        for (StorageType type : lb.getStorageTypes()) {
+          Assert.assertEquals(StorageType.DISK, type);
+        }
+      }
+
+      // Make sure satisfy xattr has been removed.
+      DFSTestUtil.waitForXattrRemoved(testFile, XATTR_SATISFY_STORAGE_POLICY,
+          hdfsCluster.getNamesystem(), 30000);
+    } finally {
+      shutdownCluster();
+    }
+  }
+
   private String createFileAndSimulateFavoredNodes(int favoredNodesCount)
       throws IOException {
     ArrayList<DataNode> dns = hdfsCluster.getDataNodes();


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org