You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by at...@apache.org on 2011/09/14 00:49:38 UTC
svn commit: r1170378 [11/12] - in
/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project: ./ conf/
dev-support/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-cli...
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java Tue Sep 13 22:49:27 2011
@@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -65,8 +66,8 @@ public class TestLeafQueue {
CapacitySchedulerConfiguration csConf;
CapacitySchedulerContext csContext;
- Queue root;
- Map<String, Queue> queues = new HashMap<String, Queue>();
+ CSQueue root;
+ Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
final static int GB = 1024;
final static String DEFAULT_RACK = "/default";
@@ -145,10 +146,11 @@ public class TestLeafQueue {
any(Resource.class));
// 2. Stub out LeafQueue.parent.completedContainer
- Queue parent = queue.getParent();
+ CSQueue parent = queue.getParent();
doNothing().when(parent).completedContainer(
any(Resource.class), any(SchedulerApp.class), any(SchedulerNode.class),
- any(RMContainer.class), any(RMContainerEventType.class));
+ any(RMContainer.class), any(ContainerStatus.class),
+ any(RMContainerEventType.class));
return queue;
}
@@ -238,7 +240,7 @@ public class TestLeafQueue {
// Release each container from app_0
for (RMContainer rmContainer : app_0.getLiveContainers()) {
a.completedContainer(clusterResource, app_0, node_0, rmContainer,
- RMContainerEventType.KILL);
+ null, RMContainerEventType.KILL);
}
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@@ -247,7 +249,7 @@ public class TestLeafQueue {
// Release each container from app_1
for (RMContainer rmContainer : app_1.getLiveContainers()) {
a.completedContainer(clusterResource, app_1, node_0, rmContainer,
- RMContainerEventType.KILL);
+ null, RMContainerEventType.KILL);
}
assertEquals(0*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@@ -392,7 +394,7 @@ public class TestLeafQueue {
// 8. Release each container from app_0
for (RMContainer rmContainer : app_0.getLiveContainers()) {
a.completedContainer(clusterResource, app_0, node_0, rmContainer,
- RMContainerEventType.KILL);
+ null, RMContainerEventType.KILL);
}
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@@ -403,7 +405,7 @@ public class TestLeafQueue {
// 9. Release each container from app_2
for (RMContainer rmContainer : app_2.getLiveContainers()) {
a.completedContainer(clusterResource, app_2, node_0, rmContainer,
- RMContainerEventType.KILL);
+ null, RMContainerEventType.KILL);
}
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@@ -414,7 +416,7 @@ public class TestLeafQueue {
// 10. Release each container from app_3
for (RMContainer rmContainer : app_3.getLiveContainers()) {
a.completedContainer(clusterResource, app_3, node_0, rmContainer,
- RMContainerEventType.KILL);
+ null, RMContainerEventType.KILL);
}
assertEquals(0*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@@ -489,7 +491,7 @@ public class TestLeafQueue {
// Now free 1 container from app_0 i.e. 1G
a.completedContainer(clusterResource, app_0, node_0,
- app_0.getLiveContainers().iterator().next(), RMContainerEventType.KILL);
+ app_0.getLiveContainers().iterator().next(), null, RMContainerEventType.KILL);
a.assignContainers(clusterResource, node_0);
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@@ -499,7 +501,7 @@ public class TestLeafQueue {
// Now finish another container from app_0 and fulfill the reservation
a.completedContainer(clusterResource, app_0, node_0,
- app_0.getLiveContainers().iterator().next(), RMContainerEventType.KILL);
+ app_0.getLiveContainers().iterator().next(), null, RMContainerEventType.KILL);
a.assignContainers(clusterResource, node_0);
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@@ -582,7 +584,7 @@ public class TestLeafQueue {
// Now free 1 container from app_0 i.e. 1G, and re-reserve it
a.completedContainer(clusterResource, app_0, node_0,
- app_0.getLiveContainers().iterator().next(), RMContainerEventType.KILL);
+ app_0.getLiveContainers().iterator().next(), null, RMContainerEventType.KILL);
a.assignContainers(clusterResource, node_0);
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@@ -613,7 +615,7 @@ public class TestLeafQueue {
// Now finish another container from app_0 and see the reservation cancelled
a.completedContainer(clusterResource, app_0, node_0,
- app_0.getLiveContainers().iterator().next(), RMContainerEventType.KILL);
+ app_0.getLiveContainers().iterator().next(), null, RMContainerEventType.KILL);
a.assignContainers(clusterResource, node_0);
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java Tue Sep 13 22:49:27 2011
@@ -81,7 +81,7 @@ public class TestParentQueue {
LOG.info("Setup top-level queues a and b");
}
- private void stubQueueAllocation(final Queue queue,
+ private void stubQueueAllocation(final CSQueue queue,
final Resource clusterResource, final SchedulerNode node,
final int allocation) {
@@ -121,7 +121,7 @@ public class TestParentQueue {
when(queue).assignContainers(eq(clusterResource), eq(node));
}
- private float computeQueueUtilization(Queue queue,
+ private float computeQueueUtilization(CSQueue queue,
int expectedMemory, Resource clusterResource) {
return (expectedMemory /
(clusterResource.getMemory() * queue.getAbsoluteCapacity()));
@@ -132,8 +132,8 @@ public class TestParentQueue {
// Setup queue configs
setupSingleLevelQueues(csConf);
- Map<String, Queue> queues = new HashMap<String, Queue>();
- Queue root =
+ Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
+ CSQueue root =
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacityScheduler.ROOT, queues, queues,
CapacityScheduler.queueComparator,
@@ -270,8 +270,8 @@ public class TestParentQueue {
// Setup queue configs
setupMultiLevelQueues(csConf);
- Map<String, Queue> queues = new HashMap<String, Queue>();
- Queue root =
+ Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
+ CSQueue root =
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacityScheduler.ROOT, queues, queues,
CapacityScheduler.queueComparator,
@@ -294,17 +294,17 @@ public class TestParentQueue {
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
// Start testing
- Queue a = queues.get(A);
- Queue b = queues.get(B);
- Queue c = queues.get(C);
- Queue d = queues.get(D);
-
- Queue a1 = queues.get(A1);
- Queue a2 = queues.get(A2);
-
- Queue b1 = queues.get(B1);
- Queue b2 = queues.get(B2);
- Queue b3 = queues.get(B3);
+ CSQueue a = queues.get(A);
+ CSQueue b = queues.get(B);
+ CSQueue c = queues.get(C);
+ CSQueue d = queues.get(D);
+
+ CSQueue a1 = queues.get(A1);
+ CSQueue a2 = queues.get(A2);
+
+ CSQueue b1 = queues.get(B1);
+ CSQueue b2 = queues.get(B2);
+ CSQueue b3 = queues.get(B3);
final float delta = 0.0001f;
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java Tue Sep 13 22:49:27 2011
@@ -85,7 +85,7 @@ public class TestUtils {
*/
static class SpyHook extends CapacityScheduler.QueueHook {
@Override
- public Queue hook(Queue queue) {
+ public CSQueue hook(CSQueue queue) {
return spy(queue);
}
}
@@ -154,8 +154,8 @@ public class TestUtils {
public static ContainerId getMockContainerId(SchedulerApp application) {
ContainerId containerId = mock(ContainerId.class);
- doReturn(application.getApplicationAttemptId()).when(containerId).getAppAttemptId();
- doReturn(application.getApplicationId()).when(containerId).getAppId();
+ doReturn(application.getApplicationAttemptId()).
+ when(containerId).getApplicationAttemptId();
doReturn(application.getNewContainerId()).when(containerId).getId();
return containerId;
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java Tue Sep 13 22:49:27 2011
@@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.webapp.WebApps;
import org.apache.hadoop.yarn.webapp.test.WebAppTests;
import org.junit.Test;
@@ -168,9 +169,38 @@ public class TestRMWebApp {
conf.setCapacity(C13, 40);
}
+ public static ResourceManager mockFifoRm(int apps, int racks, int nodes,
+ int mbsPerNode)
+ throws Exception {
+ ResourceManager rm = mock(ResourceManager.class);
+ RMContext rmContext = mockRMContext(apps, racks, nodes,
+ mbsPerNode);
+ ResourceScheduler rs = mockFifoScheduler();
+ when(rm.getResourceScheduler()).thenReturn(rs);
+ when(rm.getRMContext()).thenReturn(rmContext);
+ return rm;
+ }
+
+ public static FifoScheduler mockFifoScheduler() throws Exception {
+ CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+ setupFifoQueueConfiguration(conf);
+
+ FifoScheduler fs = new FifoScheduler();
+ fs.reinitialize(conf, null, null);
+ return fs;
+ }
+
+ static void setupFifoQueueConfiguration(CapacitySchedulerConfiguration conf) {
+ // Define default queue
+ conf.setQueues("default", new String[] {"default"});
+ conf.setCapacity("default", 100);
+ }
+
public static void main(String[] args) throws Exception {
// For manual testing
WebApps.$for("yarn", new TestRMWebApp()).at(8888).inDevMode().
start(new RMWebApp(mockRm(101, 8, 8, 8*GiB))).joinThread();
+ WebApps.$for("yarn", new TestRMWebApp()).at(8888).inDevMode().
+ start(new RMWebApp(mockFifoRm(10, 1, 4, 8*GiB))).joinThread();
}
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java Tue Sep 13 22:49:27 2011
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -39,7 +40,6 @@ import org.apache.hadoop.yarn.server.api
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.nodemanager.Context;
-import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
+import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.service.CompositeService;
@@ -154,7 +155,7 @@ public class MiniYARNCluster extends Com
new File(testWorkDir, MiniYARNCluster.this.getName() + "-localDir");
localDir.mkdir();
LOG.info("Created localDir in " + localDir.getAbsolutePath());
- getConfig().set(NMConfig.NM_LOCAL_DIR, localDir.getAbsolutePath());
+ getConfig().set(YarnConfiguration.NM_LOCAL_DIRS, localDir.getAbsolutePath());
File logDir =
new File(testWorkDir, MiniYARNCluster.this.getName()
+ "-logDir");
@@ -164,10 +165,10 @@ public class MiniYARNCluster extends Com
logDir.mkdir();
remoteLogDir.mkdir();
LOG.info("Created logDir in " + logDir.getAbsolutePath());
- getConfig().set(NMConfig.NM_LOG_DIR, logDir.getAbsolutePath());
- getConfig().set(NMConfig.REMOTE_USER_LOG_DIR,
+ getConfig().set(YarnConfiguration.NM_LOG_DIRS, logDir.getAbsolutePath());
+ getConfig().set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
remoteLogDir.getAbsolutePath());
- getConfig().setInt(NMConfig.NM_VMEM_GB, 4); // By default AM + 2 containers
+ getConfig().setInt(YarnConfiguration.NM_VMEM_GB, 4); // By default AM + 2 containers
nodeManager = new NodeManager() {
@Override
@@ -177,9 +178,10 @@ public class MiniYARNCluster extends Com
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
- Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+ Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
+ ContainerTokenSecretManager containerTokenSecretManager) {
return new NodeStatusUpdaterImpl(context, dispatcher,
- healthChecker, metrics) {
+ healthChecker, metrics, containerTokenSecretManager) {
@Override
protected ResourceTracker getRMClient() {
final ResourceTrackerService rt = resourceManager
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerTokenSecretManager.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerTokenSecretManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerTokenSecretManager.java Tue Sep 13 22:49:27 2011
@@ -82,6 +82,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.BeforeClass;
+import org.junit.AfterClass;
import org.junit.Test;
public class TestContainerTokenSecretManager {
@@ -94,6 +95,7 @@ public class TestContainerTokenSecretMan
private static final File localDir = new File("target",
TestContainerTokenSecretManager.class.getName() + "-localDir")
.getAbsoluteFile();
+ private static MiniYARNCluster yarnCluster;
@BeforeClass
public static void setup() throws AccessControlException,
@@ -103,6 +105,12 @@ public class TestContainerTokenSecretMan
localDir.mkdir();
}
+ @AfterClass
+ public static void teardown() {
+ yarnCluster.stop();
+ }
+
+
@Test
public void test() throws IOException, InterruptedException {
@@ -114,9 +122,9 @@ public class TestContainerTokenSecretMan
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
// Set AM expiry interval to be very long.
- conf.setLong(YarnConfiguration.AM_EXPIRY_INTERVAL, 100000L);
+ conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 100000L);
UserGroupInformation.setConfiguration(conf);
- MiniYARNCluster yarnCluster =
+ yarnCluster =
new MiniYARNCluster(TestContainerTokenSecretManager.class.getName());
yarnCluster.init(conf);
yarnCluster.start();
@@ -183,8 +191,8 @@ public class TestContainerTokenSecretMan
// Ask for a container from the RM
String schedulerAddressString =
- conf.get(YarnConfiguration.SCHEDULER_ADDRESS,
- YarnConfiguration.DEFAULT_SCHEDULER_BIND_ADDRESS);
+ conf.get(YarnConfiguration.RM_SCHEDULER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS);
final InetSocketAddress schedulerAddr =
NetUtils.createSocketAddr(schedulerAddressString);
ApplicationTokenIdentifier appTokenIdentifier =
@@ -216,8 +224,6 @@ public class TestContainerTokenSecretMan
RegisterApplicationMasterRequest request =
recordFactory
.newRecordInstance(RegisterApplicationMasterRequest.class);
- ApplicationMaster applicationMaster = recordFactory
- .newRecordInstance(ApplicationMaster.class);
request.setApplicationAttemptId(resourceManager.getRMContext()
.getRMApps().get(appID).getCurrentAppAttempt().getAppAttemptId());
scheduler.registerApplicationMaster(request);
@@ -241,7 +247,7 @@ public class TestContainerTokenSecretMan
allocateRequest.addAllAsks(ask);
allocateRequest.addAllReleases(release);
List<Container> allocatedContainers = scheduler.allocate(allocateRequest)
- .getAMResponse().getNewContainerList();
+ .getAMResponse().getAllocatedContainers();
waitCounter = 0;
while ((allocatedContainers == null || allocatedContainers.size() == 0)
@@ -251,7 +257,7 @@ public class TestContainerTokenSecretMan
allocateRequest.setResponseId(allocateRequest.getResponseId() + 1);
allocatedContainers =
scheduler.allocate(allocateRequest).getAMResponse()
- .getNewContainerList();
+ .getAllocatedContainers();
}
Assert.assertNotNull("Container is not allocted!", allocatedContainers);
@@ -285,12 +291,13 @@ public class TestContainerTokenSecretMan
.newRecordInstance(GetContainerStatusRequest.class);
ContainerId containerID =
recordFactory.newRecordInstance(ContainerId.class);
- ApplicationAttemptId appAttemptId = recordFactory.newRecordInstance(ApplicationAttemptId.class);
+ ApplicationAttemptId appAttemptId =
+ recordFactory.newRecordInstance(ApplicationAttemptId.class);
appAttemptId.setApplicationId(appID);
appAttemptId.setAttemptId(1);
- containerID.setAppId(appID);
+ appAttemptId.setApplicationId(appID);
+ containerID.setApplicationAttemptId(appAttemptId);
containerID.setId(1);
- containerID.setAppAttemptId(appAttemptId);
request.setContainerId(containerID);
client.getContainerStatus(request);
} catch (YarnRemoteException e) {
@@ -339,9 +346,9 @@ public class TestContainerTokenSecretMan
ApplicationAttemptId appAttemptId = recordFactory.newRecordInstance(ApplicationAttemptId.class);
appAttemptId.setApplicationId(appID);
appAttemptId.setAttemptId(1);
- containerID.setAppId(appID);
+ appAttemptId.setApplicationId(appID);
+ containerID.setApplicationAttemptId(appAttemptId);
containerID.setId(1);
- containerID.setAppAttemptId(appAttemptId);
request.setContainerId(containerID);
try {
client.getContainerStatus(request);
Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/c++/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Sep 13 22:49:27 2011
@@ -1,3 +1,3 @@
-/hadoop/common/trunk/hadoop-mapreduce-project/src/c++:1159757-1166484
+/hadoop/common/trunk/hadoop-mapreduce-project/src/c++:1159757-1170371
/hadoop/core/branches/branch-0.19/mapred/src/c++:713112
/hadoop/core/trunk/src/c++:776175-784663
Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Sep 13 22:49:27 2011
@@ -1,3 +1,3 @@
-/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib:1152502-1166484
+/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib:1152502-1170371
/hadoop/core/branches/branch-0.19/mapred/src/contrib:713112
/hadoop/core/trunk/src/contrib:784664-785643
Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/block_forensics/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Sep 13 22:49:27 2011
@@ -1,4 +1,4 @@
-/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/block_forensics:1152502-1166484
+/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/block_forensics:1152502-1170371
/hadoop/core/branches/branch-0.19/hdfs/src/contrib/block_forensics:713112
/hadoop/core/branches/branch-0.19/mapred/src/contrib/block_forensics:713112
/hadoop/core/trunk/src/contrib/block_forensics:784664-785643
Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/build-contrib.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Sep 13 22:49:27 2011
@@ -1,3 +1,3 @@
-/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/build-contrib.xml:1161333-1166484
+/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/build-contrib.xml:1161333-1170371
/hadoop/core/branches/branch-0.19/mapred/src/contrib/build-contrib.xml:713112
/hadoop/core/trunk/src/contrib/build-contrib.xml:776175-786373
Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/build.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Sep 13 22:49:27 2011
@@ -1,3 +1,3 @@
-/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/build.xml:1161333-1166484
+/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/build.xml:1161333-1170371
/hadoop/core/branches/branch-0.19/mapred/src/contrib/build.xml:713112
/hadoop/core/trunk/src/contrib/build.xml:776175-786373
Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/capacity-scheduler/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Sep 13 22:49:27 2011
@@ -1,3 +1,3 @@
-/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/capacity-scheduler:1159757-1166484
+/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/capacity-scheduler:1159757-1170371
/hadoop/core/branches/branch-0.19/mapred/src/contrib/capacity-scheduler:713112
/hadoop/core/trunk/src/contrib/capacity-scheduler:776175-786373
Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/data_join/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Sep 13 22:49:27 2011
@@ -1,3 +1,3 @@
-/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/data_join:1159757-1166484
+/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/data_join:1159757-1170371
/hadoop/core/branches/branch-0.19/mapred/src/contrib/data_join:713112
/hadoop/core/trunk/src/contrib/data_join:776175-786373
Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/dynamic-scheduler/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Sep 13 22:49:27 2011
@@ -1,4 +1,4 @@
-/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/dynamic-scheduler:1159757-1166484
+/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/dynamic-scheduler:1159757-1170371
/hadoop/core/branches/branch-0.19/mapred/src/contrib/dynamic-scheduler:713112
/hadoop/core/branches/branch-0.19/src/contrib/dynamic-scheduler:713112
/hadoop/core/trunk/src/contrib/dynamic-scheduler:784664-786373
Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/eclipse-plugin/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Sep 13 22:49:27 2011
@@ -1,4 +1,4 @@
-/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/eclipse-plugin:1159757-1166484
+/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/eclipse-plugin:1159757-1170371
/hadoop/core/branches/branch-0.19/core/src/contrib/eclipse-plugin:713112
/hadoop/core/branches/branch-0.19/mapred/src/contrib/eclipse-plugin:713112
/hadoop/core/trunk/src/contrib/eclipse-plugin:776175-785643
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/eclipse-plugin/src/java/org/apache/hadoop/eclipse/server/HadoopServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/eclipse-plugin/src/java/org/apache/hadoop/eclipse/server/HadoopServer.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/eclipse-plugin/src/java/org/apache/hadoop/eclipse/server/HadoopServer.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/eclipse-plugin/src/java/org/apache/hadoop/eclipse/server/HadoopServer.java Tue Sep 13 22:49:27 2011
@@ -36,6 +36,7 @@ import javax.xml.parsers.ParserConfigura
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.eclipse.Activator;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
@@ -420,8 +421,14 @@ public class HadoopServer {
*/
public void storeSettingsToFile(File file) throws IOException {
FileOutputStream fos = new FileOutputStream(file);
- this.conf.writeXml(fos);
- fos.close();
+ try {
+ this.conf.writeXml(fos);
+ fos.close();
+ fos = null;
+ } finally {
+ IOUtils.closeStream(fos);
+ }
+
}
/* @inheritDoc */
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/eclipse-plugin/src/java/org/apache/hadoop/eclipse/servers/RunOnHadoopWizard.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/eclipse-plugin/src/java/org/apache/hadoop/eclipse/servers/RunOnHadoopWizard.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/eclipse-plugin/src/java/org/apache/hadoop/eclipse/servers/RunOnHadoopWizard.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/eclipse-plugin/src/java/org/apache/hadoop/eclipse/servers/RunOnHadoopWizard.java Tue Sep 13 22:49:27 2011
@@ -28,6 +28,7 @@ import org.apache.hadoop.eclipse.Activat
import org.apache.hadoop.eclipse.ErrorMessageDialog;
import org.apache.hadoop.eclipse.server.HadoopServer;
import org.apache.hadoop.eclipse.server.JarModule;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.JobConf;
import org.eclipse.core.resources.IFile;
import org.eclipse.core.runtime.CoreException;
@@ -164,8 +165,13 @@ public class RunOnHadoopWizard extends W
// confDir);
File confFile = new File(confDir, "core-site.xml");
FileOutputStream fos = new FileOutputStream(confFile);
- conf.writeXml(fos);
- fos.close();
+ try {
+ conf.writeXml(fos);
+ fos.close();
+ fos = null;
+ } finally {
+ IOUtils.closeStream(fos);
+ }
} catch (IOException ioe) {
ioe.printStackTrace();
Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/fairscheduler/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Sep 13 22:49:27 2011
@@ -1,3 +1,3 @@
-/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/fairscheduler:1159757-1166484
+/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/fairscheduler:1159757-1170371
/hadoop/core/branches/branch-0.19/mapred/src/contrib/fairscheduler:713112
/hadoop/core/trunk/src/contrib/fairscheduler:776175-786373
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ExecutionSummarizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ExecutionSummarizer.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ExecutionSummarizer.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ExecutionSummarizer.java Tue Sep 13 22:49:27 2011
@@ -149,10 +149,15 @@ class ExecutionSummarizer implements Sta
throws IOException {
numJobsInInputTrace = factory.numJobsInTrace;
endTime = System.currentTimeMillis();
- Path inputTracePath = new Path(inputPath);
- FileSystem fs = inputTracePath.getFileSystem(conf);
- inputTraceLocation = fs.makeQualified(inputTracePath).toString();
- inputTraceSignature = getTraceSignature(inputTraceLocation);
+ if ("-".equals(inputPath)) {
+ inputTraceLocation = Summarizer.NA;
+ inputTraceSignature = Summarizer.NA;
+ } else {
+ Path inputTracePath = new Path(inputPath);
+ FileSystem fs = inputTracePath.getFileSystem(conf);
+ inputTraceLocation = fs.makeQualified(inputTracePath).toString();
+ inputTraceSignature = getTraceSignature(inputPath);
+ }
jobSubmissionPolicy = Gridmix.getJobSubmissionPolicy(conf).name();
resolver = userResolver.getClass().getName();
if (dataSize > 0) {
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java Tue Sep 13 22:49:27 2011
@@ -314,9 +314,13 @@ public class Gridmix extends Configured
}
});
- // print the run summary
- System.out.print("\n\n");
- System.out.println(summarizer.toString());
+ // print the gridmix summary if the run was successful
+ if (val == 0) {
+ // print the run summary
+ System.out.print("\n\n");
+ System.out.println(summarizer.toString());
+ }
+
return val;
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageMatcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageMatcher.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageMatcher.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageMatcher.java Tue Sep 13 22:49:27 2011
@@ -52,15 +52,23 @@ public class ResourceUsageMatcher {
@SuppressWarnings("unchecked")
public void configure(Configuration conf, ResourceCalculatorPlugin monitor,
ResourceUsageMetrics metrics, Progressive progress) {
- Class[] plugins =
- conf.getClasses(RESOURCE_USAGE_EMULATION_PLUGINS,
- ResourceUsageEmulatorPlugin.class);
+ Class[] plugins = conf.getClasses(RESOURCE_USAGE_EMULATION_PLUGINS);
if (plugins == null) {
System.out.println("No resource usage emulator plugins configured.");
} else {
- for (Class<? extends ResourceUsageEmulatorPlugin> plugin : plugins) {
- if (plugin != null) {
- emulationPlugins.add(ReflectionUtils.newInstance(plugin, conf));
+ for (Class clazz : plugins) {
+ if (clazz != null) {
+ if (ResourceUsageEmulatorPlugin.class.isAssignableFrom(clazz)) {
+ ResourceUsageEmulatorPlugin plugin =
+ (ResourceUsageEmulatorPlugin) ReflectionUtils.newInstance(clazz,
+ conf);
+ emulationPlugins.add(plugin);
+ } else {
+ throw new RuntimeException("Misconfigured resource usage plugins. "
+ + "Class " + clazz.getClass().getName() + " is not a resource "
+ + "usage plugin as it does not extend "
+ + ResourceUsageEmulatorPlugin.class.getName());
+ }
}
}
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSummary.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSummary.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSummary.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSummary.java Tue Sep 13 22:49:27 2011
@@ -159,7 +159,7 @@ public class TestGridmixSummary {
@Override
protected Thread createReaderThread() {
- return null;
+ return new Thread();
}
}
@@ -243,7 +243,7 @@ public class TestGridmixSummary {
tid, es.getInputTraceSignature());
// test trace location
Path qPath = fs.makeQualified(testTraceFile);
- assertEquals("Mismatch in trace signature",
+ assertEquals("Mismatch in trace filename",
qPath.toString(), es.getInputTraceLocation());
// test expected data size
assertEquals("Mismatch in expected data size",
@@ -275,7 +275,7 @@ public class TestGridmixSummary {
es.finalize(factory, testTraceFile.toString(), 0L, resolver, dataStats,
conf);
// test missing expected data size
- assertEquals("Mismatch in trace signature",
+ assertEquals("Mismatch in trace data size",
Summarizer.NA, es.getExpectedDataSize());
assertFalse("Mismatch in trace signature",
tid.equals(es.getInputTraceSignature()));
@@ -295,6 +295,12 @@ public class TestGridmixSummary {
assertEquals("Mismatch in trace signature",
tid, es.getInputTraceSignature());
+ // finalize trace identifier '-' input
+ es.finalize(factory, "-", 0L, resolver, dataStats, conf);
+ assertEquals("Mismatch in trace signature",
+ Summarizer.NA, es.getInputTraceSignature());
+ assertEquals("Mismatch in trace file location",
+ Summarizer.NA, es.getInputTraceLocation());
}
// test the ExecutionSummarizer
Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/index/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Sep 13 22:49:27 2011
@@ -1,3 +1,3 @@
-/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/index:1159757-1166484
+/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/index:1159757-1170371
/hadoop/core/branches/branch-0.19/mapred/src/contrib/index:713112
/hadoop/core/trunk/src/contrib/index:776175-786373
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementPolicyRaid.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementPolicyRaid.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementPolicyRaid.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementPolicyRaid.java Tue Sep 13 22:49:27 2011
@@ -26,8 +26,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import junit.framework.Assert;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -43,475 +41,479 @@ import org.apache.hadoop.hdfs.protocol.L
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRaid.CachedFullPathNames;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRaid.CachedLocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRaid.FileType;
-import org.apache.hadoop.hdfs.server.namenode.*;
+import org.apache.hadoop.hdfs.server.namenode.FSInodeInfo;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeRaidTestUtil;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeRaidUtil;
+import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.raid.RaidNode;
+import org.junit.Assert;
import org.junit.Test;
public class TestBlockPlacementPolicyRaid {
+ private Configuration conf = null;
+ private MiniDFSCluster cluster = null;
+ private FSNamesystem namesystem = null;
+ private BlockManager blockManager;
+ private NetworkTopology networktopology;
+ private BlockPlacementPolicyRaid policy = null;
+ private FileSystem fs = null;
+ String[] rack1 = {"/rack1"};
+ String[] rack2 = {"/rack2"};
+ String[] host1 = {"host1.rack1.com"};
+ String[] host2 = {"host2.rack2.com"};
+ String xorPrefix = null;
+ String raidTempPrefix = null;
+ String raidrsTempPrefix = null;
+ String raidrsHarTempPrefix = null;
+
+ final static Log LOG =
+ LogFactory.getLog(TestBlockPlacementPolicyRaid.class);
+
+ protected void setupCluster() throws IOException {
+ conf = new Configuration();
+ conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
+ conf.set("dfs.replication.pending.timeout.sec", "2");
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1L);
+ conf.set("dfs.block.replicator.classname",
+ BlockPlacementPolicyRaid.class.getName());
+ conf.set(RaidNode.STRIPE_LENGTH_KEY, "2");
+ conf.set(RaidNode.RS_PARITY_LENGTH_KEY, "3");
+ conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1);
+ // start the cluster with one datanode first
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).
+ format(true).racks(rack1).hosts(host1).build();
+ cluster.waitActive();
+ namesystem = cluster.getNameNode().getNamesystem();
+ blockManager = namesystem.getBlockManager();
+ networktopology = blockManager.getDatanodeManager().getNetworkTopology();
+
+ Assert.assertTrue("BlockPlacementPolicy type is not correct.",
+ blockManager.getBlockPlacementPolicy() instanceof BlockPlacementPolicyRaid);
+ policy = (BlockPlacementPolicyRaid)blockManager.getBlockPlacementPolicy();
+ fs = cluster.getFileSystem();
+ xorPrefix = RaidNode.xorDestinationPath(conf).toUri().getPath();
+ raidTempPrefix = RaidNode.xorTempPrefix(conf);
+ raidrsTempPrefix = RaidNode.rsTempPrefix(conf);
+ raidrsHarTempPrefix = RaidNode.rsHarTempPrefix(conf);
+ }
+
+ /**
+ * Test that the parity files will be placed at the good locations when we
+ * create them.
+ */
+ @Test
+ public void testChooseTargetForRaidFile() throws IOException {
+ setupCluster();
+ try {
+ String src = "/dir/file";
+ String parity = raidrsTempPrefix + src;
+ DFSTestUtil.createFile(fs, new Path(src), 4, (short)1, 0L);
+ DFSTestUtil.waitReplication(fs, new Path(src), (short)1);
+ refreshPolicy();
+ setBlockPlacementPolicy(namesystem, policy);
+ // start 3 more datanodes
+ String[] racks = {"/rack2", "/rack2", "/rack2",
+ "/rack2", "/rack2", "/rack2"};
+ String[] hosts =
+ {"host2.rack2.com", "host3.rack2.com", "host4.rack2.com",
+ "host5.rack2.com", "host6.rack2.com", "host7.rack2.com"};
+ cluster.startDataNodes(conf, 6, true, null, racks, hosts, null);
+ int numBlocks = 6;
+ DFSTestUtil.createFile(fs, new Path(parity), numBlocks, (short)2, 0L);
+ DFSTestUtil.waitReplication(fs, new Path(parity), (short)2);
+ FileStatus srcStat = fs.getFileStatus(new Path(src));
+ BlockLocation[] srcLoc =
+ fs.getFileBlockLocations(srcStat, 0, srcStat.getLen());
+ FileStatus parityStat = fs.getFileStatus(new Path(parity));
+ BlockLocation[] parityLoc =
+ fs.getFileBlockLocations(parityStat, 0, parityStat.getLen());
+ int parityLen = RaidNode.rsParityLength(conf);
+ for (int i = 0; i < numBlocks / parityLen; i++) {
+ Set<String> locations = new HashSet<String>();
+ for (int j = 0; j < srcLoc.length; j++) {
+ String [] names = srcLoc[j].getNames();
+ for (int k = 0; k < names.length; k++) {
+ LOG.info("Source block location: " + names[k]);
+ locations.add(names[k]);
+ }
+ }
+ for (int j = 0 ; j < parityLen; j++) {
+ String[] names = parityLoc[j + i * parityLen].getNames();
+ for (int k = 0; k < names.length; k++) {
+ LOG.info("Parity block location: " + names[k]);
+ Assert.assertTrue(locations.add(names[k]));
+ }
+ }
+ }
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Test that the har parity files will be placed at the good locations when we
+ * create them.
+ */
+ @Test
+ public void testChooseTargetForHarRaidFile() throws IOException {
+ setupCluster();
+ try {
+ String[] racks = {"/rack2", "/rack2", "/rack2",
+ "/rack2", "/rack2", "/rack2"};
+ String[] hosts =
+ {"host2.rack2.com", "host3.rack2.com", "host4.rack2.com",
+ "host5.rack2.com", "host6.rack2.com", "host7.rack2.com"};
+ cluster.startDataNodes(conf, 6, true, null, racks, hosts, null);
+ String harParity = raidrsHarTempPrefix + "/dir/file";
+ int numBlocks = 11;
+ DFSTestUtil.createFile(fs, new Path(harParity), numBlocks, (short)1, 0L);
+ DFSTestUtil.waitReplication(fs, new Path(harParity), (short)1);
+ FileStatus stat = fs.getFileStatus(new Path(harParity));
+ BlockLocation[] loc = fs.getFileBlockLocations(stat, 0, stat.getLen());
+ int rsParityLength = RaidNode.rsParityLength(conf);
+ for (int i = 0; i < numBlocks - rsParityLength; i++) {
+ Set<String> locations = new HashSet<String>();
+ for (int j = 0; j < rsParityLength; j++) {
+ for (int k = 0; k < loc[i + j].getNames().length; k++) {
+ // verify that every adjacent 4 blocks are on differnt nodes
+ String name = loc[i + j].getNames()[k];
+ LOG.info("Har Raid block location: " + name);
+ Assert.assertTrue(locations.add(name));
+ }
+ }
+ }
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Test BlockPlacementPolicyRaid.CachedLocatedBlocks
+ * Verify that the results obtained from cache is the same as
+ * the results obtained directly
+ */
+ @Test
+ public void testCachedBlocks() throws IOException {
+ setupCluster();
+ try {
+ String file1 = "/dir/file1";
+ String file2 = "/dir/file2";
+ DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L);
+ DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L);
+ // test blocks cache
+ CachedLocatedBlocks cachedBlocks = new CachedLocatedBlocks(namesystem);
+ verifyCachedBlocksResult(cachedBlocks, namesystem, file1);
+ verifyCachedBlocksResult(cachedBlocks, namesystem, file1);
+ verifyCachedBlocksResult(cachedBlocks, namesystem, file2);
+ verifyCachedBlocksResult(cachedBlocks, namesystem, file2);
+ try {
+ Thread.sleep(1200L);
+ } catch (InterruptedException e) {
+ }
+ verifyCachedBlocksResult(cachedBlocks, namesystem, file2);
+ verifyCachedBlocksResult(cachedBlocks, namesystem, file1);
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Test BlockPlacementPolicyRaid.CachedFullPathNames
+ * Verify that the results obtained from cache is the same as
+ * the results obtained directly
+ */
@Test
- public void testFoo() {
+ public void testCachedPathNames() throws IOException {
+ setupCluster();
+ try {
+ String file1 = "/dir/file1";
+ String file2 = "/dir/file2";
+ DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L);
+ DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L);
+ // test full path cache
+ CachedFullPathNames cachedFullPathNames =
+ new CachedFullPathNames(namesystem);
+ final FSInodeInfo[] inodes = NameNodeRaidTestUtil.getFSInodeInfo(
+ namesystem, file1, file2);
+
+ verifyCachedFullPathNameResult(cachedFullPathNames, inodes[0]);
+ verifyCachedFullPathNameResult(cachedFullPathNames, inodes[0]);
+ verifyCachedFullPathNameResult(cachedFullPathNames, inodes[1]);
+ verifyCachedFullPathNameResult(cachedFullPathNames, inodes[1]);
+ try {
+ Thread.sleep(1200L);
+ } catch (InterruptedException e) {
+ }
+ verifyCachedFullPathNameResult(cachedFullPathNames, inodes[1]);
+ verifyCachedFullPathNameResult(cachedFullPathNames, inodes[0]);
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+ /**
+ * Test the result of getCompanionBlocks() on the unraided files
+ */
+ @Test
+ public void testGetCompanionBLocks() throws IOException {
+ setupCluster();
+ try {
+ String file1 = "/dir/file1";
+ String file2 = "/raid/dir/file2";
+ String file3 = "/raidrs/dir/file3";
+ // Set the policy to default policy to place the block in the default way
+ setBlockPlacementPolicy(namesystem, new BlockPlacementPolicyDefault(
+ conf, namesystem, networktopology));
+ DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L);
+ DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L);
+ DFSTestUtil.createFile(fs, new Path(file3), 8, (short)1, 0L);
+ Collection<LocatedBlock> companionBlocks;
+
+ companionBlocks = getCompanionBlocks(
+ namesystem, policy, getBlocks(namesystem, file1).get(0).getBlock());
+ Assert.assertTrue(companionBlocks == null || companionBlocks.size() == 0);
+
+ companionBlocks = getCompanionBlocks(
+ namesystem, policy, getBlocks(namesystem, file1).get(2).getBlock());
+ Assert.assertTrue(companionBlocks == null || companionBlocks.size() == 0);
+
+ companionBlocks = getCompanionBlocks(
+ namesystem, policy, getBlocks(namesystem, file2).get(0).getBlock());
+ Assert.assertEquals(1, companionBlocks.size());
+
+ companionBlocks = getCompanionBlocks(
+ namesystem, policy, getBlocks(namesystem, file2).get(3).getBlock());
+ Assert.assertEquals(1, companionBlocks.size());
+
+ int rsParityLength = RaidNode.rsParityLength(conf);
+ companionBlocks = getCompanionBlocks(
+ namesystem, policy, getBlocks(namesystem, file3).get(0).getBlock());
+ Assert.assertEquals(rsParityLength, companionBlocks.size());
+
+ companionBlocks = getCompanionBlocks(
+ namesystem, policy, getBlocks(namesystem, file3).get(4).getBlock());
+ Assert.assertEquals(rsParityLength, companionBlocks.size());
+
+ companionBlocks = getCompanionBlocks(
+ namesystem, policy, getBlocks(namesystem, file3).get(6).getBlock());
+ Assert.assertEquals(2, companionBlocks.size());
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ static void setBlockPlacementPolicy(
+ FSNamesystem namesystem, BlockPlacementPolicy policy) {
+ namesystem.writeLock();
+ try {
+ namesystem.getBlockManager().setBlockPlacementPolicy(policy);
+ } finally {
+ namesystem.writeUnlock();
+ }
+ }
+
+ /**
+ * Test BlockPlacementPolicyRaid actually deletes the correct replica.
+ * Start 2 datanodes and create 1 source file and its parity file.
+ * 1) Start host1, create the parity file with replication 1
+ * 2) Start host2, create the source file with replication 2
+ * 3) Set repliation of source file to 1
+ * Verify that the policy should delete the block with more companion blocks.
+ */
+ @Test
+ public void testDeleteReplica() throws IOException {
+ setupCluster();
+ try {
+ // Set the policy to default policy to place the block in the default way
+ setBlockPlacementPolicy(namesystem, new BlockPlacementPolicyDefault(
+ conf, namesystem, networktopology));
+ DatanodeDescriptor datanode1 = blockManager.getDatanodeManager(
+ ).getDatanodeCyclicIteration("").iterator().next().getValue();
+ String source = "/dir/file";
+ String parity = xorPrefix + source;
+
+ final Path parityPath = new Path(parity);
+ DFSTestUtil.createFile(fs, parityPath, 3, (short)1, 0L);
+ DFSTestUtil.waitReplication(fs, parityPath, (short)1);
+
+ // start one more datanode
+ cluster.startDataNodes(conf, 1, true, null, rack2, host2, null);
+ DatanodeDescriptor datanode2 = null;
+ for(Map.Entry<String, DatanodeDescriptor> e : blockManager.getDatanodeManager(
+ ).getDatanodeCyclicIteration("")) {
+ final DatanodeDescriptor d = e.getValue();
+ if (!d.getName().equals(datanode1.getName())) {
+ datanode2 = d;
+ }
+ }
+ Assert.assertTrue(datanode2 != null);
+ cluster.waitActive();
+ final Path sourcePath = new Path(source);
+ DFSTestUtil.createFile(fs, sourcePath, 5, (short)2, 0L);
+ DFSTestUtil.waitReplication(fs, sourcePath, (short)2);
+
+ refreshPolicy();
+ Assert.assertEquals(parity,
+ policy.getParityFile(source));
+ Assert.assertEquals(source,
+ policy.getSourceFile(parity, xorPrefix));
+
+ List<LocatedBlock> sourceBlocks = getBlocks(namesystem, source);
+ List<LocatedBlock> parityBlocks = getBlocks(namesystem, parity);
+ Assert.assertEquals(5, sourceBlocks.size());
+ Assert.assertEquals(3, parityBlocks.size());
+
+ // verify the result of getCompanionBlocks()
+ Collection<LocatedBlock> companionBlocks;
+ companionBlocks = getCompanionBlocks(
+ namesystem, policy, sourceBlocks.get(0).getBlock());
+ verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
+ new int[]{0, 1}, new int[]{0});
+
+ companionBlocks = getCompanionBlocks(
+ namesystem, policy, sourceBlocks.get(1).getBlock());
+ verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
+ new int[]{0, 1}, new int[]{0});
+
+ companionBlocks = getCompanionBlocks(
+ namesystem, policy, sourceBlocks.get(2).getBlock());
+ verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
+ new int[]{2, 3}, new int[]{1});
+
+ companionBlocks = getCompanionBlocks(
+ namesystem, policy, sourceBlocks.get(3).getBlock());
+ verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
+ new int[]{2, 3}, new int[]{1});
+
+ companionBlocks = getCompanionBlocks(
+ namesystem, policy, sourceBlocks.get(4).getBlock());
+ verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
+ new int[]{4}, new int[]{2});
+
+ companionBlocks = getCompanionBlocks(
+ namesystem, policy, parityBlocks.get(0).getBlock());
+ verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
+ new int[]{0, 1}, new int[]{0});
+
+ companionBlocks = getCompanionBlocks(
+ namesystem, policy, parityBlocks.get(1).getBlock());
+ verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
+ new int[]{2, 3}, new int[]{1});
+
+ companionBlocks = getCompanionBlocks(
+ namesystem, policy, parityBlocks.get(2).getBlock());
+ verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
+ new int[]{4}, new int[]{2});
+
+ // Set the policy back to raid policy. We have to create a new object
+ // here to clear the block location cache
+ refreshPolicy();
+ setBlockPlacementPolicy(namesystem, policy);
+ // verify policy deletes the correct blocks. companion blocks should be
+ // evenly distributed.
+ fs.setReplication(sourcePath, (short)1);
+ DFSTestUtil.waitReplication(fs, sourcePath, (short)1);
+ Map<String, Integer> counters = new HashMap<String, Integer>();
+ refreshPolicy();
+ for (int i = 0; i < parityBlocks.size(); i++) {
+ companionBlocks = getCompanionBlocks(
+ namesystem, policy, parityBlocks.get(i).getBlock());
+
+ counters = BlockPlacementPolicyRaid.countCompanionBlocks(
+ companionBlocks, false);
+ Assert.assertTrue(counters.get(datanode1.getName()) >= 1 &&
+ counters.get(datanode1.getName()) <= 2);
+ Assert.assertTrue(counters.get(datanode1.getName()) +
+ counters.get(datanode2.getName()) ==
+ companionBlocks.size());
+
+ counters = BlockPlacementPolicyRaid.countCompanionBlocks(
+ companionBlocks, true);
+ Assert.assertTrue(counters.get(datanode1.getParent().getName()) >= 1 &&
+ counters.get(datanode1.getParent().getName()) <= 2);
+ Assert.assertTrue(counters.get(datanode1.getParent().getName()) +
+ counters.get(datanode2.getParent().getName()) ==
+ companionBlocks.size());
+ }
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ // create a new BlockPlacementPolicyRaid to clear the cache
+ private void refreshPolicy() {
+ policy = new BlockPlacementPolicyRaid();
+ policy.initialize(conf, namesystem, networktopology);
+ }
+
+ private void verifyCompanionBlocks(Collection<LocatedBlock> companionBlocks,
+ List<LocatedBlock> sourceBlocks, List<LocatedBlock> parityBlocks,
+ int[] sourceBlockIndexes, int[] parityBlockIndexes) {
+ Set<ExtendedBlock> blockSet = new HashSet<ExtendedBlock>();
+ for (LocatedBlock b : companionBlocks) {
+ blockSet.add(b.getBlock());
+ }
+ Assert.assertEquals(sourceBlockIndexes.length + parityBlockIndexes.length,
+ blockSet.size());
+ for (int index : sourceBlockIndexes) {
+ Assert.assertTrue(blockSet.contains(sourceBlocks.get(index).getBlock()));
+ }
+ for (int index : parityBlockIndexes) {
+ Assert.assertTrue(blockSet.contains(parityBlocks.get(index).getBlock()));
+ }
+ }
+
+ private void verifyCachedFullPathNameResult(
+ CachedFullPathNames cachedFullPathNames, FSInodeInfo inode)
+ throws IOException {
+ String res1 = inode.getFullPathName();
+ String res2 = cachedFullPathNames.get(inode);
+ LOG.info("Actual path name: " + res1);
+ LOG.info("Cached path name: " + res2);
+ Assert.assertEquals(cachedFullPathNames.get(inode),
+ inode.getFullPathName());
+ }
+
+ private void verifyCachedBlocksResult(CachedLocatedBlocks cachedBlocks,
+ FSNamesystem namesystem, String file) throws IOException{
+ long len = NameNodeRaidUtil.getFileInfo(namesystem, file, true).getLen();
+ List<LocatedBlock> res1 = NameNodeRaidUtil.getBlockLocations(namesystem,
+ file, 0L, len, false, false).getLocatedBlocks();
+ List<LocatedBlock> res2 = cachedBlocks.get(file);
+ for (int i = 0; i < res1.size(); i++) {
+ LOG.info("Actual block: " + res1.get(i).getBlock());
+ LOG.info("Cached block: " + res2.get(i).getBlock());
+ Assert.assertEquals(res1.get(i).getBlock(), res2.get(i).getBlock());
+ }
+ }
+
+ private Collection<LocatedBlock> getCompanionBlocks(
+ FSNamesystem namesystem, BlockPlacementPolicyRaid policy,
+ ExtendedBlock block) throws IOException {
+ INodeFile inode = blockManager.blocksMap.getINode(block
+ .getLocalBlock());
+ FileType type = policy.getFileType(inode.getFullPathName());
+ return policy.getCompanionBlocks(inode.getFullPathName(), type,
+ block.getLocalBlock());
+ }
+
+ private List<LocatedBlock> getBlocks(FSNamesystem namesystem, String file)
+ throws IOException {
+ long len = NameNodeRaidUtil.getFileInfo(namesystem, file, true).getLen();
+ return NameNodeRaidUtil.getBlockLocations(namesystem,
+ file, 0, len, false, false).getLocatedBlocks();
}
-// private Configuration conf = null;
-// private MiniDFSCluster cluster = null;
-// private FSNamesystem namesystem = null;
-// private BlockPlacementPolicyRaid policy = null;
-// private FileSystem fs = null;
-// String[] rack1 = {"/rack1"};
-// String[] rack2 = {"/rack2"};
-// String[] host1 = {"host1.rack1.com"};
-// String[] host2 = {"host2.rack2.com"};
-// String xorPrefix = null;
-// String raidTempPrefix = null;
-// String raidrsTempPrefix = null;
-// String raidrsHarTempPrefix = null;
-//
-// final static Log LOG =
-// LogFactory.getLog(TestBlockPlacementPolicyRaid.class);
-//
-// protected void setupCluster() throws IOException {
-// conf = new Configuration();
-// conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
-// conf.set("dfs.replication.pending.timeout.sec", "2");
-// conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1L);
-// conf.set("dfs.block.replicator.classname",
-// "org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyRaid");
-// conf.set(RaidNode.STRIPE_LENGTH_KEY, "2");
-// conf.set(RaidNode.RS_PARITY_LENGTH_KEY, "3");
-// conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1);
-// // start the cluster with one datanode first
-// cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).
-// format(true).racks(rack1).hosts(host1).build();
-// cluster.waitActive();
-// namesystem = cluster.getNameNode().getNamesystem();
-// Assert.assertTrue("BlockPlacementPolicy type is not correct.",
-// namesystem.blockManager.replicator instanceof BlockPlacementPolicyRaid);
-// policy = (BlockPlacementPolicyRaid) namesystem.blockManager.replicator;
-// fs = cluster.getFileSystem();
-// xorPrefix = RaidNode.xorDestinationPath(conf).toUri().getPath();
-// raidTempPrefix = RaidNode.xorTempPrefix(conf);
-// raidrsTempPrefix = RaidNode.rsTempPrefix(conf);
-// raidrsHarTempPrefix = RaidNode.rsHarTempPrefix(conf);
-// }
-//
-// /**
-// * Test that the parity files will be placed at the good locations when we
-// * create them.
-// */
-// @Test
-// public void testChooseTargetForRaidFile() throws IOException {
-// setupCluster();
-// try {
-// String src = "/dir/file";
-// String parity = raidrsTempPrefix + src;
-// DFSTestUtil.createFile(fs, new Path(src), 4, (short)1, 0L);
-// DFSTestUtil.waitReplication(fs, new Path(src), (short)1);
-// refreshPolicy();
-// setBlockPlacementPolicy(namesystem, policy);
-// // start 3 more datanodes
-// String[] racks = {"/rack2", "/rack2", "/rack2",
-// "/rack2", "/rack2", "/rack2"};
-// String[] hosts =
-// {"host2.rack2.com", "host3.rack2.com", "host4.rack2.com",
-// "host5.rack2.com", "host6.rack2.com", "host7.rack2.com"};
-// cluster.startDataNodes(conf, 6, true, null, racks, hosts, null);
-// int numBlocks = 6;
-// DFSTestUtil.createFile(fs, new Path(parity), numBlocks, (short)2, 0L);
-// DFSTestUtil.waitReplication(fs, new Path(parity), (short)2);
-// FileStatus srcStat = fs.getFileStatus(new Path(src));
-// BlockLocation[] srcLoc =
-// fs.getFileBlockLocations(srcStat, 0, srcStat.getLen());
-// FileStatus parityStat = fs.getFileStatus(new Path(parity));
-// BlockLocation[] parityLoc =
-// fs.getFileBlockLocations(parityStat, 0, parityStat.getLen());
-// int parityLen = RaidNode.rsParityLength(conf);
-// for (int i = 0; i < numBlocks / parityLen; i++) {
-// Set<String> locations = new HashSet<String>();
-// for (int j = 0; j < srcLoc.length; j++) {
-// String [] names = srcLoc[j].getNames();
-// for (int k = 0; k < names.length; k++) {
-// LOG.info("Source block location: " + names[k]);
-// locations.add(names[k]);
-// }
-// }
-// for (int j = 0 ; j < parityLen; j++) {
-// String[] names = parityLoc[j + i * parityLen].getNames();
-// for (int k = 0; k < names.length; k++) {
-// LOG.info("Parity block location: " + names[k]);
-// Assert.assertTrue(locations.add(names[k]));
-// }
-// }
-// }
-// } finally {
-// if (cluster != null) {
-// cluster.shutdown();
-// }
-// }
-// }
-//
-// /**
-// * Test that the har parity files will be placed at the good locations when we
-// * create them.
-// */
-// @Test
-// public void testChooseTargetForHarRaidFile() throws IOException {
-// setupCluster();
-// try {
-// String[] racks = {"/rack2", "/rack2", "/rack2",
-// "/rack2", "/rack2", "/rack2"};
-// String[] hosts =
-// {"host2.rack2.com", "host3.rack2.com", "host4.rack2.com",
-// "host5.rack2.com", "host6.rack2.com", "host7.rack2.com"};
-// cluster.startDataNodes(conf, 6, true, null, racks, hosts, null);
-// String harParity = raidrsHarTempPrefix + "/dir/file";
-// int numBlocks = 11;
-// DFSTestUtil.createFile(fs, new Path(harParity), numBlocks, (short)1, 0L);
-// DFSTestUtil.waitReplication(fs, new Path(harParity), (short)1);
-// FileStatus stat = fs.getFileStatus(new Path(harParity));
-// BlockLocation[] loc = fs.getFileBlockLocations(stat, 0, stat.getLen());
-// int rsParityLength = RaidNode.rsParityLength(conf);
-// for (int i = 0; i < numBlocks - rsParityLength; i++) {
-// Set<String> locations = new HashSet<String>();
-// for (int j = 0; j < rsParityLength; j++) {
-// for (int k = 0; k < loc[i + j].getNames().length; k++) {
-// // verify that every adjacent 4 blocks are on differnt nodes
-// String name = loc[i + j].getNames()[k];
-// LOG.info("Har Raid block location: " + name);
-// Assert.assertTrue(locations.add(name));
-// }
-// }
-// }
-// } finally {
-// if (cluster != null) {
-// cluster.shutdown();
-// }
-// }
-// }
-//
-// /**
-// * Test BlockPlacementPolicyRaid.CachedLocatedBlocks
-// * Verify that the results obtained from cache is the same as
-// * the results obtained directly
-// */
-// @Test
-// public void testCachedBlocks() throws IOException {
-// setupCluster();
-// try {
-// String file1 = "/dir/file1";
-// String file2 = "/dir/file2";
-// DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L);
-// DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L);
-// // test blocks cache
-// CachedLocatedBlocks cachedBlocks = new CachedLocatedBlocks(namesystem);
-// verifyCachedBlocksResult(cachedBlocks, namesystem, file1);
-// verifyCachedBlocksResult(cachedBlocks, namesystem, file1);
-// verifyCachedBlocksResult(cachedBlocks, namesystem, file2);
-// verifyCachedBlocksResult(cachedBlocks, namesystem, file2);
-// try {
-// Thread.sleep(1200L);
-// } catch (InterruptedException e) {
-// }
-// verifyCachedBlocksResult(cachedBlocks, namesystem, file2);
-// verifyCachedBlocksResult(cachedBlocks, namesystem, file1);
-// } finally {
-// if (cluster != null) {
-// cluster.shutdown();
-// }
-// }
-// }
-//
-// /**
-// * Test BlockPlacementPolicyRaid.CachedFullPathNames
-// * Verify that the results obtained from cache is the same as
-// * the results obtained directly
-// */
-// @Test
-// public void testCachedPathNames() throws IOException {
-// setupCluster();
-// try {
-// String file1 = "/dir/file1";
-// String file2 = "/dir/file2";
-// DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L);
-// DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L);
-// // test full path cache
-// CachedFullPathNames cachedFullPathNames =
-// new CachedFullPathNames(namesystem);
-// FSInodeInfo inode1 = null;
-// FSInodeInfo inode2 = null;
-// NameNodeRaidTestUtil.readLock(namesystem.dir);
-// try {
-// inode1 = NameNodeRaidTestUtil.getNode(namesystem.dir, file1, true);
-// inode2 = NameNodeRaidTestUtil.getNode(namesystem.dir, file2, true);
-// } finally {
-// NameNodeRaidTestUtil.readUnLock(namesystem.dir);
-// }
-// verifyCachedFullPathNameResult(cachedFullPathNames, inode1);
-// verifyCachedFullPathNameResult(cachedFullPathNames, inode1);
-// verifyCachedFullPathNameResult(cachedFullPathNames, inode2);
-// verifyCachedFullPathNameResult(cachedFullPathNames, inode2);
-// try {
-// Thread.sleep(1200L);
-// } catch (InterruptedException e) {
-// }
-// verifyCachedFullPathNameResult(cachedFullPathNames, inode2);
-// verifyCachedFullPathNameResult(cachedFullPathNames, inode1);
-// } finally {
-// if (cluster != null) {
-// cluster.shutdown();
-// }
-// }
-// }
-// /**
-// * Test the result of getCompanionBlocks() on the unraided files
-// */
-// @Test
-// public void testGetCompanionBLocks() throws IOException {
-// setupCluster();
-// try {
-// String file1 = "/dir/file1";
-// String file2 = "/raid/dir/file2";
-// String file3 = "/raidrs/dir/file3";
-// // Set the policy to default policy to place the block in the default way
-// setBlockPlacementPolicy(namesystem, new BlockPlacementPolicyDefault(
-// conf, namesystem, namesystem.clusterMap));
-// DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L);
-// DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L);
-// DFSTestUtil.createFile(fs, new Path(file3), 8, (short)1, 0L);
-// Collection<LocatedBlock> companionBlocks;
-//
-// companionBlocks = getCompanionBlocks(
-// namesystem, policy, getBlocks(namesystem, file1).get(0).getBlock());
-// Assert.assertTrue(companionBlocks == null || companionBlocks.size() == 0);
-//
-// companionBlocks = getCompanionBlocks(
-// namesystem, policy, getBlocks(namesystem, file1).get(2).getBlock());
-// Assert.assertTrue(companionBlocks == null || companionBlocks.size() == 0);
-//
-// companionBlocks = getCompanionBlocks(
-// namesystem, policy, getBlocks(namesystem, file2).get(0).getBlock());
-// Assert.assertEquals(1, companionBlocks.size());
-//
-// companionBlocks = getCompanionBlocks(
-// namesystem, policy, getBlocks(namesystem, file2).get(3).getBlock());
-// Assert.assertEquals(1, companionBlocks.size());
-//
-// int rsParityLength = RaidNode.rsParityLength(conf);
-// companionBlocks = getCompanionBlocks(
-// namesystem, policy, getBlocks(namesystem, file3).get(0).getBlock());
-// Assert.assertEquals(rsParityLength, companionBlocks.size());
-//
-// companionBlocks = getCompanionBlocks(
-// namesystem, policy, getBlocks(namesystem, file3).get(4).getBlock());
-// Assert.assertEquals(rsParityLength, companionBlocks.size());
-//
-// companionBlocks = getCompanionBlocks(
-// namesystem, policy, getBlocks(namesystem, file3).get(6).getBlock());
-// Assert.assertEquals(2, companionBlocks.size());
-// } finally {
-// if (cluster != null) {
-// cluster.shutdown();
-// }
-// }
-// }
-//
-// static void setBlockPlacementPolicy(
-// FSNamesystem namesystem, BlockPlacementPolicy policy) {
-// namesystem.writeLock();
-// try {
-// namesystem.blockManager.replicator = policy;
-// } finally {
-// namesystem.writeUnlock();
-// }
-// }
-//
-// /**
-// * Test BlockPlacementPolicyRaid actually deletes the correct replica.
-// * Start 2 datanodes and create 1 source file and its parity file.
-// * 1) Start host1, create the parity file with replication 1
-// * 2) Start host2, create the source file with replication 2
-// * 3) Set repliation of source file to 1
-// * Verify that the policy should delete the block with more companion blocks.
-// */
-// @Test
-// public void testDeleteReplica() throws IOException {
-// setupCluster();
-// try {
-// // Set the policy to default policy to place the block in the default way
-// setBlockPlacementPolicy(namesystem, new BlockPlacementPolicyDefault(
-// conf, namesystem, namesystem.clusterMap));
-// DatanodeDescriptor datanode1 =
-// NameNodeRaidTestUtil.getDatanodeMap(namesystem).values().iterator().next();
-// String source = "/dir/file";
-// String parity = xorPrefix + source;
-//
-// final Path parityPath = new Path(parity);
-// DFSTestUtil.createFile(fs, parityPath, 3, (short)1, 0L);
-// DFSTestUtil.waitReplication(fs, parityPath, (short)1);
-//
-// // start one more datanode
-// cluster.startDataNodes(conf, 1, true, null, rack2, host2, null);
-// DatanodeDescriptor datanode2 = null;
-// for (DatanodeDescriptor d : NameNodeRaidTestUtil.getDatanodeMap(namesystem).values()) {
-// if (!d.getName().equals(datanode1.getName())) {
-// datanode2 = d;
-// }
-// }
-// Assert.assertTrue(datanode2 != null);
-// cluster.waitActive();
-// final Path sourcePath = new Path(source);
-// DFSTestUtil.createFile(fs, sourcePath, 5, (short)2, 0L);
-// DFSTestUtil.waitReplication(fs, sourcePath, (short)2);
-//
-// refreshPolicy();
-// Assert.assertEquals(parity,
-// policy.getParityFile(source));
-// Assert.assertEquals(source,
-// policy.getSourceFile(parity, xorPrefix));
-//
-// List<LocatedBlock> sourceBlocks = getBlocks(namesystem, source);
-// List<LocatedBlock> parityBlocks = getBlocks(namesystem, parity);
-// Assert.assertEquals(5, sourceBlocks.size());
-// Assert.assertEquals(3, parityBlocks.size());
-//
-// // verify the result of getCompanionBlocks()
-// Collection<LocatedBlock> companionBlocks;
-// companionBlocks = getCompanionBlocks(
-// namesystem, policy, sourceBlocks.get(0).getBlock());
-// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
-// new int[]{0, 1}, new int[]{0});
-//
-// companionBlocks = getCompanionBlocks(
-// namesystem, policy, sourceBlocks.get(1).getBlock());
-// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
-// new int[]{0, 1}, new int[]{0});
-//
-// companionBlocks = getCompanionBlocks(
-// namesystem, policy, sourceBlocks.get(2).getBlock());
-// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
-// new int[]{2, 3}, new int[]{1});
-//
-// companionBlocks = getCompanionBlocks(
-// namesystem, policy, sourceBlocks.get(3).getBlock());
-// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
-// new int[]{2, 3}, new int[]{1});
-//
-// companionBlocks = getCompanionBlocks(
-// namesystem, policy, sourceBlocks.get(4).getBlock());
-// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
-// new int[]{4}, new int[]{2});
-//
-// companionBlocks = getCompanionBlocks(
-// namesystem, policy, parityBlocks.get(0).getBlock());
-// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
-// new int[]{0, 1}, new int[]{0});
-//
-// companionBlocks = getCompanionBlocks(
-// namesystem, policy, parityBlocks.get(1).getBlock());
-// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
-// new int[]{2, 3}, new int[]{1});
-//
-// companionBlocks = getCompanionBlocks(
-// namesystem, policy, parityBlocks.get(2).getBlock());
-// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
-// new int[]{4}, new int[]{2});
-//
-// // Set the policy back to raid policy. We have to create a new object
-// // here to clear the block location cache
-// refreshPolicy();
-// setBlockPlacementPolicy(namesystem, policy);
-// // verify policy deletes the correct blocks. companion blocks should be
-// // evenly distributed.
-// fs.setReplication(sourcePath, (short)1);
-// DFSTestUtil.waitReplication(fs, sourcePath, (short)1);
-// Map<String, Integer> counters = new HashMap<String, Integer>();
-// refreshPolicy();
-// for (int i = 0; i < parityBlocks.size(); i++) {
-// companionBlocks = getCompanionBlocks(
-// namesystem, policy, parityBlocks.get(i).getBlock());
-//
-// counters = BlockPlacementPolicyRaid.countCompanionBlocks(
-// companionBlocks, false);
-// Assert.assertTrue(counters.get(datanode1.getName()) >= 1 &&
-// counters.get(datanode1.getName()) <= 2);
-// Assert.assertTrue(counters.get(datanode1.getName()) +
-// counters.get(datanode2.getName()) ==
-// companionBlocks.size());
-//
-// counters = BlockPlacementPolicyRaid.countCompanionBlocks(
-// companionBlocks, true);
-// Assert.assertTrue(counters.get(datanode1.getParent().getName()) >= 1 &&
-// counters.get(datanode1.getParent().getName()) <= 2);
-// Assert.assertTrue(counters.get(datanode1.getParent().getName()) +
-// counters.get(datanode2.getParent().getName()) ==
-// companionBlocks.size());
-// }
-// } finally {
-// if (cluster != null) {
-// cluster.shutdown();
-// }
-// }
-// }
-//
-// // create a new BlockPlacementPolicyRaid to clear the cache
-// private void refreshPolicy() {
-// policy = new BlockPlacementPolicyRaid();
-// policy.initialize(conf, namesystem, namesystem.clusterMap);
-// }
-//
-// private void verifyCompanionBlocks(Collection<LocatedBlock> companionBlocks,
-// List<LocatedBlock> sourceBlocks, List<LocatedBlock> parityBlocks,
-// int[] sourceBlockIndexes, int[] parityBlockIndexes) {
-// Set<ExtendedBlock> blockSet = new HashSet<ExtendedBlock>();
-// for (LocatedBlock b : companionBlocks) {
-// blockSet.add(b.getBlock());
-// }
-// Assert.assertEquals(sourceBlockIndexes.length + parityBlockIndexes.length,
-// blockSet.size());
-// for (int index : sourceBlockIndexes) {
-// Assert.assertTrue(blockSet.contains(sourceBlocks.get(index).getBlock()));
-// }
-// for (int index : parityBlockIndexes) {
-// Assert.assertTrue(blockSet.contains(parityBlocks.get(index).getBlock()));
-// }
-// }
-//
-// private void verifyCachedFullPathNameResult(
-// CachedFullPathNames cachedFullPathNames, FSInodeInfo inode)
-// throws IOException {
-// String res1 = inode.getFullPathName();
-// String res2 = cachedFullPathNames.get(inode);
-// LOG.info("Actual path name: " + res1);
-// LOG.info("Cached path name: " + res2);
-// Assert.assertEquals(cachedFullPathNames.get(inode),
-// inode.getFullPathName());
-// }
-//
-// private void verifyCachedBlocksResult(CachedLocatedBlocks cachedBlocks,
-// FSNamesystem namesystem, String file) throws IOException{
-// long len = NameNodeRaidUtil.getFileInfo(namesystem, file, true).getLen();
-// List<LocatedBlock> res1 = NameNodeRaidUtil.getBlockLocations(namesystem,
-// file, 0L, len, false, false).getLocatedBlocks();
-// List<LocatedBlock> res2 = cachedBlocks.get(file);
-// for (int i = 0; i < res1.size(); i++) {
-// LOG.info("Actual block: " + res1.get(i).getBlock());
-// LOG.info("Cached block: " + res2.get(i).getBlock());
-// Assert.assertEquals(res1.get(i).getBlock(), res2.get(i).getBlock());
-// }
-// }
-//
-// private Collection<LocatedBlock> getCompanionBlocks(
-// FSNamesystem namesystem, BlockPlacementPolicyRaid policy,
-// ExtendedBlock block) throws IOException {
-// INodeFile inode = namesystem.blockManager.blocksMap.getINode(block
-// .getLocalBlock());
-// FileType type = policy.getFileType(inode.getFullPathName());
-// return policy.getCompanionBlocks(inode.getFullPathName(), type,
-// block.getLocalBlock());
-// }
-//
-// private List<LocatedBlock> getBlocks(FSNamesystem namesystem, String file)
-// throws IOException {
-// long len = NameNodeRaidUtil.getFileInfo(namesystem, file, true).getLen();
-// return NameNodeRaidUtil.getBlockLocations(namesystem,
-// file, 0, len, false, false).getLocatedBlocks();
-// }
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/namenode/NameNodeRaidTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/namenode/NameNodeRaidTestUtil.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/namenode/NameNodeRaidTestUtil.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/namenode/NameNodeRaidTestUtil.java Tue Sep 13 22:49:27 2011
@@ -17,33 +17,21 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
-import java.io.*;
-import java.util.*;
-
-import org.apache.hadoop.classification.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.hdfs.protocol.*;
-import org.apache.hadoop.hdfs.server.blockmanagement.*;
-import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.fs.UnresolvedLinkException;
public class NameNodeRaidTestUtil {
- public static void readLock(final FSDirectory dir) {
+ public static FSInodeInfo[] getFSInodeInfo(final FSNamesystem namesystem,
+ final String... files) throws UnresolvedLinkException {
+ final FSInodeInfo[] inodes = new FSInodeInfo[files.length];
+ final FSDirectory dir = namesystem.dir;
dir.readLock();
+ try {
+ for(int i = 0; i < files.length; i++) {
+ inodes[i] = dir.rootDir.getNode(files[i], true);
+ }
+ return inodes;
+ } finally {
+ dir.readUnlock();
+ }
}
-
- public static void readUnLock(final FSDirectory dir) {
- dir.readUnlock();
- }
-
- public static FSInodeInfo getNode(final FSDirectory dir,
- final String src, final boolean resolveLink
- ) throws UnresolvedLinkException {
- return dir.rootDir.getNode(src, resolveLink);
- }
-
-// public static NavigableMap<String, DatanodeDescriptor> getDatanodeMap(
-// final FSNamesystem namesystem) {
-// return namesystem.datanodeMap;
-// }
}
-
Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/streaming/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Sep 13 22:49:27 2011
@@ -1,3 +1,3 @@
-/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/streaming:1159757-1166484
+/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/streaming:1159757-1170371
/hadoop/core/branches/branch-0.19/mapred/src/contrib/streaming:713112
/hadoop/core/trunk/src/contrib/streaming:776175-786373