Posted to mapreduce-commits@hadoop.apache.org by om...@apache.org on 2009/10/27 16:44:06 UTC

svn commit: r830230 [4/9] - in /hadoop/mapreduce/branches/HDFS-641: ./ .eclipse.templates/ conf/ ivy/ lib/ src/c++/ src/contrib/ src/contrib/capacity-scheduler/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-sche...

Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestContainerQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestContainerQueue.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestContainerQueue.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestContainerQueue.java Tue Oct 27 15:43:58 2009
@@ -23,7 +23,6 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.io.IOException;
-
 import static org.apache.hadoop.mapred.CapacityTestUtils.*;
 
 public class TestContainerQueue extends TestCase {
@@ -72,22 +71,22 @@
     // its children.
     //level 1 children
     QueueSchedulingContext a1 = new QueueSchedulingContext(
-      "a", 25, -1, -1, -1, -1);
+      "a", 25, -1, -1);
     QueueSchedulingContext a2 = new QueueSchedulingContext(
-      "b", 25, -1, -1, -1, -1);
+      "b", 25, -1, -1);
 
     AbstractQueue q = new ContainerQueue(rt, a1);
     AbstractQueue ql = new ContainerQueue(rt, a2);
 
     //level 2 children
     QueueSchedulingContext a = new QueueSchedulingContext(
-      "aa", 50, -1, -1, -1, -1);
+      "aa", 50, -1, -1);
     QueueSchedulingContext b = new QueueSchedulingContext(
-      "ab", 50, -1, -1, -1, -1);
+      "ab", 50, -1, -1);
     QueueSchedulingContext c = new QueueSchedulingContext(
-      "ac", 50, -1, -1, -1, -1);
+      "ac", 50, -1, -1);
     QueueSchedulingContext d = new QueueSchedulingContext(
-      "ad", 50, -1, -1, -1, -1);
+      "ad", 50, -1, -1);
 
     AbstractQueue q1 = new JobQueue(q, a);
     AbstractQueue q2 = new JobQueue(q, b);
@@ -120,27 +119,27 @@
 
   }
 
-  public void testMaxCapacity() throws IOException{
-    this.setUp(4,1,1);
+  public void testMaxCapacity() throws IOException {
+    this.setUp(4, 1, 1);
     taskTrackerManager.addJobInProgressListener(scheduler.jobQueuesManager);
 
     AbstractQueue rt = QueueHierarchyBuilder.createRootAbstractQueue();
 
     QueueSchedulingContext a1 = new QueueSchedulingContext(
-      "R.a", 25, 50, -1, -1, -1);
+      "R.a", 25, 50, -1);
     QueueSchedulingContext a2 = new QueueSchedulingContext(
-      "R.b", 25, 30, -1, -1, -1);
+      "R.b", 25, 30, -1);
     QueueSchedulingContext a3 = new QueueSchedulingContext(
-          "R.c", 50, -1, -1, -1, -1);
+      "R.c", 50, -1, -1);
 
 
     //Test for max capacity
     AbstractQueue q = new JobQueue(rt, a1);
     AbstractQueue q1 = new JobQueue(rt, a2);
     AbstractQueue q2 = new JobQueue(rt, a3);
-    scheduler.jobQueuesManager.addQueue((JobQueue)q);
-    scheduler.jobQueuesManager.addQueue((JobQueue)q1);
-    scheduler.jobQueuesManager.addQueue((JobQueue)q2);
+    scheduler.jobQueuesManager.addQueue((JobQueue) q);
+    scheduler.jobQueuesManager.addQueue((JobQueue) q1);
+    scheduler.jobQueuesManager.addQueue((JobQueue) q2);
 
     scheduler.setRoot(rt);
     rt.update(4, 4);
@@ -148,21 +147,35 @@
     scheduler.updateContextInfoForTests();
 
     // submit a job to the second queue
-    taskTrackerManager.submitJobAndInit(JobStatus.PREP, 20, 0, "R.a", "u1");
+    taskTrackerManager.submitJobAndInit(JobStatus.PREP, 20, 20, "R.a", "u1");
 
-    //Queue R.a should not get more than 2 slots 
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
+    //Queue R.a should not get more than 2 slots
+    Map<String, String> expectedStrings = new HashMap<String, String>();
+    expectedStrings.clear();
+    expectedStrings.put(
+      CapacityTestUtils.MAP,
       "attempt_test_0001_m_000001_0 on tt1");
+    expectedStrings.put(
+      CapacityTestUtils.REDUCE,
+      "attempt_test_0001_r_000001_0 on tt1");
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      expectedStrings);
 
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt2",
+    expectedStrings.clear();
+    expectedStrings.put(
+      CapacityTestUtils.MAP,
       "attempt_test_0001_m_000002_0 on tt2");
+    expectedStrings.put(
+      CapacityTestUtils.REDUCE,
+      "attempt_test_0001_r_000002_0 on tt2");
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      expectedStrings);
 
-    //Now the queue has already reached its max limit no further tasks should
-    // be given.
-    List<Task> l = scheduler.assignTasks(taskTrackerManager.getTaskTracker("tt3"));
-    assertNull(l);
+    assertNull(scheduler.assignTasks(
+      taskTrackerManager.getTaskTracker(
+        "tt3")));
 
   }
 
@@ -172,20 +185,20 @@
 
     //generate Queuecontext for the children
     QueueSchedulingContext a1 = new QueueSchedulingContext(
-      "a", 50, -1, -1, -1, -1);
+      "a", 50, -1, -1);
     QueueSchedulingContext a2 = new QueueSchedulingContext(
-      "b", -1, -1, -1, -1, -1);
+      "b", -1, -1, -1);
 
     AbstractQueue rtChild1 = new ContainerQueue(rt, a1);
     AbstractQueue rtChild2 = new ContainerQueue(rt, a2);
 
     //Add further children to rtChild1.    
     QueueSchedulingContext b = new QueueSchedulingContext(
-      "ab", 30, -1, -1, -1, -1);
+      "ab", 30, -1, -1);
     QueueSchedulingContext c = new QueueSchedulingContext(
-      "ac", -1, -1, -1, -1, -1);
+      "ac", -1, -1, -1);
     QueueSchedulingContext d = new QueueSchedulingContext(
-      "ad", 100, -1, -1, -1, -1);
+      "ad", 100, -1, -1);
 
     AbstractQueue q0 = new JobQueue(rtChild1, b);
     AbstractQueue q1 = new JobQueue(rtChild1, c);
@@ -224,9 +237,9 @@
 
     //First level
     QueueSchedulingContext sch =
-      new QueueSchedulingContext("rt.sch", a, -1, -1, -1, -1);
+      new QueueSchedulingContext("rt.sch", a, -1, -1);
     QueueSchedulingContext gta =
-      new QueueSchedulingContext("rt.gta", b, -1, -1, -1, -1);
+      new QueueSchedulingContext("rt.gta", b, -1, -1);
 
     AbstractQueue schq = new ContainerQueue(rt, sch);
 
@@ -238,9 +251,9 @@
 
     //Create further children.
     QueueSchedulingContext prod =
-      new QueueSchedulingContext("rt.sch.prod", c, -1, -1, -1, -1);
+      new QueueSchedulingContext("rt.sch.prod", c, -1, -1);
     QueueSchedulingContext misc =
-      new QueueSchedulingContext("rt.sch.misc", d, -1, -1, -1, -1);
+      new QueueSchedulingContext("rt.sch.misc", d, -1, -1);
 
     AbstractQueue prodq = new JobQueue(schq, prod);
     AbstractQueue miscq = new JobQueue(schq, misc);
@@ -269,86 +282,160 @@
     scheduler.updateContextInfoForTests();
 
     // verify initial capacity distribution
-    TaskSchedulingContext mapTsc 
-        = map.get("rt.gta").getQueueSchedulingContext().getMapTSC();
+    TaskSchedulingContext mapTsc
+      = map.get("rt.gta").getQueueSchedulingContext().getMapTSC();
     assertEquals(mapTsc.getCapacity(), 3);
-    
+
     mapTsc = map.get("rt.sch").getQueueSchedulingContext().getMapTSC();
     assertEquals(mapTsc.getCapacity(), 5);
-    
+
     mapTsc = map.get("rt.sch.prod").getQueueSchedulingContext().getMapTSC();
     assertEquals(mapTsc.getCapacity(), 4);
 
     mapTsc = map.get("rt.sch.misc").getQueueSchedulingContext().getMapTSC();
     assertEquals(mapTsc.getCapacity(), 1);
 
-    assertUsedCapacity(map, 
-        new String[] {"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"}, 
-        new int[] { 0, 0, 0, 0 });
-    
+    assertUsedCapacity(
+      map,
+      new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+      new int[]{0, 0, 0, 0});
+
     //Only Allow job submission to leaf queue
-    taskTrackerManager.submitJobAndInit(JobStatus.PREP, 4, 0, "rt.sch.prod",
-        "u1");
+    taskTrackerManager.submitJobAndInit(
+      JobStatus.PREP, 4, 4, "rt.sch.prod",
+      "u1");
 
     // submit a job to the second queue
-    taskTrackerManager.submitJobAndInit(JobStatus.PREP, 4, 0, "rt.sch.misc",
-        "u1");
+    taskTrackerManager.submitJobAndInit(
+      JobStatus.PREP, 4, 4, "rt.sch.misc",
+      "u1");
 
     //submit a job in gta level queue
-    taskTrackerManager.submitJobAndInit(JobStatus.PREP, 4, 0, "rt.gta", "u1");
+    taskTrackerManager.submitJobAndInit(JobStatus.PREP, 4, 4, "rt.gta", "u1");
 
     int counter = 0;
+    Map<String, String> expectedStrings = new HashMap<String, String>();
+    expectedStrings.clear();
+    expectedStrings.put(
+      CapacityTestUtils.MAP, "attempt_test_0001_m_000001_0 on tt1");
+    expectedStrings.put(
+      CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      expectedStrings);
 
-    checkAssignment(taskTrackerManager, scheduler, "tt1",
-                      "attempt_test_0001_m_000001_0 on tt1");
-    assertUsedCapacity(map, 
-        new String[] {"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"}, 
-        new int[] { 0, 1, 1, 0 });
-
-    checkAssignment(taskTrackerManager, scheduler, "tt2",
-      "attempt_test_0003_m_000001_0 on tt2");
-    assertUsedCapacity(map, 
-        new String[] {"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"}, 
-        new int[] { 1, 1, 1, 0 });
-
-    checkAssignment(taskTrackerManager, scheduler, "tt3",
-      "attempt_test_0002_m_000001_0 on tt3");
-    assertUsedCapacity(map, 
-        new String[] {"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"}, 
-        new int[] { 1, 2, 1, 1 });
-
-    checkAssignment(taskTrackerManager, scheduler, "tt4",
-      "attempt_test_0003_m_000002_0 on tt4");
-    assertUsedCapacity(map, 
-        new String[] {"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"}, 
-        new int[] { 2, 2, 1, 1 });
-
-    checkAssignment(taskTrackerManager, scheduler, "tt5",
-      "attempt_test_0001_m_000002_0 on tt5");
-    assertUsedCapacity(map, 
-        new String[] {"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"}, 
-        new int[] { 2, 3, 2, 1 });
-
-    checkAssignment(taskTrackerManager, scheduler, "tt6",
-      "attempt_test_0001_m_000003_0 on tt6");
-    assertUsedCapacity(map, 
-        new String[] {"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"}, 
-        new int[] { 2, 4, 3, 1 });
-
-    checkAssignment(taskTrackerManager, scheduler, "tt7",
-      "attempt_test_0003_m_000003_0 on tt7");
-    assertUsedCapacity(map, 
-        new String[] {"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"}, 
-        new int[] { 3, 4, 3, 1 });
-
-    checkAssignment(taskTrackerManager, scheduler, "tt8",
-      "attempt_test_0001_m_000004_0 on tt8");
-    assertUsedCapacity(map, 
-        new String[] {"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"}, 
-        new int[] { 3, 5, 4, 1 });
+    assertUsedCapacity(
+      map,
+      new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+      new int[]{0, 1, 1, 0});
+//============================================
+    expectedStrings.clear();
+    expectedStrings.put(
+      CapacityTestUtils.MAP, "attempt_test_0003_m_000001_0 on tt2");
+    expectedStrings.put(
+      CapacityTestUtils.REDUCE, "attempt_test_0003_r_000001_0 on tt2");
 
-    checkAssignment(taskTrackerManager, scheduler, "tt9",
-      "attempt_test_0002_m_000002_0 on tt9");
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      expectedStrings);
+    assertUsedCapacity(
+      map,
+      new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+      new int[]{1, 1, 1, 0});
+//============================================
+    expectedStrings.clear();
+    expectedStrings.put(
+      CapacityTestUtils.MAP, "attempt_test_0002_m_000001_0 on tt3");
+    expectedStrings.put(
+      CapacityTestUtils.REDUCE, "attempt_test_0002_r_000001_0 on tt3");
+
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt3",
+      expectedStrings);
+    assertUsedCapacity(
+      map,
+      new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+      new int[]{1, 2, 1, 1});
+//============================================
+    expectedStrings.clear();
+    expectedStrings.put(
+      CapacityTestUtils.MAP, "attempt_test_0003_m_000002_0 on tt4");
+    expectedStrings.put(
+      CapacityTestUtils.REDUCE, "attempt_test_0003_r_000002_0 on tt4");
+
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt4",
+      expectedStrings);
+    assertUsedCapacity(
+      map,
+      new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+      new int[]{2, 2, 1, 1});
+//============================================
+    expectedStrings.clear();
+    expectedStrings.put(
+      CapacityTestUtils.MAP, "attempt_test_0001_m_000002_0 on tt5");
+    expectedStrings.put(
+      CapacityTestUtils.REDUCE, "attempt_test_0001_r_000002_0 on tt5");
+
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt5",
+      expectedStrings);
+    assertUsedCapacity(
+      map,
+      new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+      new int[]{2, 3, 2, 1});
+//============================================
+    expectedStrings.clear();
+    expectedStrings.put(
+      CapacityTestUtils.MAP, "attempt_test_0001_m_000003_0 on tt6");
+    expectedStrings.put(
+      CapacityTestUtils.REDUCE, "attempt_test_0001_r_000003_0 on tt6");
+
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt6",
+      expectedStrings);
+    assertUsedCapacity(
+      map,
+      new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+      new int[]{2, 4, 3, 1});
+//============================================
+    expectedStrings.clear();
+    expectedStrings.put(
+      CapacityTestUtils.MAP, "attempt_test_0003_m_000003_0 on tt7");
+    expectedStrings.put(
+      CapacityTestUtils.REDUCE, "attempt_test_0003_r_000003_0 on tt7");
+
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt7",
+      expectedStrings);
+    assertUsedCapacity(
+      map,
+      new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+      new int[]{3, 4, 3, 1});
+//============================================
+    expectedStrings.clear();
+    expectedStrings.put(
+      CapacityTestUtils.MAP, "attempt_test_0001_m_000004_0 on tt8");
+    expectedStrings.put(
+      CapacityTestUtils.REDUCE, "attempt_test_0001_r_000004_0 on tt8");
+
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt8",
+      expectedStrings);
+    assertUsedCapacity(
+      map,
+      new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+      new int[]{3, 5, 4, 1});
+//============================================
+    expectedStrings.clear();
+    expectedStrings.put(
+      CapacityTestUtils.MAP, "attempt_test_0002_m_000002_0 on tt9");
+    expectedStrings.put(
+      CapacityTestUtils.REDUCE, "attempt_test_0002_r_000002_0 on tt9");
+
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt9",
+      expectedStrings);
   }
   
   /**
@@ -373,75 +460,124 @@
     scheduler.updateContextInfoForTests();
 
     // verify capacities as per the setup.
-    TaskSchedulingContext mapTSC 
+    TaskSchedulingContext mapTSC
       = map.get("rt.gta").getQueueSchedulingContext().getMapTSC();
     assertEquals(mapTSC.getCapacity(), 2);
-    
+
     mapTSC = map.get("rt.sch").getQueueSchedulingContext().getMapTSC();
     assertEquals(mapTSC.getCapacity(), 5);
-    
+
     mapTSC = map.get("rt.sch.prod").getQueueSchedulingContext().getMapTSC();
     assertEquals(mapTSC.getCapacity(), 4);
-    
+
     mapTSC = map.get("rt.sch.misc").getQueueSchedulingContext().getMapTSC();
     assertEquals(mapTSC.getCapacity(), 1);
-    
-    assertUsedCapacity(map, 
-        new String[] { "rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc" },
-        new int[] { 0, 0, 0, 0 });
+
+    assertUsedCapacity(
+      map,
+      new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+      new int[]{0, 0, 0, 0});
 
     // submit a job to the second queue
-    taskTrackerManager.submitJobAndInit(JobStatus.PREP, 20, 0, "rt.sch.misc",
-        "u1");
+    taskTrackerManager.submitJobAndInit(
+      JobStatus.PREP, 20, 20, "rt.sch.misc",
+      "u1");
 
     //submit a job in gta level queue
-    taskTrackerManager.submitJobAndInit(JobStatus.PREP, 20, 0, "rt.gta", "u1");
+    taskTrackerManager.submitJobAndInit(JobStatus.PREP, 20, 20, "rt.gta", "u1");
     int counter = 0;
+    Map<String, String> expectedStrings = new HashMap<String, String>();
+    expectedStrings.put(MAP, "attempt_test_0001_m_000001_0 on tt1");
+    expectedStrings.put(REDUCE, "attempt_test_0001_r_000001_0 on tt1");
 
-    checkAssignment(taskTrackerManager, scheduler, "tt1",
-                      "attempt_test_0001_m_000001_0 on tt1");
-    assertUsedCapacity(map, 
-        new String[] { "rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc" },
-        new int[] { 0, 1, 0, 1 });
-
-    checkAssignment(taskTrackerManager, scheduler, "tt2",
-      "attempt_test_0002_m_000001_0 on tt2");
-    assertUsedCapacity(map, 
-        new String[] { "rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc" },
-        new int[] { 1, 1, 0, 1 });
-
-    checkAssignment(taskTrackerManager, scheduler, "tt3",
-      "attempt_test_0001_m_000002_0 on tt3");
-    assertUsedCapacity(map, 
-        new String[] { "rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc" },
-        new int[] { 1, 2, 0, 2 });
-
-    checkAssignment(taskTrackerManager, scheduler, "tt4",
-      "attempt_test_0001_m_000003_0 on tt4");
-    assertUsedCapacity(map, 
-        new String[] { "rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc" },
-        new int[] { 1, 3, 0, 3 });
-
-    checkAssignment(taskTrackerManager, scheduler, "tt5",
-      "attempt_test_0002_m_000002_0 on tt5");
-    assertUsedCapacity(map, 
-        new String[] { "rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc" },
-        new int[] { 2, 3, 0, 3 });
-
-    checkAssignment(taskTrackerManager, scheduler, "tt6",
-      "attempt_test_0001_m_000004_0 on tt6");
-    assertUsedCapacity(map, 
-        new String[] { "rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc" },
-        new int[] { 2, 4, 0, 4 });
-
-    checkAssignment(taskTrackerManager, scheduler, "tt7",
-      "attempt_test_0001_m_000005_0 on tt7");
-    assertUsedCapacity(map, 
-        new String[] { "rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc" },
-        new int[] { 2, 5, 0, 5 });
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      expectedStrings);
+    assertUsedCapacity(
+      map,
+      new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+      new int[]{0, 1, 0, 1});
+//============================================
+    expectedStrings.clear();
+    expectedStrings.put(MAP, "attempt_test_0002_m_000001_0 on tt2");
+    expectedStrings.put(REDUCE, "attempt_test_0002_r_000001_0 on tt2");
 
-    checkAssignment(taskTrackerManager, scheduler, "tt8",
-      "attempt_test_0001_m_000006_0 on tt8");
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      expectedStrings);
+    assertUsedCapacity(
+      map,
+      new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+      new int[]{1, 1, 0, 1});
+//============================================
+    expectedStrings.clear();
+    expectedStrings.put(MAP, "attempt_test_0001_m_000002_0 on tt3");
+    expectedStrings.put(REDUCE, "attempt_test_0001_r_000002_0 on tt3");
+
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt3",
+      expectedStrings);
+    assertUsedCapacity(
+      map,
+      new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+      new int[]{1, 2, 0, 2});
+//============================================
+    expectedStrings.clear();
+    expectedStrings.put(MAP, "attempt_test_0001_m_000003_0 on tt4");
+    expectedStrings.put(REDUCE, "attempt_test_0001_r_000003_0 on tt4");
+
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt4",
+      expectedStrings);
+    assertUsedCapacity(
+      map,
+      new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+      new int[]{1, 3, 0, 3});
+//============================================
+    expectedStrings.clear();
+    expectedStrings.put(MAP, "attempt_test_0002_m_000002_0 on tt5");
+    expectedStrings.put(REDUCE, "attempt_test_0002_r_000002_0 on tt5");
+
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt5",
+      expectedStrings);
+    assertUsedCapacity(
+      map,
+      new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+      new int[]{2, 3, 0, 3});
+//============================================
+    expectedStrings.clear();
+    expectedStrings.put(MAP, "attempt_test_0001_m_000004_0 on tt6");
+    expectedStrings.put(REDUCE, "attempt_test_0001_r_000004_0 on tt6");
+
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt6",
+      expectedStrings);
+    assertUsedCapacity(
+      map,
+      new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+      new int[]{2, 4, 0, 4});
+//============================================
+    expectedStrings.clear();
+    expectedStrings.put(MAP, "attempt_test_0001_m_000005_0 on tt7");
+    expectedStrings.put(REDUCE, "attempt_test_0001_r_000005_0 on tt7");
+
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt7",
+      expectedStrings);
+    assertUsedCapacity(
+      map,
+      new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+      new int[]{2, 5, 0, 5});
+//============================================
+    expectedStrings.clear();
+    expectedStrings.put(MAP, "attempt_test_0001_m_000006_0 on tt8");
+    expectedStrings.put(REDUCE, "attempt_test_0001_r_000006_0 on tt8");
+
+
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt8",
+      expectedStrings);
   }
 
   // verify that the number of slots used for each queue
@@ -458,24 +594,4 @@
       assertEquals(mapTSC.getNumSlotsOccupied(), expectedUsedSlots[i]);
     }
   }
-
-  private void printOrderedQueueData(AbstractQueue rt) {
-    //print data at all levels.
-    List<AbstractQueue> aq = rt.getChildren();
-    System.out.println();
-    for (AbstractQueue a : aq) {
-      System.out.println(
-        "    // " + a.getName() + "->  data " +
-          a.getQueueSchedulingContext().getMapTSC().getCapacity() + " " +
-          " " +
-          a.getQueueSchedulingContext().getMapTSC().getNumSlotsOccupied());
-      double f = ((double) a.getQueueSchedulingContext().getMapTSC()
-        .getNumSlotsOccupied() /
-        (double) a.getQueueSchedulingContext().getMapTSC().getCapacity());
-      System.out.println("    // rating -> " + f);
-      if (a.getChildren() != null) {
-        printOrderedQueueData(a);
-      }
-    }
-  }
 }
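
The test changes above replace the single-task checkAssignment calls with CapacityTestUtils.checkMultipleTaskAssignment, which takes a map keyed by CapacityTestUtils.MAP and CapacityTestUtils.REDUCE so that one heartbeat is expected to hand out both a map and a reduce attempt. A minimal, self-contained sketch of that expectation-map pattern follows; the class and helper names here are hypothetical stand-ins, since the real assertions run inside the capacity-scheduler test harness shown in the diff.

    import java.util.HashMap;
    import java.util.Map;

    public class ExpectationMapSketch {
      // Stand-ins for the CapacityTestUtils.MAP / CapacityTestUtils.REDUCE keys.
      static final String MAP = "map";
      static final String REDUCE = "reduce";

      // Stand-in for checkMultipleTaskAssignment: assert that one heartbeat
      // produced exactly the expected map and reduce attempts.
      static void checkMultipleTaskAssignment(Map<String, String> actual,
                                              Map<String, String> expected) {
        if (!actual.equals(expected)) {
          throw new AssertionError("expected " + expected + " but got " + actual);
        }
      }

      public static void main(String[] args) {
        Map<String, String> expectedStrings = new HashMap<String, String>();
        expectedStrings.put(MAP, "attempt_test_0001_m_000001_0 on tt1");
        expectedStrings.put(REDUCE, "attempt_test_0001_r_000001_0 on tt1");

        // Pretend the scheduler assigned exactly these two attempts on one heartbeat.
        Map<String, String> assigned = new HashMap<String, String>(expectedStrings);
        checkMultipleTaskAssignment(assigned, expectedStrings);
        System.out.println("assignment matched");
      }
    }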

Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestRefreshOfQueues.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestRefreshOfQueues.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestRefreshOfQueues.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestRefreshOfQueues.java Tue Oct 27 15:43:58 2009
@@ -37,7 +37,7 @@
 import org.apache.hadoop.mapred.CapacityTestUtils.ControlledInitializationPoller;
 import org.apache.hadoop.mapred.CapacityTestUtils.FakeJobInProgress;
 import org.apache.hadoop.mapred.CapacityTestUtils.FakeTaskTrackerManager;
-import static org.apache.hadoop.mapred.CapacityTestUtils.checkAssignment;
+import static org.apache.hadoop.mapred.CapacityTestUtils.*;
 import org.junit.After;
 import org.junit.Test;
 
@@ -104,10 +104,6 @@
       props[i] = queues[i].getProperties();
       props[i].setProperty(CapacitySchedulerConf.CAPACITY_PROPERTY,
           String.valueOf(i + 10));
-      props[i].setProperty(CapacitySchedulerConf.MAX_MAP_CAP_PROPERTY,
-          String.valueOf(i + 20));
-      props[i].setProperty(CapacitySchedulerConf.MAX_REDUCE_CAP_PROPERTY,
-          String.valueOf(i + 25));
       props[i].setProperty(CapacitySchedulerConf.MAX_CAPACITY_PROPERTY,
           String.valueOf(i + 15));
       props[i].setProperty(CapacitySchedulerConf.SUPPORTS_PRIORITY_PROPERTY,
@@ -136,8 +132,6 @@
           allQueues.get(qName).getQueueSchedulingContext();
       LOG.info("Context for queue " + qName + " is : " + qsc);
       assertEquals(i + 10, qsc.getCapacityPercent(), 0);
-      assertEquals(i + 20, qsc.getMapTSC().getMaxTaskLimit());
-      assertEquals(i + 25, qsc.getReduceTSC().getMaxTaskLimit());
       assertEquals(i + 15, qsc.getMaxCapacityPercent(), 0);
       assertEquals(Boolean.valueOf(false),
           Boolean.valueOf(qsc.supportsPriorities()));
@@ -149,10 +143,6 @@
       props[i] = queues[i].getProperties();
       props[i].setProperty(CapacitySchedulerConf.CAPACITY_PROPERTY,
           String.valueOf(i + 20));
-      props[i].setProperty(CapacitySchedulerConf.MAX_MAP_CAP_PROPERTY,
-          String.valueOf(i + 30));
-      props[i].setProperty(CapacitySchedulerConf.MAX_REDUCE_CAP_PROPERTY,
-          String.valueOf(i + 35));
       props[i].setProperty(CapacitySchedulerConf.MAX_CAPACITY_PROPERTY,
           String.valueOf(i + 25));
       props[i].setProperty(CapacitySchedulerConf.SUPPORTS_PRIORITY_PROPERTY,
@@ -183,8 +173,6 @@
       assertEquals(qName, qsc.getQueueName());
       LOG.info("Context for queue " + qName + " is : " + qsc);
       assertEquals(i + 20, qsc.getCapacityPercent(), 0);
-      assertEquals(i + 30, qsc.getMapTSC().getMaxTaskLimit());
-      assertEquals(i + 35, qsc.getReduceTSC().getMaxTaskLimit());
       assertEquals(i + 25, qsc.getMaxCapacityPercent(), 0);
       assertEquals(Boolean.valueOf(false),
           Boolean.valueOf(qsc.supportsPriorities()));
@@ -201,65 +189,111 @@
     JobQueueInfo[] queues = TestQueueManagerRefresh.getSimpleQueueHierarchy();
 
     queues[0].getProperties().setProperty(
-        CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(100));
+      CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(100));
     queues[1].getProperties().setProperty(
-        CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
+      CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
     queues[2].getProperties().setProperty(
-        CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
+      CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
 
     // write the configuration file
     QueueManagerTestUtils.writeQueueConfigurationFile(
-        queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
+      queueConfigFile.getAbsolutePath(), new JobQueueInfo[]{queues[0]});
 
     setupAndStartSchedulerFramework(2, 2, 2);
 
     FakeJobInProgress job1 =
-        taskTrackerManager.submitJobAndInit(JobStatus.PREP, 2, 0,
-            queues[1].getQueueName(), "user");
+      taskTrackerManager.submitJobAndInit(
+        JobStatus.PREP, 2, 2,
+        queues[1].getQueueName(), "user");
     FakeJobInProgress job2 =
-        taskTrackerManager.submitJobAndInit(JobStatus.PREP, 2, 0,
-            queues[2].getQueueName(), "user");
-
-    checkAssignment(taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0001_m_000001_0 on tt1");
-    checkAssignment(taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0002_m_000001_0 on tt1");
-    checkAssignment(taskTrackerManager, scheduler, "tt2",
-        "attempt_test_0002_m_000002_0 on tt2");
-    checkAssignment(taskTrackerManager, scheduler, "tt2",
-        "attempt_test_0001_m_000002_0 on tt2");
+      taskTrackerManager.submitJobAndInit(
+        JobStatus.PREP, 2, 2,
+        queues[2].getQueueName(), "user");
+
+    Map<String, String> expectedStrings = new HashMap<String, String>();
+    expectedStrings.put(MAP, "attempt_test_0001_m_000001_0 on tt1");
+    expectedStrings.put(REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      expectedStrings);
+//===========================================
+    expectedStrings.clear();
+    expectedStrings.put(MAP, "attempt_test_0002_m_000001_0 on tt1");
+    expectedStrings.put(REDUCE, "attempt_test_0002_r_000001_0 on tt1");
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      expectedStrings);
+//============================================
+    expectedStrings.clear();
+    expectedStrings.put(MAP, "attempt_test_0002_m_000002_0 on tt2");
+    expectedStrings.put(REDUCE, "attempt_test_0002_r_000002_0 on tt2");
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      expectedStrings);
+//============================================
+    expectedStrings.clear();
+    expectedStrings.put(MAP, "attempt_test_0001_m_000002_0 on tt2");
+    expectedStrings.put(REDUCE, "attempt_test_0001_r_000002_0 on tt2");
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      expectedStrings);
 
     taskTrackerManager.killJob(job1.getJobID());
     taskTrackerManager.killJob(job2.getJobID());
 
     // change configuration
     queues[1].getProperties().setProperty(
-        CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(25));
+      CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(25));
     queues[2].getProperties().setProperty(
-        CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(75));
+      CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(75));
 
     // Re-write the configuration file
     QueueManagerTestUtils.writeQueueConfigurationFile(
-        queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
+      queueConfigFile.getAbsolutePath(), new JobQueueInfo[]{queues[0]});
 
-    taskTrackerManager.getQueueManager().refreshQueues(null,
-        scheduler.getQueueRefresher());
+    taskTrackerManager.getQueueManager().refreshQueues(
+      null,
+      scheduler.getQueueRefresher());
 
     job1 =
-        taskTrackerManager.submitJobAndInit(JobStatus.PREP, 2, 0,
-            queues[1].getQueueName(), "user");
+      taskTrackerManager.submitJobAndInit(
+        JobStatus.PREP, 2, 2,
+        queues[1].getQueueName(), "user");
     job2 =
-        taskTrackerManager.submitJobAndInit(JobStatus.PREP, 4, 0,
-            queues[2].getQueueName(), "user");
+      taskTrackerManager.submitJobAndInit(
+        JobStatus.PREP, 4, 4,
+        queues[2].getQueueName(), "user");
+
+    expectedStrings.clear();
+    expectedStrings.put(MAP, "attempt_test_0003_m_000001_0 on tt1");
+    expectedStrings.put(REDUCE, "attempt_test_0003_r_000001_0 on tt1");
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      expectedStrings);
+
+
+    expectedStrings.clear();
+    expectedStrings.put(MAP, "attempt_test_0004_m_000001_0 on tt1");
+    expectedStrings.put(REDUCE, "attempt_test_0004_r_000001_0 on tt1");
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      expectedStrings);
+
+
+    expectedStrings.clear();
+    expectedStrings.put(MAP, "attempt_test_0004_m_000002_0 on tt2");
+    expectedStrings.put(REDUCE, "attempt_test_0004_r_000002_0 on tt2");
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      expectedStrings);
+
+    expectedStrings.clear();
+    expectedStrings.put(MAP, "attempt_test_0004_m_000003_0 on tt2");
+    expectedStrings.put(REDUCE, "attempt_test_0004_r_000003_0 on tt2");
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      expectedStrings);
 
-    checkAssignment(taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0003_m_000001_0 on tt1");
-    checkAssignment(taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0004_m_000001_0 on tt1");
-    checkAssignment(taskTrackerManager, scheduler, "tt2",
-        "attempt_test_0004_m_000002_0 on tt2");
-    checkAssignment(taskTrackerManager, scheduler, "tt2",
-        "attempt_test_0004_m_000003_0 on tt2");
   }
 
   /**
@@ -336,60 +370,84 @@
     JobQueueInfo[] queues = TestQueueManagerRefresh.getSimpleQueueHierarchy();
 
     queues[0].getProperties().setProperty(
-        CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(100));
+      CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(100));
     queues[1].getProperties().setProperty(
-        CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
+      CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
     queues[2].getProperties().setProperty(
-        CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
+      CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
 
     queues[2].getProperties().setProperty(
-        CapacitySchedulerConf.MINIMUM_USER_LIMIT_PERCENT_PROPERTY,
-        String.valueOf(100));
+      CapacitySchedulerConf.MINIMUM_USER_LIMIT_PERCENT_PROPERTY,
+      String.valueOf(100));
 
     // write the configuration file
     QueueManagerTestUtils.writeQueueConfigurationFile(
-        queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
+      queueConfigFile.getAbsolutePath(), new JobQueueInfo[]{queues[0]});
 
     setupAndStartSchedulerFramework(1, 2, 2);
 
     FakeJobInProgress job1 =
-        taskTrackerManager.submitJobAndInit(JobStatus.PREP, 2, 0,
-            queues[2].getQueueName(), "user1");
+      taskTrackerManager.submitJobAndInit(
+        JobStatus.PREP, 2, 2,
+        queues[2].getQueueName(), "user1");
     FakeJobInProgress job2 =
-        taskTrackerManager.submitJobAndInit(JobStatus.PREP, 2, 0,
-            queues[2].getQueueName(), "user2");
+      taskTrackerManager.submitJobAndInit(
+        JobStatus.PREP, 2, 2,
+        queues[2].getQueueName(), "user2");
+
+    Map<String, String> expectedStrings = new HashMap<String, String>();
+    expectedStrings.put(MAP, "attempt_test_0001_m_000001_0 on tt1");
+    expectedStrings.put(REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      expectedStrings);
+    
+    expectedStrings.clear();
+    expectedStrings.put(MAP, "attempt_test_0001_m_000002_0 on tt1");
+    expectedStrings.put(REDUCE, "attempt_test_0001_r_000002_0 on tt1");
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      expectedStrings);
 
-    checkAssignment(taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0001_m_000001_0 on tt1");
-    checkAssignment(taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0001_m_000002_0 on tt1");
     assertNull(scheduler.assignTasks(taskTrackerManager.getTaskTracker("tt1")));
     taskTrackerManager.killJob(job1.getJobID());
     taskTrackerManager.killJob(job2.getJobID());
 
     // change configuration
     queues[2].getProperties().setProperty(
-        CapacitySchedulerConf.MINIMUM_USER_LIMIT_PERCENT_PROPERTY,
-        String.valueOf(50));
+      CapacitySchedulerConf.MINIMUM_USER_LIMIT_PERCENT_PROPERTY,
+      String.valueOf(50));
 
     // Re-write the configuration file
     QueueManagerTestUtils.writeQueueConfigurationFile(
-        queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
+      queueConfigFile.getAbsolutePath(), new JobQueueInfo[]{queues[0]});
 
-    taskTrackerManager.getQueueManager().refreshQueues(null,
-        scheduler.getQueueRefresher());
+    taskTrackerManager.getQueueManager().refreshQueues(
+      null,
+      scheduler.getQueueRefresher());
 
     job1 =
-        taskTrackerManager.submitJobAndInit(JobStatus.PREP, 2, 0,
-            queues[1].getQueueName(), "user1");
+      taskTrackerManager.submitJobAndInit(
+        JobStatus.PREP, 2, 2,
+        queues[1].getQueueName(), "user1");
     job2 =
-        taskTrackerManager.submitJobAndInit(JobStatus.PREP, 2, 0,
-            queues[2].getQueueName(), "user2");
-
-    checkAssignment(taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0003_m_000001_0 on tt1");
-    checkAssignment(taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0004_m_000001_0 on tt1");
+      taskTrackerManager.submitJobAndInit(
+        JobStatus.PREP, 2, 2,
+        queues[2].getQueueName(), "user2");
+
+    expectedStrings.clear();
+    expectedStrings.put(MAP, "attempt_test_0003_m_000001_0 on tt1");
+    expectedStrings.put(REDUCE, "attempt_test_0003_r_000001_0 on tt1");
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      expectedStrings);
+
+    expectedStrings.clear();
+    expectedStrings.put(MAP, "attempt_test_0004_m_000001_0 on tt1");
+    expectedStrings.put(REDUCE, "attempt_test_0004_r_000001_0 on tt1");
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      expectedStrings);
   }
 
   /**

Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/data_join/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 27 15:43:58 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib/data_join:713112
 /hadoop/core/trunk/src/contrib/data_join:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/data_join:817879-818559
+/hadoop/mapreduce/trunk/src/contrib/data_join:817878-830225

Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/dynamic-scheduler/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 27 15:43:58 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/src/contrib/dynamic-scheduler:713112
 /hadoop/core/trunk/src/contrib/dynamic-scheduler:784975-786373
-/hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler:817879-818559
+/hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler:817878-830225

Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/eclipse-plugin/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 27 15:43:58 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/core/src/contrib/eclipse-plugin:713112
 /hadoop/core/trunk/src/contrib/eclipse-plugin:776175-784663
-/hadoop/mapreduce/trunk/src/contrib/eclipse-plugin:817879-818559
+/hadoop/mapreduce/trunk/src/contrib/eclipse-plugin:817878-830225

Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 27 15:43:58 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib/fairscheduler:713112
 /hadoop/core/trunk/src/contrib/fairscheduler:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/fairscheduler:817879-818559
+/hadoop/mapreduce/trunk/src/contrib/fairscheduler:817878-830225

Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/ivy.xml?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/ivy.xml (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/ivy.xml Tue Oct 27 15:43:58 2009
@@ -54,5 +54,21 @@
       name="paranamer-ant"
       rev="${paranamer.version}"
       conf="common->default"/>
+    <dependency org="org.mortbay.jetty"
+      name="jetty-util"
+      rev="${jetty-util.version}"
+      conf="common->master"/>
+    <dependency org="org.mortbay.jetty"
+      name="jetty"
+      rev="${jetty.version}"
+      conf="common->master"/>
+    <dependency org="org.mortbay.jetty"
+      name="jsp-api-2.1"
+      rev="${jetty.version}"
+      conf="common->master"/>
+    <dependency org="org.mortbay.jetty"
+      name="jsp-2.1"
+      rev="${jetty.version}"
+      conf="common->master"/>
   </dependencies>
 </ivy-module>

Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java Tue Oct 27 15:43:58 2009
@@ -18,7 +18,9 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.io.PrintWriter;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
@@ -118,7 +120,12 @@
     }
     // Print out the normal response
     response.setContentType("text/html");
-    PrintWriter out = new PrintWriter(response.getOutputStream());
+
+    // Because the client may read arbitrarily slow, and we hold locks while
+    // the servlet outputs, we want to write to our own buffer which we know
+    // won't block.
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    PrintWriter out = new PrintWriter(baos);
     String hostname = StringUtils.simpleHostname(
         jobTracker.getJobTrackerMachine());
     out.print("<html><head>");
@@ -132,6 +139,11 @@
     showJobs(out, advancedView);
     out.print("</body></html>\n");
     out.close();
+
+    // Flush our buffer to the real servlet output
+    OutputStream servletOut = response.getOutputStream();
+    baos.writeTo(servletOut);
+    servletOut.close();
   }
 
   /**
@@ -202,55 +214,57 @@
     out.print("<th>Finished</th><th>Running</th><th>Fair Share</th>" +
         (advancedView ? "<th>Weight</th>" : ""));
     out.print("</tr>\n");
-    Collection<JobInProgress> runningJobs = jobTracker.getRunningJobs();
-    synchronized (scheduler) {
-      for (JobInProgress job: runningJobs) {
-        JobProfile profile = job.getProfile();
-        JobInfo info = scheduler.infos.get(job);
-        if (info == null) { // Job finished, but let's show 0's for info
-          info = new JobInfo(null, null);
-        }
-        out.print("<tr>\n");
-        out.printf("<td>%s</td>\n", DATE_FORMAT.format(
-            new Date(job.getStartTime())));
-        out.printf("<td><a href=\"jobdetails.jsp?jobid=%s\">%s</a></td>",
-            profile.getJobID(), profile.getJobID());
-        out.printf("<td>%s</td>\n", profile.getUser());
-        out.printf("<td>%s</td>\n", profile.getJobName());
-        if (JSPUtil.privateActionsAllowed()) {
-          out.printf("<td>%s</td>\n", generateSelect(scheduler
-              .getPoolManager().getPoolNames(), scheduler.getPoolManager()
-              .getPoolName(job), "/scheduler?setPool=<CHOICE>&jobid="
-              + profile.getJobID() + (advancedView ? "&advanced" : "")));
-          out.printf("<td>%s</td>\n", generateSelect(Arrays
-              .asList(new String[] { "VERY_LOW", "LOW", "NORMAL", "HIGH",
-                  "VERY_HIGH" }), job.getPriority().toString(),
-              "/scheduler?setPriority=<CHOICE>&jobid=" + profile.getJobID()
-                  + (advancedView ? "&advanced" : "")));
-        } else {
-          out.printf("<td>%s</td>\n", scheduler.getPoolManager().getPoolName(job));
-          out.printf("<td>%s</td>\n", job.getPriority().toString());
-        }
-        Pool pool = scheduler.getPoolManager().getPool(job);
-        String mapShare = (pool.getSchedulingMode() == SchedulingMode.FAIR) ?
-            String.format("%.1f", info.mapSchedulable.getFairShare()) : "NA";
-        out.printf("<td>%d / %d</td><td>%d</td><td>%s</td>\n",
-            job.finishedMaps(), job.desiredMaps(), 
-            info.mapSchedulable.getRunningTasks(),
-            mapShare);
-        if (advancedView) {
-          out.printf("<td>%.1f</td>\n", info.mapSchedulable.getWeight());
-        }
-        String reduceShare = (pool.getSchedulingMode() == SchedulingMode.FAIR) ?
-            String.format("%.1f", info.reduceSchedulable.getFairShare()) : "NA";
-        out.printf("<td>%d / %d</td><td>%d</td><td>%s</td>\n",
-            job.finishedReduces(), job.desiredReduces(), 
-            info.reduceSchedulable.getRunningTasks(),
-            reduceShare);
-        if (advancedView) {
-          out.printf("<td>%.1f</td>\n", info.reduceSchedulable.getWeight());
+    synchronized (jobTracker) {
+      Collection<JobInProgress> runningJobs = jobTracker.getRunningJobs();
+      synchronized (scheduler) {
+        for (JobInProgress job: runningJobs) {
+          JobProfile profile = job.getProfile();
+          JobInfo info = scheduler.infos.get(job);
+          if (info == null) { // Job finished, but let's show 0's for info
+            info = new JobInfo(null, null);
+          }
+          out.print("<tr>\n");
+          out.printf("<td>%s</td>\n", DATE_FORMAT.format(
+              new Date(job.getStartTime())));
+          out.printf("<td><a href=\"jobdetails.jsp?jobid=%s\">%s</a></td>",
+              profile.getJobID(), profile.getJobID());
+          out.printf("<td>%s</td>\n", profile.getUser());
+          out.printf("<td>%s</td>\n", profile.getJobName());
+          if (JSPUtil.privateActionsAllowed()) {
+            out.printf("<td>%s</td>\n", generateSelect(scheduler
+                .getPoolManager().getPoolNames(), scheduler.getPoolManager()
+                .getPoolName(job), "/scheduler?setPool=<CHOICE>&jobid="
+                + profile.getJobID() + (advancedView ? "&advanced" : "")));
+            out.printf("<td>%s</td>\n", generateSelect(Arrays
+                .asList(new String[] { "VERY_LOW", "LOW", "NORMAL", "HIGH",
+                    "VERY_HIGH" }), job.getPriority().toString(),
+                "/scheduler?setPriority=<CHOICE>&jobid=" + profile.getJobID()
+                    + (advancedView ? "&advanced" : "")));
+          } else {
+            out.printf("<td>%s</td>\n", scheduler.getPoolManager().getPoolName(job));
+            out.printf("<td>%s</td>\n", job.getPriority().toString());
+          }
+          Pool pool = scheduler.getPoolManager().getPool(job);
+          String mapShare = (pool.getSchedulingMode() == SchedulingMode.FAIR) ?
+              String.format("%.1f", info.mapSchedulable.getFairShare()) : "NA";
+          out.printf("<td>%d / %d</td><td>%d</td><td>%s</td>\n",
+              job.finishedMaps(), job.desiredMaps(), 
+              info.mapSchedulable.getRunningTasks(),
+              mapShare);
+          if (advancedView) {
+            out.printf("<td>%.1f</td>\n", info.mapSchedulable.getWeight());
+          }
+          String reduceShare = (pool.getSchedulingMode() == SchedulingMode.FAIR) ?
+              String.format("%.1f", info.reduceSchedulable.getFairShare()) : "NA";
+          out.printf("<td>%d / %d</td><td>%d</td><td>%s</td>\n",
+              job.finishedReduces(), job.desiredReduces(), 
+              info.reduceSchedulable.getRunningTasks(),
+              reduceShare);
+          if (advancedView) {
+            out.printf("<td>%.1f</td>\n", info.reduceSchedulable.getWeight());
+          }
+          out.print("</tr>\n");
         }
-        out.print("</tr>\n");
       }
     }
     out.print("</table>\n");
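
The FairSchedulerServlet change above renders the page into a ByteArrayOutputStream and only copies it to the real response stream after the synchronized blocks complete, so a slow client cannot stall the JobTracker while locks are held. A minimal, self-contained sketch of that buffer-then-flush pattern, with System.out standing in for response.getOutputStream():

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.io.PrintWriter;

    public class BufferedServletOutputSketch {
      public static void main(String[] args) throws IOException {
        // Render into an in-memory buffer; writes here can never block on the client.
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        PrintWriter out = new PrintWriter(baos);

        // ... in the real servlet this happens while scheduler locks are held ...
        out.print("<html><body>scheduler state</body></html>\n");
        out.close();                          // flush the writer into the byte buffer

        // After the locks are released, copy the buffered bytes to the real stream.
        OutputStream servletOut = System.out; // stand-in for response.getOutputStream()
        baos.writeTo(servletOut);
        servletOut.flush();
      }
    }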

Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Tue Oct 27 15:43:58 2009
@@ -64,8 +64,8 @@
     
     public FakeJobInProgress(JobConf jobConf,
         FakeTaskTrackerManager taskTrackerManager, 
-        String[][] mapInputLocations) throws IOException {
-      super(new JobID("test", ++jobCounter), jobConf, null);
+        String[][] mapInputLocations, JobTracker jt) throws IOException {
+      super(new JobID("test", ++jobCounter), jobConf, jt);
       this.taskTrackerManager = taskTrackerManager;
       this.mapInputLocations = mapInputLocations;
       this.startTime = System.currentTimeMillis();
@@ -536,7 +536,7 @@
     if (pool != null)
       jobConf.set(POOL_PROPERTY, pool);
     JobInProgress job = new FakeJobInProgress(jobConf, taskTrackerManager,
-        mapInputLocations);
+        mapInputLocations, UtilsForTests.getJobTracker());
     job.getStatus().setRunState(state);
     taskTrackerManager.submitJob(job);
     job.startTime = clock.time;

Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java Tue Oct 27 15:43:58 2009
@@ -200,10 +200,7 @@
         }
         // scan input dir contents
         submitter.refreshFilePool();
-      } catch (IOException e) {
-        LOG.error("Startup failed", e);
-        if (factory != null) factory.abort(); // abort pipeline
-      } catch (InterruptedException e) {
+      } catch (Throwable e) {
         LOG.error("Startup failed", e);
         if (factory != null) factory.abort(); // abort pipeline
       } finally {
@@ -214,8 +211,10 @@
       if (factory != null) {
         // wait for input exhaustion
         factory.join();
-        if (null != factory.error()) {
-          throw factory.error();
+        final Throwable badTraceException = factory.error();
+        if (null != badTraceException) {
+          LOG.error("Error in trace", badTraceException);
+          throw new IOException("Error in trace", badTraceException);
         }
         // wait for pending tasks to be submitted
         submitter.shutdown();
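
The Gridmix change above collapses the separate IOException and InterruptedException handlers into a single catch of Throwable and rethrows the job factory's recorded failure wrapped in an IOException, so the original cause stays attached to the stack trace. A minimal, self-contained sketch of that wrap-and-rethrow pattern; the Worker class and names below are hypothetical, not part of the patch.

    import java.io.IOException;

    public class RethrowSketch {
      // Hypothetical stand-in for the trace factory: it records any failure
      // from its background thread for the coordinator to inspect later.
      static class Worker {
        private volatile Throwable error;
        void fail(Throwable t) { error = t; }
        Throwable error() { return error; }
      }

      public static void main(String[] args) {
        Worker factory = new Worker();
        factory.fail(new RuntimeException("bad trace"));
        try {
          final Throwable badTraceException = factory.error();
          if (null != badTraceException) {
            // Wrap the unchecked failure in a checked IOException, preserving the cause.
            throw new IOException("Error in trace", badTraceException);
          }
        } catch (IOException e) {
          e.printStackTrace();                // cause chain prints as "Caused by: ..."
        }
      }
    }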

Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java Tue Oct 27 15:43:58 2009
@@ -530,6 +530,16 @@
         }
       }
     }
+    @Override
+    protected void cleanup(Context context)
+        throws IOException, InterruptedException {
+      while (written < outBytes) {
+        final int len = (int) Math.min(outBytes - written, val.getCapacity());
+        fillBytes(val, len);
+        context.write(NullWritable.get(), val);
+        written += len;
+      }
+    }
   }
 
   static class GridmixRecordReader

Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java Tue Oct 27 15:43:58 2009
@@ -112,13 +112,13 @@
 
     public MockJob(Configuration conf) {
       this(conf.getInt(MIN_BYTES_IN, 1 << 20),
-           conf.getInt(VAR_BYTES_IN, 10 << 20),
+           conf.getInt(VAR_BYTES_IN, 5 << 20),
            conf.getInt(MIN_BYTES_OUT, 1 << 20),
-           conf.getInt(VAR_BYTES_OUT, 10 << 20),
+           conf.getInt(VAR_BYTES_OUT, 5 << 20),
            conf.getInt(MIN_REC_SIZE , 100),
            conf.getInt(VAR_REC_SIZE , 1 << 15),
-           conf.getInt(MAX_MAPS, 6),
-           conf.getInt(MAX_REDS, 4));
+           conf.getInt(MAX_MAPS, 5),
+           conf.getInt(MAX_REDS, 3));
     }
 
     public MockJob(int min_bytes_in, int var_bytes_in,
@@ -126,7 +126,7 @@
                    int min_rec_size, int var_rec_size,
                    int max_maps, int max_reds) {
       final Random r = new Random();
-      name = String.format("MOCKJOB%04d", seq.getAndIncrement());
+      name = String.format("MOCKJOB%05d", seq.getAndIncrement());
       submitTime = timestamp.addAndGet(TimeUnit.MILLISECONDS.convert(
             r.nextInt(10), TimeUnit.SECONDS));
       int iMapBTotal = 0, oMapBTotal = 0, iRedBTotal = 0, oRedBTotal = 0;

Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java Tue Oct 27 15:43:58 2009
@@ -19,6 +19,9 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
@@ -30,12 +33,20 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskReport;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.TaskInfo;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import static org.apache.hadoop.mapreduce.TaskCounter.*;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -64,9 +75,13 @@
   public static void initCluster() throws IOException {
     Configuration conf = new Configuration();
     conf.setBoolean(JTConfig.JT_RETIREJOBS, false);
+    conf.setInt(JTConfig.JT_RETIREJOB_CACHE_SIZE, 1000);
+    conf.setBoolean(JTConfig.JT_PERSIST_JOBSTATUS, true);
+    conf.setInt(JTConfig.JT_PERSIST_JOBSTATUS_HOURS, 1);
     dfsCluster = new MiniDFSCluster(conf, 3, true, null);
     dfs = dfsCluster.getFileSystem();
-    mrCluster = new MiniMRCluster(3, dfs.getUri().toString(), 1);
+    mrCluster = new MiniMRCluster(3, dfs.getUri().toString(), 1, null, null,
+        new JobConf(conf));
   }
 
   @AfterClass
@@ -81,6 +96,7 @@
 
   static class TestMonitor extends JobMonitor {
 
+    static final long SLOPBYTES = 5 * 1024;
     private final int expected;
     private final BlockingQueue<Job> retiredJobs;
 
@@ -90,9 +106,142 @@
       retiredJobs = new LinkedBlockingQueue<Job>();
     }
 
-    public void verify(ArrayList<JobStory> submitted) {
+    public void verify(ArrayList<JobStory> submitted) throws Exception {
       final ArrayList<Job> succeeded = new ArrayList<Job>();
       assertEquals("Bad job count", expected, retiredJobs.drainTo(succeeded));
+      final HashMap<String,JobStory> sub = new HashMap<String,JobStory>();
+      for (JobStory spec : submitted) {
+        sub.put(spec.getName(), spec);
+      }
+      for (Job job : succeeded) {
+        final String jobname = job.getJobName();
+        if ("GRIDMIX_GENDATA".equals(jobname)) {
+          final Path in = new Path("foo").makeQualified(dfs);
+          final Path out = new Path("/gridmix").makeQualified(dfs);
+          final ContentSummary generated = dfs.getContentSummary(in);
+          assertTrue("Mismatched data gen", // +/- 100k for logs
+              (GENDATA << 20) < generated.getLength() + GENSLOP ||
+              (GENDATA << 20) > generated.getLength() - GENSLOP);
+          FileStatus[] outstat = dfs.listStatus(out);
+          assertEquals("Mismatched job count", NJOBS, outstat.length);
+          continue;
+        }
+        final JobStory spec =
+          sub.get(job.getJobName().replace("GRIDMIX", "MOCKJOB"));
+        assertNotNull("No spec for " + job.getJobName(), spec);
+        assertNotNull("No counters for " + job.getJobName(), job.getCounters());
+
+        final int nMaps = spec.getNumberMaps();
+        final int nReds = spec.getNumberReduces();
+
+        System.out.println(jobname + ": " + nMaps + "/" + nReds);
+        final TaskReport[] mReports = job.getTaskReports(TaskType.MAP);
+        assertEquals("Mismatched map count", nMaps, mReports.length);
+        check(TaskType.MAP, job, spec, mReports,
+            0, 1, nReds * SLOPBYTES, nReds + 1);
+
+        final TaskReport[] rReports = job.getTaskReports(TaskType.REDUCE);
+        assertEquals("Mismatched reduce count", nReds, rReports.length);
+        check(TaskType.REDUCE, job, spec, rReports,
+            nMaps * SLOPBYTES, nMaps + 1, 0, 1);
+      }
+    }
+
+    public void check(final TaskType type, Job job, JobStory spec,
+          final TaskReport[] runTasks,
+          long extraInputBytes, int extraInputRecords,
+          long extraOutputBytes, int extraOutputRecords) throws Exception {
+
+      long[] runInputRecords = new long[runTasks.length];
+      long[] runInputBytes = new long[runTasks.length];
+      long[] runOutputRecords = new long[runTasks.length];
+      long[] runOutputBytes = new long[runTasks.length];
+      long[] specInputRecords = new long[runTasks.length];
+      long[] specInputBytes = new long[runTasks.length];
+      long[] specOutputRecords = new long[runTasks.length];
+      long[] specOutputBytes = new long[runTasks.length];
+
+      for (int i = 0; i < runTasks.length; ++i) {
+        final TaskInfo specInfo;
+        final Counters counters = runTasks[i].getTaskCounters();
+        switch (type) {
+          case MAP:
+             runInputBytes[i] = counters.findCounter("FileSystemCounters",
+                 "HDFS_BYTES_READ").getValue();
+             runInputRecords[i] =
+               (int)counters.findCounter(MAP_INPUT_RECORDS).getValue();
+             runOutputBytes[i] =
+               counters.findCounter(MAP_OUTPUT_BYTES).getValue();
+             runOutputRecords[i] =
+               (int)counters.findCounter(MAP_OUTPUT_RECORDS).getValue();
+
+            specInfo = spec.getTaskInfo(TaskType.MAP, i);
+            break;
+          case REDUCE:
+             runInputBytes[i] =
+               counters.findCounter(REDUCE_SHUFFLE_BYTES).getValue();
+             runInputRecords[i] =
+               (int)counters.findCounter(REDUCE_INPUT_RECORDS).getValue();
+             runOutputBytes[i] =
+               counters.findCounter("FileSystemCounters",
+                   "HDFS_BYTES_WRITTEN").getValue();
+             runOutputRecords[i] =
+               (int)counters.findCounter(REDUCE_OUTPUT_RECORDS).getValue();
+
+            specInfo = spec.getTaskInfo(TaskType.REDUCE, i);
+            break;
+          default:
+            specInfo = null;
+            fail("Unexpected type: " + type);
+        }
+        specInputBytes[i] = specInfo.getInputBytes();
+        specInputRecords[i] = specInfo.getInputRecords();
+        specOutputRecords[i] = specInfo.getOutputRecords();
+        specOutputBytes[i] = specInfo.getOutputBytes();
+        System.out.printf(type + " SPEC: %9d -> %9d :: %5d -> %5d\n",
+             specInputBytes[i], specOutputBytes[i],
+             specInputRecords[i], specOutputRecords[i]);
+        System.out.printf(type + " RUN:  %9d -> %9d :: %5d -> %5d\n",
+             runInputBytes[i], runOutputBytes[i],
+             runInputRecords[i], runOutputRecords[i]);
+      }
+
+      // Check input bytes
+      Arrays.sort(specInputBytes);
+      Arrays.sort(runInputBytes);
+      for (int i = 0; i < runTasks.length; ++i) {
+        assertTrue("Mismatched input bytes " +
+            specInputBytes[i] + "/" + runInputBytes[i],
+            runInputBytes[i] - specInputBytes[i] <= extraInputBytes);
+      }
+
+      // Check input records
+      Arrays.sort(specInputRecords);
+      Arrays.sort(runInputRecords);
+      for (int i = 0; i < runTasks.length; ++i) {
+        assertTrue("Mismatched input records " +
+            specInputRecords[i] + "/" + runInputRecords[i],
+            runInputRecords[i] - specInputRecords[i] <= extraInputRecords);
+      }
+
+      // Check output bytes
+      Arrays.sort(specOutputBytes);
+      Arrays.sort(runOutputBytes);
+      for (int i = 0; i < runTasks.length; ++i) {
+        assertTrue("Mismatched output bytes " +
+            specOutputBytes[i] + "/" + runOutputBytes[i],
+            runOutputBytes[i] - specOutputBytes[i] <= extraOutputBytes);
+      }
+
+      // Check output records
+      Arrays.sort(specOutputRecords);
+      Arrays.sort(runOutputRecords);
+      for (int i = 0; i < runTasks.length; ++i) {
+        assertTrue("Mismatched output records " +
+            specOutputRecords[i] + "/" + runOutputRecords[i],
+            runOutputRecords[i] - specOutputRecords[i] <= extraOutputRecords);
+      }
+
     }
 
     @Override
@@ -110,7 +259,7 @@
     private DebugJobFactory factory;
     private TestMonitor monitor;
 
-    public void checkMonitor() {
+    public void checkMonitor() throws Exception {
       monitor.verify(factory.getSubmitted());
     }
 
@@ -146,12 +295,6 @@
     int res = ToolRunner.run(conf, client, argv);
     assertEquals("Client exited with nonzero status", 0, res);
     client.checkMonitor();
-    final ContentSummary generated = dfs.getContentSummary(in);
-    assertTrue("Mismatched data gen", // +/- 100k for logs
-        (GENDATA << 20) < generated.getLength() + GENSLOP ||
-        (GENDATA << 20) > generated.getLength() - GENSLOP);
-    FileStatus[] outstat = dfs.listStatus(out);
-    assertEquals("Mismatched job count", NJOBS, outstat.length);
   }
 
 }
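
For context on the verification logic added above: check() collects per-task counters, sorts both the trace (spec) values and the observed (run) values, and then requires each observed value to exceed its same-rank spec value by no more than a fixed slop. A minimal standalone sketch of that comparison idea follows; the class and method names are illustrative and not part of the patch.

    import java.util.Arrays;

    public class SlopCheck {
      /**
       * Returns true if, after sorting, every observed value exceeds the
       * corresponding expected value by no more than 'slop'. This mirrors
       * the rank-for-rank comparison used in TestGridmixSubmission#check,
       * which deliberately does not match tasks by id.
       */
      static boolean withinSlop(long[] expected, long[] observed, long slop) {
        long[] e = expected.clone();
        long[] o = observed.clone();
        Arrays.sort(e);
        Arrays.sort(o);
        for (int i = 0; i < o.length; ++i) {
          if (o[i] - e[i] > slop) {
            return false;
          }
        }
        return true;
      }

      public static void main(String[] args) {
        long[] spec = { 100, 200, 300 };
        long[] run  = { 105, 198, 304 };
        System.out.println(withinSlop(spec, run, 5 * 1024)); // prints true
      }
    }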

Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/index/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 27 15:43:58 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib/index:713112
 /hadoop/core/trunk/src/contrib/index:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/index:817879-818559
+/hadoop/mapreduce/trunk/src/contrib/index:817878-830225

Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/mrunit/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 27 15:43:58 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib/mrunit:713112
 /hadoop/core/trunk/src/contrib/mrunit:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/mrunit:817879-818559
+/hadoop/mapreduce/trunk/src/contrib/mrunit:817878-830225

Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 27 15:43:58 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/src/contrib/sqoop:713112
 /hadoop/core/trunk/src/contrib/sqoop:784975-786373
-/hadoop/mapreduce/trunk/src/contrib/sqoop:817879-818559
+/hadoop/mapreduce/trunk/src/contrib/sqoop:817878-830225

Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/build.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/build.xml?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/build.xml (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/build.xml Tue Oct 27 15:43:58 2009
@@ -152,4 +152,12 @@
     <fail if="tests.failed">Tests failed!</fail>
   </target>
 
+  <target name="doc">
+    <exec executable="make" failonerror="true">
+      <arg value="-C" />
+      <arg value="${basedir}/doc" />
+      <arg value="BUILDROOT=${build.dir}" />
+    </exec>
+  </target>
+
 </project>

Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/doc/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Tue Oct 27 15:43:58 2009
@@ -0,0 +1,3 @@
+Sqoop-manpage.xml
+sqoop.1
+Sqoop-web.html

Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/ivy.xml?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/ivy.xml (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/ivy.xml Tue Oct 27 15:43:58 2009
@@ -48,6 +48,10 @@
       name="commons-httpclient"
       rev="${commons-httpclient.version}"
       conf="common->default"/>
+    <dependency org="commons-io"
+      name="commons-io"
+      rev="${commons-io.version}"
+      conf="common->default"/>
     <dependency org="commons-cli"
       name="commons-cli"
       rev="${commons-cli.version}"

Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java Tue Oct 27 15:43:58 2009
@@ -100,6 +100,8 @@
   private String packageName; // package to prepend to auto-named classes.
   private String className; // package+class to apply to individual table import.
   private int numMappers;
+  private boolean useCompression;
+  private long directSplitSize; // In direct mode, open a new stream every X bytes.
 
   private char inputFieldDelim;
   private char inputRecordDelim;
@@ -136,6 +138,23 @@
     this.tableName = table;
   }
 
+  private boolean getBooleanProperty(Properties props, String propName, boolean defaultValue) {
+    String str = props.getProperty(propName,
+        Boolean.toString(defaultValue)).toLowerCase();
+    return "true".equals(str) || "yes".equals(str) || "1".equals(str);
+  }
+
+  private long getLongProperty(Properties props, String propName, long defaultValue) {
+    String str = props.getProperty(propName,
+        Long.toString(defaultValue)).toLowerCase();
+    try {
+      return Long.parseLong(str);
+    } catch (NumberFormatException nfe) {
+      LOG.warn("Could not parse integer value for config parameter " + propName);
+      return defaultValue;
+    }
+  }
+
   private void loadFromProperties() {
     File configFile = new File(DEFAULT_CONFIG_FILE);
     if (!configFile.canRead()) {
@@ -164,15 +183,11 @@
       this.className = props.getProperty("java.classname", this.className);
       this.packageName = props.getProperty("java.packagename", this.packageName);
 
-      String directImport = props.getProperty("direct.import",
-          Boolean.toString(this.direct)).toLowerCase();
-      this.direct = "true".equals(directImport) || "yes".equals(directImport)
-          || "1".equals(directImport);
-
-      String hiveImportStr = props.getProperty("hive.import",
-          Boolean.toString(this.hiveImport)).toLowerCase();
-      this.hiveImport = "true".equals(hiveImportStr) || "yes".equals(hiveImportStr)
-          || "1".equals(hiveImportStr);
+      this.direct = getBooleanProperty(props, "direct.import", this.direct);
+      this.hiveImport = getBooleanProperty(props, "hive.import", this.hiveImport);
+      this.useCompression = getBooleanProperty(props, "compression", this.useCompression);
+      this.directSplitSize = getLongProperty(props, "direct.split.size",
+          this.directSplitSize);
     } catch (IOException ioe) {
       LOG.error("Could not read properties file " + DEFAULT_CONFIG_FILE + ": " + ioe.toString());
     } finally {
@@ -231,6 +246,8 @@
     this.areDelimsManuallySet = false;
 
     this.numMappers = DEFAULT_NUM_MAPPERS;
+    this.useCompression = false;
+    this.directSplitSize = 0;
 
     loadFromProperties();
   }
@@ -272,6 +289,9 @@
     System.out.println("--hive-import                If set, then import the table into Hive.");
     System.out.println("                    (Uses Hive's default delimiters if none are set.)");
     System.out.println("-m, --num-mappers (n)        Use 'n' map tasks to import in parallel");
+    System.out.println("-z, --compress               Enable compression");
+    System.out.println("--direct-split-size (n)      Split the input stream every 'n' bytes");
+    System.out.println("                             when importing in direct mode.");
     System.out.println("");
     System.out.println("Output line formatting options:");
     System.out.println("--fields-terminated-by (char)    Sets the field separator character");
@@ -437,7 +457,8 @@
             this.password = "";
           }
         } else if (args[i].equals("--password")) {
-          LOG.warn("Setting your password on the command-line is insecure. Consider using -P instead.");
+          LOG.warn("Setting your password on the command-line is insecure. "
+              + "Consider using -P instead.");
           this.password = args[++i];
         } else if (args[i].equals("-P")) {
           this.password = securePasswordEntry();
@@ -449,12 +470,7 @@
           this.hiveImport = true;
         } else if (args[i].equals("--num-mappers") || args[i].equals("-m")) {
           String numMappersStr = args[++i];
-          try {
-            this.numMappers = Integer.valueOf(numMappersStr);
-          } catch (NumberFormatException nfe) {
-            throw new InvalidOptionsException("Invalid argument; expected "
-                + args[i - 1] + " (number).");
-          }
+          this.numMappers = Integer.valueOf(numMappersStr);
         } else if (args[i].equals("--fields-terminated-by")) {
           this.outputFieldDelim = ImportOptions.toChar(args[++i]);
           this.areDelimsManuallySet = true;
@@ -505,6 +521,10 @@
           this.packageName = args[++i];
         } else if (args[i].equals("--class-name")) {
           this.className = args[++i];
+        } else if (args[i].equals("-z") || args[i].equals("--compress")) {
+          this.useCompression = true;
+        } else if (args[i].equals("--direct-split-size")) {
+          this.directSplitSize = Long.parseLong(args[++i]);
         } else if (args[i].equals("--list-databases")) {
           this.action = ControlAction.ListDatabases;
         } else if (args[i].equals("--generate-only")) {
@@ -529,6 +549,9 @@
     } catch (ArrayIndexOutOfBoundsException oob) {
       throw new InvalidOptionsException("Error: " + args[--i] + " expected argument.\n"
           + "Try --help for usage.");
+    } catch (NumberFormatException nfe) {
+      throw new InvalidOptionsException("Error: " + args[--i] + " expected numeric argument.\n"
+          + "Try --help for usage.");
     }
   }
 
@@ -832,4 +855,18 @@
   public boolean isOutputEncloseRequired() {
     return this.outputMustBeEnclosed;
   }
+
+  /**
+   * @return true if the user wants imported results to be compressed.
+   */
+  public boolean shouldUseCompression() {
+    return this.useCompression;
+  }
+
+  /**
+   * @return the file size to split by when using --direct mode.
+   */
+  public long getDirectSplitSize() {
+    return this.directSplitSize;
+  }
 }
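
The new -z/--compress and --direct-split-size options above can also be supplied through the properties file via the "compression" and "direct.split.size" keys parsed by the helpers shown in this hunk. As a rough illustration of how a direct-mode import might be invoked with them (the launcher path, jar name, JDBC URL, and table name are placeholders, not taken from this commit):

    hadoop jar sqoop.jar org.apache.hadoop.sqoop.Sqoop \
        --connect jdbc:mysql://db.example.com/sales --table orders \
        --direct --direct-split-size 268435456 -z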

Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java Tue Oct 27 15:43:58 2009
@@ -39,6 +39,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.io.SplittableBufferedWriter;
 import org.apache.hadoop.sqoop.util.DirectImportUtils;
 import org.apache.hadoop.sqoop.util.Executor;
 import org.apache.hadoop.sqoop.util.ImportError;
@@ -64,11 +65,11 @@
       char to each record.
     */
   static class PostgresqlStreamHandlerFactory implements StreamHandlerFactory {
-    private final BufferedWriter writer;
+    private final SplittableBufferedWriter writer;
     private final PerfCounters counters;
     private final ImportOptions options;
 
-    PostgresqlStreamHandlerFactory(final BufferedWriter w, final ImportOptions opts,
+    PostgresqlStreamHandlerFactory(final SplittableBufferedWriter w, final ImportOptions opts,
         final PerfCounters ctrs) {
       this.writer = w;
       this.options = opts;
@@ -94,14 +95,14 @@
     private static class PostgresqlStreamThread extends Thread {
       public static final Log LOG = LogFactory.getLog(PostgresqlStreamThread.class.getName());
 
-      private final BufferedWriter writer;
+      private final SplittableBufferedWriter writer;
       private final InputStream stream;
       private final ImportOptions options;
       private final PerfCounters counters;
 
       private boolean error;
 
-      PostgresqlStreamThread(final InputStream is, final BufferedWriter w,
+      PostgresqlStreamThread(final InputStream is, final SplittableBufferedWriter w,
           final ImportOptions opts, final PerfCounters ctrs) {
         this.stream = is;
         this.writer = w;
@@ -115,7 +116,7 @@
 
       public void run() {
         BufferedReader r = null;
-        BufferedWriter w = this.writer;
+        SplittableBufferedWriter w = this.writer;
 
         char recordDelim = this.options.getOutputRecordDelim();
 
@@ -131,6 +132,7 @@
 
             w.write(inLine);
             w.write(recordDelim);
+            w.allowSplit();
             counters.addBytes(1 + inLine.length());
           }
         } catch (IOException ioe) {
@@ -394,8 +396,7 @@
       }
 
       // This writer will be closed by StreamHandlerFactory.
-      OutputStream os = DirectImportUtils.createHdfsSink(conf, options, tableName);
-      BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
+      SplittableBufferedWriter w = DirectImportUtils.createHdfsSink(conf, options, tableName);
 
       // Actually start the psql dump.
       p = Runtime.getRuntime().exec(args.toArray(new String[0]), envp.toArray(new String[0]));

Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java Tue Oct 27 15:43:58 2009
@@ -40,6 +40,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.io.SplittableBufferedWriter;
 import org.apache.hadoop.sqoop.lib.FieldFormatter;
 import org.apache.hadoop.sqoop.lib.RecordParser;
 import org.apache.hadoop.sqoop.util.DirectImportUtils;
@@ -64,10 +65,10 @@
    * header and footer characters that are attached to each line in mysqldump.
    */
   static class CopyingStreamHandlerFactory implements StreamHandlerFactory {
-    private final BufferedWriter writer;
+    private final SplittableBufferedWriter writer;
     private final PerfCounters counters;
 
-    CopyingStreamHandlerFactory(final BufferedWriter w, final PerfCounters ctrs) {
+    CopyingStreamHandlerFactory(final SplittableBufferedWriter w, final PerfCounters ctrs) {
       this.writer = w;
       this.counters = ctrs;
     }
@@ -91,13 +92,14 @@
     private static class CopyingStreamThread extends Thread {
       public static final Log LOG = LogFactory.getLog(CopyingStreamThread.class.getName());
 
-      private final BufferedWriter writer;
+      private final SplittableBufferedWriter writer;
       private final InputStream stream;
       private final PerfCounters counters;
 
       private boolean error;
 
-      CopyingStreamThread(final InputStream is, final BufferedWriter w, final PerfCounters ctrs) {
+      CopyingStreamThread(final InputStream is, final SplittableBufferedWriter w,
+          final PerfCounters ctrs) {
         this.writer = w;
         this.stream = is;
         this.counters = ctrs;
@@ -109,7 +111,7 @@
 
       public void run() {
         BufferedReader r = null;
-        BufferedWriter w = this.writer;
+        SplittableBufferedWriter w = this.writer;
 
         try {
           r = new BufferedReader(new InputStreamReader(this.stream));
@@ -169,11 +171,11 @@
    * output, and re-emit the text in the user's specified output format.
    */
   static class ReparsingStreamHandlerFactory implements StreamHandlerFactory {
-    private final BufferedWriter writer;
+    private final SplittableBufferedWriter writer;
     private final ImportOptions options;
     private final PerfCounters counters;
 
-    ReparsingStreamHandlerFactory(final BufferedWriter w, final ImportOptions opts, 
+    ReparsingStreamHandlerFactory(final SplittableBufferedWriter w, final ImportOptions opts,
         final PerfCounters ctrs) {
       this.writer = w;
       this.options = opts;
@@ -199,14 +201,14 @@
     private static class ReparsingStreamThread extends Thread {
       public static final Log LOG = LogFactory.getLog(ReparsingStreamThread.class.getName());
 
-      private final BufferedWriter writer;
+      private final SplittableBufferedWriter writer;
       private final ImportOptions options;
       private final InputStream stream;
       private final PerfCounters counters;
 
       private boolean error;
 
-      ReparsingStreamThread(final InputStream is, final BufferedWriter w,
+      ReparsingStreamThread(final InputStream is, final SplittableBufferedWriter w,
           final ImportOptions opts, final PerfCounters ctrs) {
         this.writer = w;
         this.options = opts;
@@ -234,7 +236,7 @@
 
       public void run() {
         BufferedReader r = null;
-        BufferedWriter w = this.writer;
+        SplittableBufferedWriter w = this.writer;
 
         try {
           r = new BufferedReader(new InputStreamReader(this.stream));
@@ -292,6 +294,7 @@
             }
 
             w.write(outputRecordDelim);
+            w.allowSplit();
             counters.addBytes(recordLen);
           }
         } catch (IOException ioe) {
@@ -444,8 +447,7 @@
       }
 
       // This writer will be closed by StreamHandlerFactory.
-      OutputStream os = DirectImportUtils.createHdfsSink(conf, options, tableName);
-      BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
+      SplittableBufferedWriter w = DirectImportUtils.createHdfsSink(conf, options, tableName);
 
       // Actually start the mysqldump.
       p = Runtime.getRuntime().exec(args.toArray(new String[0]));

Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java Tue Oct 27 15:43:58 2009
@@ -28,6 +28,7 @@
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobClient;
@@ -104,10 +105,16 @@
         job.setMapperClass(TextImportMapper.class);
         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(NullWritable.class);
+        if (options.shouldUseCompression()) {
+          FileOutputFormat.setCompressOutput(job, true);
+          FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
+        }
       } else if (options.getFileLayout() == ImportOptions.FileLayout.SequenceFile) {
         job.setOutputFormat(SequenceFileOutputFormat.class);
-        SequenceFileOutputFormat.setCompressOutput(job, true);
-        SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
+        if (options.shouldUseCompression()) {
+          SequenceFileOutputFormat.setCompressOutput(job, true);
+          SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
+        }
         job.set(JobContext.OUTPUT_VALUE_CLASS, tableClassName);
       } else {
         LOG.warn("Unknown file layout specified: " + options.getFileLayout() + "; using text.");

Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/AutoProgressMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/AutoProgressMapper.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/AutoProgressMapper.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/AutoProgressMapper.java Tue Oct 27 15:43:58 2009
@@ -68,6 +68,7 @@
   private class ProgressThread extends Thread {
 
     private volatile boolean keepGoing; // while this is true, thread runs.
+
     private Context context;
     private long startTimeMillis;
     private long lastReportMillis;

Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java Tue Oct 27 15:43:58 2009
@@ -29,6 +29,7 @@
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -104,11 +105,17 @@
         job.setMapperClass(TextImportMapper.class);
         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(NullWritable.class);
+        if (options.shouldUseCompression()) {
+          FileOutputFormat.setCompressOutput(job, true);
+          FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
+        }
       } else if (options.getFileLayout() == ImportOptions.FileLayout.SequenceFile) {
         job.setOutputFormatClass(SequenceFileOutputFormat.class);
         job.setMapperClass(AutoProgressMapper.class);
-        SequenceFileOutputFormat.setCompressOutput(job, true);
-        SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
+        if (options.shouldUseCompression()) {
+          SequenceFileOutputFormat.setCompressOutput(job, true);
+          SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
+        }
         job.getConfiguration().set("mapred.output.value.class", tableClassName);
       } else {
         LOG.warn("Unknown file layout specified: " + options.getFileLayout() + "; using text.");

Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java Tue Oct 27 15:43:58 2009
@@ -20,7 +20,6 @@
 
 import java.io.IOException;
 import java.io.File;
-import java.io.OutputStream;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -29,6 +28,8 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.io.SplittingOutputStream;
+import org.apache.hadoop.sqoop.io.SplittableBufferedWriter;
 import org.apache.hadoop.util.Shell;
 
 /**
@@ -64,8 +65,8 @@
    * The caller is responsible for calling the close() method on the returned
    * stream.
    */
-  public static OutputStream createHdfsSink(Configuration conf, ImportOptions options,
-      String tableName) throws IOException {
+  public static SplittableBufferedWriter createHdfsSink(Configuration conf,
+      ImportOptions options, String tableName) throws IOException {
 
     FileSystem fs = FileSystem.get(conf);
     String warehouseDir = options.getWarehouseDir();
@@ -79,15 +80,11 @@
     LOG.debug("Writing to filesystem: " + conf.get("fs.default.name"));
     LOG.debug("Creating destination directory " + destDir);
     fs.mkdirs(destDir);
-    Path destFile = new Path(destDir, "data-00000");
-    LOG.debug("Opening output file: " + destFile);
-    if (fs.exists(destFile)) {
-      Path canonicalDest = destFile.makeQualified(fs);
-      throw new IOException("Destination file " + canonicalDest + " already exists");
-    }
 
-    // This OutputStream will be clsoed by the caller.
-    return fs.create(destFile);
+    // This Writer will be closed by the caller.
+    return new SplittableBufferedWriter(
+        new SplittingOutputStream(conf, destDir, "data-", options.getDirectSplitSize(),
+        options.shouldUseCompression()));
   }
 }
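
The SplittableBufferedWriter and SplittingOutputStream sources are not part of this hunk, so the following is only a conceptual sketch of the rolling-at-record-boundary behavior that allowSplit() suggests: bytes accumulate into one part file until a size threshold is crossed, and the stream rolls to a new part only when the caller marks a safe record boundary. The sketch writes to local files purely for illustration; the real classes target HDFS and can compress each part.

    import java.io.BufferedWriter;
    import java.io.FileWriter;
    import java.io.IOException;

    // Illustrative only: rolls to a new part file once roughly 'splitSize'
    // characters have been written, but only when the caller signals a
    // record boundary via allowSplit().
    public class RollingWriter {
      private final String prefix;
      private final long splitSize;   // 0 or less disables splitting
      private long written;           // chars written; a simplification of byte counting
      private int part;
      private BufferedWriter out;

      public RollingWriter(String prefix, long splitSize) throws IOException {
        this.prefix = prefix;
        this.splitSize = splitSize;
        openNext();
      }

      private void openNext() throws IOException {
        if (out != null) {
          out.close();
        }
        // e.g. data-00000, data-00001, ...
        out = new BufferedWriter(new FileWriter(String.format("%s%05d", prefix, part++)));
        written = 0;
      }

      public void write(String s) throws IOException {
        out.write(s);
        written += s.length();
      }

      public void allowSplit() throws IOException {
        if (splitSize > 0 && written >= splitSize) {
          openNext();
        }
      }

      public void close() throws IOException {
        out.close();
      }
    }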