You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by um...@apache.org on 2018/08/10 20:26:48 UTC
[36/50] [abbrv] hadoop git commit: HDFS-13110: [SPS]: Reduce the
number of APIs in NamenodeProtocol used by external satisfier. Contributed by
Rakesh R.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2b22201/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java
deleted file mode 100644
index ff277ba..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java
+++ /dev/null
@@ -1,174 +0,0 @@
-package org.apache.hadoop.hdfs.server.sps;
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.protocol.DirectoryListing;
-import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-import org.apache.hadoop.hdfs.server.namenode.sps.Context;
-import org.apache.hadoop.hdfs.server.namenode.sps.FileIdCollector;
-import org.apache.hadoop.hdfs.server.namenode.sps.ItemInfo;
-import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class is to scan the paths recursively. If file is directory, then it
- * will scan for files recursively. If the file is non directory, then it will
- * just submit the same file to process.
- */
-@InterfaceAudience.Private
-public class ExternalSPSFileIDCollector implements FileIdCollector {
- public static final Logger LOG =
- LoggerFactory.getLogger(ExternalSPSFileIDCollector.class);
- private Context cxt;
- private DistributedFileSystem dfs;
- private SPSService service;
- private int maxQueueLimitToScan;
-
- public ExternalSPSFileIDCollector(Context cxt, SPSService service) {
- this.cxt = cxt;
- this.service = service;
- this.maxQueueLimitToScan = service.getConf().getInt(
- DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
- DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT);
- try {
- // TODO: probably we could get this dfs from external context? but this is
- // too specific to external.
- dfs = getFS(service.getConf());
- } catch (IOException e) {
- LOG.error("Unable to get the filesystem. Make sure Namenode running and "
- + "configured namenode address is correct.", e);
- }
- }
-
- private DistributedFileSystem getFS(Configuration conf) throws IOException {
- return (DistributedFileSystem) FileSystem
- .get(FileSystem.getDefaultUri(conf), conf);
- }
-
- /**
- * Recursively scan the given path and add the file info to SPS service for
- * processing.
- */
- private long processPath(long startID, String fullPath) {
- long pendingWorkCount = 0; // to be satisfied file counter
- for (byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;;) {
- final DirectoryListing children;
- try {
- children = dfs.getClient().listPaths(fullPath, lastReturnedName, false);
- } catch (IOException e) {
- LOG.warn("Failed to list directory " + fullPath
- + ". Ignore the directory and continue.", e);
- return pendingWorkCount;
- }
- if (children == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("The scanning start dir/sub dir " + fullPath
- + " does not have childrens.");
- }
- return pendingWorkCount;
- }
-
- for (HdfsFileStatus child : children.getPartialListing()) {
- if (child.isFile()) {
- service.addFileIdToProcess(new ItemInfo(startID, child.getFileId()),
- false);
- checkProcessingQueuesFree();
- pendingWorkCount++; // increment to be satisfied file count
- } else {
- String fullPathStr = child.getFullName(fullPath);
- if (child.isDirectory()) {
- if (!fullPathStr.endsWith(Path.SEPARATOR)) {
- fullPathStr = fullPathStr + Path.SEPARATOR;
- }
- pendingWorkCount += processPath(startID, fullPathStr);
- }
- }
- }
-
- if (children.hasMore()) {
- lastReturnedName = children.getLastName();
- } else {
- return pendingWorkCount;
- }
- }
- }
-
- private void checkProcessingQueuesFree() {
- int remainingCapacity = remainingCapacity();
- // wait for queue to be free
- while (remainingCapacity <= 0) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Waiting for storageMovementNeeded queue to be free!");
- }
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- remainingCapacity = remainingCapacity();
- }
- }
-
- /**
- * Returns queue remaining capacity.
- */
- public int remainingCapacity() {
- int size = service.processingQueueSize();
- int remainingSize = 0;
- if (size < maxQueueLimitToScan) {
- remainingSize = maxQueueLimitToScan - size;
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("SPS processing Q -> maximum capacity:{}, current size:{},"
- + " remaining size:{}", maxQueueLimitToScan, size, remainingSize);
- }
- return remainingSize;
- }
-
- @Override
- public void scanAndCollectFileIds(Long inodeId) throws IOException {
- if (dfs == null) {
- dfs = getFS(service.getConf());
- }
- long pendingSatisfyItemsCount = processPath(inodeId,
- cxt.getFilePath(inodeId));
- // Check whether the given path contains any item to be tracked
- // or the no to be satisfied paths. In case of empty list, add the given
- // inodeId to the 'pendingWorkForDirectory' with empty list so that later
- // SPSPathIdProcessor#run function will remove the SPS hint considering that
- // this path is already satisfied the storage policy.
- if (pendingSatisfyItemsCount <= 0) {
- LOG.debug("There is no pending items to satisfy the given path "
- + "inodeId:{}", inodeId);
- service.addAllFileIdsToProcess(inodeId, new ArrayList<>(), true);
- } else {
- service.markScanCompletedForPath(inodeId);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2b22201/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFilePathCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFilePathCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFilePathCollector.java
new file mode 100644
index 0000000..9435475
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFilePathCollector.java
@@ -0,0 +1,172 @@
+package org.apache.hadoop.hdfs.server.sps;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.namenode.sps.FileCollector;
+import org.apache.hadoop.hdfs.server.namenode.sps.ItemInfo;
+import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class scans the given path recursively. If the path is a directory,
+ * it scans for files under it recursively. If the path is not a directory,
+ * it just submits that file for processing. Files are identified by their
+ * string path representation.
+ */
+@InterfaceAudience.Private
+public class ExternalSPSFilePathCollector implements FileCollector <String>{
+ public static final Logger LOG =
+ LoggerFactory.getLogger(ExternalSPSFilePathCollector.class);
+ private DistributedFileSystem dfs;
+ private SPSService<String> service;
+ private int maxQueueLimitToScan;
+
+ public ExternalSPSFilePathCollector(SPSService<String> service) {
+ this.service = service;
+ this.maxQueueLimitToScan = service.getConf().getInt(
+ DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
+ DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT);
+ try {
+ // TODO: probably we could get this dfs from external context? but this is
+ // too specific to external.
+ dfs = getFS(service.getConf());
+ } catch (IOException e) {
+ LOG.error("Unable to get the filesystem. Make sure Namenode running and "
+ + "configured namenode address is correct.", e);
+ }
+ }
+
+ private DistributedFileSystem getFS(Configuration conf) throws IOException {
+ return (DistributedFileSystem) FileSystem
+ .get(FileSystem.getDefaultUri(conf), conf);
+ }
+
+ /**
+ * Recursively scan the given path and add the file info to SPS service for
+ * processing.
+ */
+ private long processPath(String startID, String childPath) {
+ long pendingWorkCount = 0; // to be satisfied file counter
+ for (byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;;) {
+ final DirectoryListing children;
+ try {
+ children = dfs.getClient().listPaths(childPath, lastReturnedName,
+ false);
+ } catch (IOException e) {
+ LOG.warn("Failed to list directory " + childPath
+ + ". Ignore the directory and continue.", e);
+ return pendingWorkCount;
+ }
+ if (children == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The scanning start dir/sub dir " + childPath
+ + " does not have childrens.");
+ }
+ return pendingWorkCount;
+ }
+
+ for (HdfsFileStatus child : children.getPartialListing()) {
+ String childFullPath = child.getFullName(childPath);
+ if (child.isFile()) {
+ service.addFileToProcess(
+ new ItemInfo<String>(startID, childFullPath), false);
+ checkProcessingQueuesFree();
+ pendingWorkCount++; // increment to be satisfied file count
+ } else {
+ if (child.isDirectory()) {
+ if (!childFullPath.endsWith(Path.SEPARATOR)) {
+ childFullPath = childFullPath + Path.SEPARATOR;
+ }
+ pendingWorkCount += processPath(startID, childFullPath);
+ }
+ }
+ }
+
+ if (children.hasMore()) {
+ lastReturnedName = children.getLastName();
+ } else {
+ return pendingWorkCount;
+ }
+ }
+ }
+
+ private void checkProcessingQueuesFree() {
+ int remainingCapacity = remainingCapacity();
+ // wait for queue to be free
+ while (remainingCapacity <= 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Waiting for storageMovementNeeded queue to be free!");
+ }
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ remainingCapacity = remainingCapacity();
+ }
+ }
+
+ /**
+ * Returns queue remaining capacity.
+ */
+ public int remainingCapacity() {
+ int size = service.processingQueueSize();
+ int remainingSize = 0;
+ if (size < maxQueueLimitToScan) {
+ remainingSize = maxQueueLimitToScan - size;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("SPS processing Q -> maximum capacity:{}, current size:{},"
+ + " remaining size:{}", maxQueueLimitToScan, size, remainingSize);
+ }
+ return remainingSize;
+ }
+
+ @Override
+ public void scanAndCollectFiles(String path) throws IOException {
+ if (dfs == null) {
+ dfs = getFS(service.getConf());
+ }
+ long pendingSatisfyItemsCount = processPath(path, path);
+ // Check whether the given path contains any items to be tracked, or has
+ // no paths left to be satisfied. If nothing is pending, add the given
+ // path to the 'pendingWorkForDirectory' with an empty list so that later
+ // SPSPathIdProcessor#run function will remove the SPS hint, considering
+ // that this path already satisfies the storage policy.
+ if (pendingSatisfyItemsCount <= 0) {
+ LOG.debug("There is no pending items to satisfy the given path "
+ + "inodeId:{}", path);
+ service.addAllFilesToProcess(path, new ArrayList<>(), true);
+ } else {
+ service.markScanCompletedForPath(path);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2b22201/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
index 33448db..6fc35ea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
@@ -68,7 +68,8 @@ public final class ExternalStoragePolicySatisfier {
HdfsConfiguration spsConf = new HdfsConfiguration();
// login with SPS keytab
secureLogin(spsConf);
- StoragePolicySatisfier sps = new StoragePolicySatisfier(spsConf);
+ StoragePolicySatisfier<String> sps = new StoragePolicySatisfier<String>(
+ spsConf);
nnc = getNameNodeConnector(spsConf);
boolean spsRunning;
@@ -86,8 +87,8 @@ public final class ExternalStoragePolicySatisfier {
ExternalSPSBlockMoveTaskHandler externalHandler =
new ExternalSPSBlockMoveTaskHandler(spsConf, nnc, sps);
externalHandler.init();
- sps.init(context, new ExternalSPSFileIDCollector(context, sps),
- externalHandler, blkMoveListener);
+ sps.init(context, new ExternalSPSFilePathCollector(sps), externalHandler,
+ blkMoveListener);
sps.start(true, StoragePolicySatisfierMode.EXTERNAL);
if (sps != null) {
sps.join();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2b22201/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto
index b0e900d..b137f2f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto
@@ -214,11 +214,11 @@ message GetFilePathResponseProto {
required string srcPath = 1;
}
-message GetNextSPSPathIdRequestProto {
+message GetNextSPSPathRequestProto {
}
-message GetNextSPSPathIdResponseProto {
- optional uint64 fileId = 1;
+message GetNextSPSPathResponseProto {
+ optional string spsPath = 1;
}
message CheckDNSpaceRequestProto {
@@ -322,26 +322,15 @@ service NamenodeProtocolService {
returns (IsRollingUpgradeResponseProto);
/**
- * Return the corresponding file path for give file id
+ * Return the sps path from namenode
*/
- rpc getFilePath(GetFilePathRequestProto)
- returns (GetFilePathResponseProto);
+ rpc getNextSPSPath(GetNextSPSPathRequestProto)
+ returns (GetNextSPSPathResponseProto);
/**
- * Return the sps path id from namenode
- */
- rpc getNextSPSPathId(GetNextSPSPathIdRequestProto)
- returns (GetNextSPSPathIdResponseProto);
-
- /**
- * Return the sps path id from namenode
+ * Verifies whether the given Datanode has enough estimated size with the
+ * given storage type for scheduling the block movement.
*/
rpc checkDNSpaceForScheduling(CheckDNSpaceRequestProto)
returns (CheckDNSpaceResponseProto);
-
- /**
- * check whether given file id has low redundancy blocks.
- */
- rpc hasLowRedundancyBlocks(HasLowRedundancyBlocksRequestProto)
- returns (HasLowRedundancyBlocksResponseProto);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2b22201/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
index 4097339..29af885 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
@@ -40,22 +40,21 @@ import org.mockito.Mockito;
*/
public class TestBlockStorageMovementAttemptedItems {
- private BlockStorageMovementAttemptedItems bsmAttemptedItems = null;
- private BlockStorageMovementNeeded unsatisfiedStorageMovementFiles = null;
+ private BlockStorageMovementAttemptedItems<Long> bsmAttemptedItems;
+ private BlockStorageMovementNeeded<Long> unsatisfiedStorageMovementFiles;
private final int selfRetryTimeout = 500;
@Before
public void setup() throws Exception {
Configuration config = new HdfsConfiguration();
- Context ctxt = Mockito.mock(Context.class);
- SPSService sps = Mockito.mock(StoragePolicySatisfier.class);
- Mockito.when(sps.getConf()).thenReturn(config);
+ Context<Long> ctxt = Mockito.mock(IntraSPSNameNodeContext.class);
+ SPSService<Long> sps = new StoragePolicySatisfier<Long>(config);
Mockito.when(ctxt.isRunning()).thenReturn(true);
Mockito.when(ctxt.isInSafeMode()).thenReturn(false);
Mockito.when(ctxt.isFileExist(Mockito.anyLong())).thenReturn(true);
unsatisfiedStorageMovementFiles =
- new BlockStorageMovementNeeded(ctxt, null);
- bsmAttemptedItems = new BlockStorageMovementAttemptedItems(sps,
+ new BlockStorageMovementNeeded<Long>(ctxt, null);
+ bsmAttemptedItems = new BlockStorageMovementAttemptedItems<Long>(sps,
unsatisfiedStorageMovementFiles, null);
}
@@ -72,9 +71,9 @@ public class TestBlockStorageMovementAttemptedItems {
long stopTime = monotonicNow() + (retryTimeout * 2);
boolean isItemFound = false;
while (monotonicNow() < (stopTime)) {
- ItemInfo ele = null;
+ ItemInfo<Long> ele = null;
while ((ele = unsatisfiedStorageMovementFiles.get()) != null) {
- if (item == ele.getFileId()) {
+ if (item == ele.getFile()) {
isItemFound = true;
break;
}
@@ -97,7 +96,7 @@ public class TestBlockStorageMovementAttemptedItems {
Long item = new Long(1234);
List<Block> blocks = new ArrayList<Block>();
blocks.add(new Block(item));
- bsmAttemptedItems.add(new AttemptedItemInfo(0L, 0L, 0L, blocks, 0));
+ bsmAttemptedItems.add(new AttemptedItemInfo<Long>(0L, 0L, 0L, blocks, 0));
Block[] blockArray = new Block[blocks.size()];
blocks.toArray(blockArray);
bsmAttemptedItems.notifyMovementTriedBlocks(blockArray);
@@ -114,7 +113,7 @@ public class TestBlockStorageMovementAttemptedItems {
Long item = new Long(1234);
List<Block> blocks = new ArrayList<>();
blocks.add(new Block(item));
- bsmAttemptedItems.add(new AttemptedItemInfo(0L, 0L, 0L, blocks, 0));
+ bsmAttemptedItems.add(new AttemptedItemInfo<Long>(0L, 0L, 0L, blocks, 0));
assertEquals("Shouldn't receive result", 0,
bsmAttemptedItems.getMovementFinishedBlocksCount());
assertEquals("Item doesn't exist in the attempted list", 1,
@@ -135,7 +134,7 @@ public class TestBlockStorageMovementAttemptedItems {
blocks.add(new Block(5678L));
Long trackID = 0L;
bsmAttemptedItems
- .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0));
+ .add(new AttemptedItemInfo<Long>(trackID, trackID, 0L, blocks, 0));
Block[] blksMovementReport = new Block[1];
blksMovementReport[0] = new Block(item);
bsmAttemptedItems.notifyMovementTriedBlocks(blksMovementReport);
@@ -160,7 +159,7 @@ public class TestBlockStorageMovementAttemptedItems {
List<Block> blocks = new ArrayList<>();
blocks.add(new Block(item));
bsmAttemptedItems
- .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0));
+ .add(new AttemptedItemInfo<Long>(trackID, trackID, 0L, blocks, 0));
Block[] blksMovementReport = new Block[1];
blksMovementReport[0] = new Block(item);
bsmAttemptedItems.notifyMovementTriedBlocks(blksMovementReport);
@@ -188,7 +187,7 @@ public class TestBlockStorageMovementAttemptedItems {
List<Block> blocks = new ArrayList<>();
blocks.add(new Block(item));
bsmAttemptedItems
- .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0));
+ .add(new AttemptedItemInfo<Long>(trackID, trackID, 0L, blocks, 0));
Block[] blksMovementReport = new Block[1];
blksMovementReport[0] = new Block(item);
bsmAttemptedItems.notifyMovementTriedBlocks(blksMovementReport);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2b22201/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
index 6f7fe89..2a3d0c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
@@ -105,7 +105,7 @@ public class TestStoragePolicySatisfier {
public static final int NUM_OF_DATANODES = 3;
public static final int STORAGES_PER_DATANODE = 2;
public static final long CAPACITY = 2 * 256 * 1024 * 1024;
- public static final String FILE = "/testMoveWhenStoragePolicyNotSatisfying";
+ public static final String FILE = "/testMoveToSatisfyStoragePolicy";
public static final int DEFAULT_BLOCK_SIZE = 1024;
/**
@@ -1269,8 +1269,9 @@ public class TestStoragePolicySatisfier {
//Queue limit can control the traverse logic to wait for some free
//entry in queue. After 10 files, traverse control will be on U.
- StoragePolicySatisfier sps = new StoragePolicySatisfier(config);
- Context ctxt = new IntraSPSNameNodeContext(hdfsCluster.getNamesystem(),
+ StoragePolicySatisfier<Long> sps = new StoragePolicySatisfier<Long>(config);
+ Context<Long> ctxt = new IntraSPSNameNodeContext(
+ hdfsCluster.getNamesystem(),
hdfsCluster.getNamesystem().getBlockManager(), sps) {
@Override
public boolean isInSafeMode() {
@@ -1283,7 +1284,7 @@ public class TestStoragePolicySatisfier {
}
};
- FileIdCollector fileIDCollector = createFileIdCollector(sps, ctxt);
+ FileCollector<Long> fileIDCollector = createFileIdCollector(sps, ctxt);
sps.init(ctxt, fileIDCollector, null, null);
sps.getStorageMovementQueue().activate();
@@ -1300,9 +1301,9 @@ public class TestStoragePolicySatisfier {
dfs.delete(new Path("/root"), true);
}
- public FileIdCollector createFileIdCollector(StoragePolicySatisfier sps,
- Context ctxt) {
- FileIdCollector fileIDCollector = new IntraSPSNameNodeFileIdCollector(
+ public FileCollector<Long> createFileIdCollector(
+ StoragePolicySatisfier<Long> sps, Context<Long> ctxt) {
+ FileCollector<Long> fileIDCollector = new IntraSPSNameNodeFileIdCollector(
hdfsCluster.getNamesystem().getFSDirectory(), sps);
return fileIDCollector;
}
@@ -1337,8 +1338,9 @@ public class TestStoragePolicySatisfier {
// Queue limit can control the traverse logic to wait for some free
// entry in queue. After 10 files, traverse control will be on U.
- StoragePolicySatisfier sps = new StoragePolicySatisfier(config);
- Context ctxt = new IntraSPSNameNodeContext(hdfsCluster.getNamesystem(),
+ StoragePolicySatisfier<Long> sps = new StoragePolicySatisfier<Long>(config);
+ Context<Long> ctxt = new IntraSPSNameNodeContext(
+ hdfsCluster.getNamesystem(),
hdfsCluster.getNamesystem().getBlockManager(), sps) {
@Override
public boolean isInSafeMode() {
@@ -1350,7 +1352,7 @@ public class TestStoragePolicySatisfier {
return true;
}
};
- FileIdCollector fileIDCollector = createFileIdCollector(sps, ctxt);
+ FileCollector<Long> fileIDCollector = createFileIdCollector(sps, ctxt);
sps.init(ctxt, fileIDCollector, null, null);
sps.getStorageMovementQueue().activate();
@@ -1368,16 +1370,16 @@ public class TestStoragePolicySatisfier {
}
private void assertTraversal(List<String> expectedTraverseOrder,
- FSDirectory fsDir, StoragePolicySatisfier sps)
+ FSDirectory fsDir, StoragePolicySatisfier<Long> sps)
throws InterruptedException {
// Remove 10 element and make queue free, So other traversing will start.
for (int i = 0; i < 10; i++) {
String path = expectedTraverseOrder.remove(0);
- ItemInfo itemInfo = sps.getStorageMovementQueue().get();
+ ItemInfo<Long> itemInfo = sps.getStorageMovementQueue().get();
if (itemInfo == null) {
continue;
}
- long trackId = itemInfo.getFileId();
+ Long trackId = itemInfo.getFile();
INode inode = fsDir.getInode(trackId);
assertTrue("Failed to traverse tree, expected " + path + " but got "
+ inode.getFullPathName(), path.equals(inode.getFullPathName()));
@@ -1388,11 +1390,11 @@ public class TestStoragePolicySatisfier {
// Check other element traversed in order and E, M, U, R, S should not be
// added in queue which we already removed from expected list
for (String path : expectedTraverseOrder) {
- ItemInfo itemInfo = sps.getStorageMovementQueue().get();
+ ItemInfo<Long> itemInfo = sps.getStorageMovementQueue().get();
if (itemInfo == null) {
continue;
}
- long trackId = itemInfo.getFileId();
+ Long trackId = itemInfo.getFile();
INode inode = fsDir.getInode(trackId);
assertTrue("Failed to traverse tree, expected " + path + " but got "
+ inode.getFullPathName(), path.equals(inode.getFullPathName()));
@@ -1696,39 +1698,41 @@ public class TestStoragePolicySatisfier {
return file1;
}
- private void waitForAttemptedItems(long expectedBlkMovAttemptedCount,
+ public void waitForAttemptedItems(long expectedBlkMovAttemptedCount,
int timeout) throws TimeoutException, InterruptedException {
BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager();
- final StoragePolicySatisfier sps = (StoragePolicySatisfier) blockManager
- .getSPSManager().getInternalSPSService();
+ final StoragePolicySatisfier<Long> sps =
+ (StoragePolicySatisfier<Long>) blockManager.getSPSManager()
+ .getInternalSPSService();
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}",
expectedBlkMovAttemptedCount,
- ((BlockStorageMovementAttemptedItems) (sps
+ ((BlockStorageMovementAttemptedItems<Long>) (sps
.getAttemptedItemsMonitor())).getAttemptedItemsCount());
- return ((BlockStorageMovementAttemptedItems) (sps
+ return ((BlockStorageMovementAttemptedItems<Long>) (sps
.getAttemptedItemsMonitor()))
.getAttemptedItemsCount() == expectedBlkMovAttemptedCount;
}
}, 100, timeout);
}
- private void waitForBlocksMovementAttemptReport(
+ public void waitForBlocksMovementAttemptReport(
long expectedMovementFinishedBlocksCount, int timeout)
throws TimeoutException, InterruptedException {
BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager();
- final StoragePolicySatisfier sps = (StoragePolicySatisfier) blockManager
+ final StoragePolicySatisfier<Long> sps =
+ (StoragePolicySatisfier<Long>) blockManager
.getSPSManager().getInternalSPSService();
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
expectedMovementFinishedBlocksCount,
- ((BlockStorageMovementAttemptedItems) (sps
+ ((BlockStorageMovementAttemptedItems<Long>) (sps
.getAttemptedItemsMonitor())).getMovementFinishedBlocksCount());
- return ((BlockStorageMovementAttemptedItems) (sps
+ return ((BlockStorageMovementAttemptedItems<Long>) (sps
.getAttemptedItemsMonitor()))
.getMovementFinishedBlocksCount()
>= expectedMovementFinishedBlocksCount;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2b22201/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java
index ef12300..a39fb92 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java
@@ -495,16 +495,17 @@ public class TestStoragePolicySatisfierWithStripedFile {
long expectedBlkMovAttemptedCount, int timeout)
throws TimeoutException, InterruptedException {
BlockManager blockManager = cluster.getNamesystem().getBlockManager();
- final StoragePolicySatisfier sps = (StoragePolicySatisfier) blockManager
+ final StoragePolicySatisfier<Long> sps =
+ (StoragePolicySatisfier<Long>) blockManager
.getSPSManager().getInternalSPSService();
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}",
expectedBlkMovAttemptedCount,
- ((BlockStorageMovementAttemptedItems) sps
+ ((BlockStorageMovementAttemptedItems<Long>) sps
.getAttemptedItemsMonitor()).getAttemptedItemsCount());
- return ((BlockStorageMovementAttemptedItems) sps
+ return ((BlockStorageMovementAttemptedItems<Long>) sps
.getAttemptedItemsMonitor())
.getAttemptedItemsCount() == expectedBlkMovAttemptedCount;
}
@@ -567,7 +568,8 @@ public class TestStoragePolicySatisfierWithStripedFile {
long expectedMoveFinishedBlks, int timeout)
throws TimeoutException, InterruptedException {
BlockManager blockManager = cluster.getNamesystem().getBlockManager();
- final StoragePolicySatisfier sps = (StoragePolicySatisfier) blockManager
+ final StoragePolicySatisfier<Long> sps =
+ (StoragePolicySatisfier<Long>) blockManager
.getSPSManager().getInternalSPSService();
Assert.assertNotNull("Failed to get SPS object reference!", sps);
@@ -575,9 +577,10 @@ public class TestStoragePolicySatisfierWithStripedFile {
@Override
public Boolean get() {
LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
- expectedMoveFinishedBlks, ((BlockStorageMovementAttemptedItems) sps
+ expectedMoveFinishedBlks,
+ ((BlockStorageMovementAttemptedItems<Long>) sps
.getAttemptedItemsMonitor()).getMovementFinishedBlocksCount());
- return ((BlockStorageMovementAttemptedItems) sps
+ return ((BlockStorageMovementAttemptedItems<Long>) sps
.getAttemptedItemsMonitor())
.getMovementFinishedBlocksCount() >= expectedMoveFinishedBlks;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2b22201/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
index 0546f39..28e172a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
@@ -43,23 +43,23 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener;
-import org.apache.hadoop.hdfs.server.namenode.sps.Context;
-import org.apache.hadoop.hdfs.server.namenode.sps.FileIdCollector;
-import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
+import org.apache.hadoop.hdfs.server.namenode.sps.BlockStorageMovementAttemptedItems;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
import org.apache.hadoop.hdfs.server.namenode.sps.TestStoragePolicySatisfier;
import org.apache.hadoop.http.HttpConfig;
@@ -74,6 +74,8 @@ import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
+import com.google.common.base.Supplier;
+
/**
* Tests the external sps service plugins.
*/
@@ -88,6 +90,8 @@ public class TestExternalStoragePolicySatisfier
private String principal;
private MiniKdc kdc;
private File baseDir;
+ private StoragePolicySatisfier<String> externalSps;
+ private ExternalSPSContext externalCtxt;
@After
public void destroy() throws Exception {
@@ -98,6 +102,14 @@ public class TestExternalStoragePolicySatisfier
}
@Override
+ public void shutdownCluster() {
+ if (externalSps != null) {
+ externalSps.stopGracefully();
+ }
+ super.shutdownCluster();
+ }
+
+ @Override
public void setUp() {
super.setUp();
@@ -131,60 +143,44 @@ public class TestExternalStoragePolicySatisfier
nnc = getNameNodeConnector(getConf());
- BlockManager blkMgr = cluster.getNameNode().getNamesystem()
- .getBlockManager();
- SPSService spsService = blkMgr.getSPSManager().getInternalSPSService();
- spsService.stopGracefully();
-
- ExternalSPSContext context = new ExternalSPSContext(spsService,
+ externalSps = new StoragePolicySatisfier<String>(getConf());
+ externalCtxt = new ExternalSPSContext(externalSps,
getNameNodeConnector(conf));
ExternalBlockMovementListener blkMoveListener =
new ExternalBlockMovementListener();
ExternalSPSBlockMoveTaskHandler externalHandler =
new ExternalSPSBlockMoveTaskHandler(conf, nnc,
- blkMgr.getSPSManager().getInternalSPSService());
+ externalSps);
externalHandler.init();
- spsService.init(context,
- new ExternalSPSFileIDCollector(context,
- blkMgr.getSPSManager().getInternalSPSService()),
- externalHandler, blkMoveListener);
- spsService.start(true, StoragePolicySatisfierMode.EXTERNAL);
+ externalSps.init(externalCtxt,
+ new ExternalSPSFilePathCollector(externalSps), externalHandler,
+ blkMoveListener);
+ externalSps.start(true, StoragePolicySatisfierMode.EXTERNAL);
return cluster;
}
public void restartNamenode() throws IOException{
- BlockManager blkMgr = getCluster().getNameNode().getNamesystem()
- .getBlockManager();
- SPSService spsService = blkMgr.getSPSManager().getInternalSPSService();
- spsService.stopGracefully();
+ if (externalSps != null) {
+ externalSps.stopGracefully();
+ }
getCluster().restartNameNodes();
getCluster().waitActive();
- blkMgr = getCluster().getNameNode().getNamesystem()
- .getBlockManager();
- spsService = blkMgr.getSPSManager().getInternalSPSService();
- spsService.stopGracefully();
+ externalSps = new StoragePolicySatisfier<>(getConf());
- ExternalSPSContext context = new ExternalSPSContext(spsService,
+ externalCtxt = new ExternalSPSContext(externalSps,
getNameNodeConnector(getConf()));
ExternalBlockMovementListener blkMoveListener =
new ExternalBlockMovementListener();
ExternalSPSBlockMoveTaskHandler externalHandler =
new ExternalSPSBlockMoveTaskHandler(getConf(), nnc,
- blkMgr.getSPSManager().getInternalSPSService());
+ externalSps);
externalHandler.init();
- spsService.init(context,
- new ExternalSPSFileIDCollector(context,
- blkMgr.getSPSManager().getInternalSPSService()),
- externalHandler, blkMoveListener);
- spsService.start(true, StoragePolicySatisfierMode.EXTERNAL);
- }
-
- @Override
- public FileIdCollector createFileIdCollector(StoragePolicySatisfier sps,
- Context ctxt) {
- return new ExternalSPSFileIDCollector(ctxt, sps);
+ externalSps.init(externalCtxt,
+ new ExternalSPSFilePathCollector(externalSps), externalHandler,
+ blkMoveListener);
+ externalSps.start(true, StoragePolicySatisfierMode.EXTERNAL);
}
private class ExternalBlockMovementListener implements BlockMovementListener {
@@ -204,7 +200,7 @@ public class TestExternalStoragePolicySatisfier
throws IOException {
final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
Assert.assertEquals(1, namenodes.size());
- final Path externalSPSPathId = new Path("/system/tmp.id");
+ final Path externalSPSPathId = HdfsServerConstants.MOVER_ID_PATH;
NameNodeConnector.checkOtherInstanceRunning(false);
while (true) {
try {
@@ -222,6 +218,40 @@ public class TestExternalStoragePolicySatisfier
}
}
+ public void waitForAttemptedItems(long expectedBlkMovAttemptedCount,
+ int timeout) throws TimeoutException, InterruptedException {
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}",
+ expectedBlkMovAttemptedCount,
+ ((BlockStorageMovementAttemptedItems<String>) (externalSps
+ .getAttemptedItemsMonitor())).getAttemptedItemsCount());
+ return ((BlockStorageMovementAttemptedItems<String>) (externalSps
+ .getAttemptedItemsMonitor()))
+ .getAttemptedItemsCount() == expectedBlkMovAttemptedCount;
+ }
+ }, 100, timeout);
+ }
+
+ public void waitForBlocksMovementAttemptReport(
+ long expectedMovementFinishedBlocksCount, int timeout)
+ throws TimeoutException, InterruptedException {
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
+ expectedMovementFinishedBlocksCount,
+ ((BlockStorageMovementAttemptedItems<String>) (externalSps
+ .getAttemptedItemsMonitor())).getMovementFinishedBlocksCount());
+ return ((BlockStorageMovementAttemptedItems<String>) (externalSps
+ .getAttemptedItemsMonitor()))
+ .getMovementFinishedBlocksCount()
+ >= expectedMovementFinishedBlocksCount;
+ }
+ }, 100, timeout);
+ }
+
private void initSecureConf(Configuration conf) throws Exception {
String username = "externalSPS";
baseDir = GenericTestUtils
@@ -321,10 +351,6 @@ public class TestExternalStoragePolicySatisfier
List<String> files = new ArrayList<>();
files.add(FILE);
DistributedFileSystem fs = getFS();
- BlockManager blkMgr = getCluster().getNameNode().getNamesystem()
- .getBlockManager();
- SPSService spsService = blkMgr.getSPSManager().getInternalSPSService();
- spsService.stopGracefully(); // stops SPS
// Creates 4 more files. Send all of them for satisfying the storage
// policy together.
@@ -367,6 +393,28 @@ public class TestExternalStoragePolicySatisfier
}
/**
+ * Tests to verify that SPS should be able to start when the Mover ID file
+ * is not being held by a Mover. This can be the case when Mover exits
+ * ungracefully without deleting the ID file from HDFS.
+ */
+ @Test(timeout = 300000)
+ public void testWhenMoverExitsWithoutDeleteMoverIDFile()
+ throws IOException {
+ try {
+ createCluster();
+ // Simulate the case by creating MOVER_ID file
+ DFSTestUtil.createFile(getCluster().getFileSystem(),
+ HdfsServerConstants.MOVER_ID_PATH, 0, (short) 1, 0);
+ restartNamenode();
+ boolean running = externalCtxt.isRunning();
+ Assert.assertTrue("SPS should be running as "
+ + "no Mover really running", running);
+ } finally {
+ shutdownCluster();
+ }
+ }
+
+ /**
* This test need not run as external scan is not a batch based scanning right
* now.
*/
@@ -389,4 +437,20 @@ public class TestExternalStoragePolicySatisfier
@Ignore("Status is not supported for external SPS. So, ignoring it.")
public void testMaxRetryForFailedBlock() throws Exception {
}
+
+ /**
+ * This test is specific to internal SPS. So, ignoring it.
+ */
+ @Ignore("This test is specific to internal SPS. So, ignoring it.")
+ @Override
+ public void testTraverseWhenParentDeleted() throws Exception {
+ }
+
+ /**
+ * This test is specific to internal SPS. So, ignoring it.
+ */
+ @Ignore("This test is specific to internal SPS. So, ignoring it.")
+ @Override
+ public void testTraverseWhenRootParentDeleted() throws Exception {
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org