Posted to mapreduce-commits@hadoop.apache.org by om...@apache.org on 2009/10/27 16:44:06 UTC
svn commit: r830230 [4/9] - in /hadoop/mapreduce/branches/HDFS-641: ./
.eclipse.templates/ conf/ ivy/ lib/ src/c++/ src/contrib/
src/contrib/capacity-scheduler/
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/
src/contrib/capacity-sche...
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestContainerQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestContainerQueue.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestContainerQueue.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestContainerQueue.java Tue Oct 27 15:43:58 2009
@@ -23,7 +23,6 @@
import java.util.HashMap;
import java.util.Map;
import java.io.IOException;
-
import static org.apache.hadoop.mapred.CapacityTestUtils.*;
public class TestContainerQueue extends TestCase {
@@ -72,22 +71,22 @@
// its children.
//level 1 children
QueueSchedulingContext a1 = new QueueSchedulingContext(
- "a", 25, -1, -1, -1, -1);
+ "a", 25, -1, -1);
QueueSchedulingContext a2 = new QueueSchedulingContext(
- "b", 25, -1, -1, -1, -1);
+ "b", 25, -1, -1);
AbstractQueue q = new ContainerQueue(rt, a1);
AbstractQueue ql = new ContainerQueue(rt, a2);
//level 2 children
QueueSchedulingContext a = new QueueSchedulingContext(
- "aa", 50, -1, -1, -1, -1);
+ "aa", 50, -1, -1);
QueueSchedulingContext b = new QueueSchedulingContext(
- "ab", 50, -1, -1, -1, -1);
+ "ab", 50, -1, -1);
QueueSchedulingContext c = new QueueSchedulingContext(
- "ac", 50, -1, -1, -1, -1);
+ "ac", 50, -1, -1);
QueueSchedulingContext d = new QueueSchedulingContext(
- "ad", 50, -1, -1, -1, -1);
+ "ad", 50, -1, -1);
AbstractQueue q1 = new JobQueue(q, a);
AbstractQueue q2 = new JobQueue(q, b);
@@ -120,27 +119,27 @@
}
- public void testMaxCapacity() throws IOException{
- this.setUp(4,1,1);
+ public void testMaxCapacity() throws IOException {
+ this.setUp(4, 1, 1);
taskTrackerManager.addJobInProgressListener(scheduler.jobQueuesManager);
AbstractQueue rt = QueueHierarchyBuilder.createRootAbstractQueue();
QueueSchedulingContext a1 = new QueueSchedulingContext(
- "R.a", 25, 50, -1, -1, -1);
+ "R.a", 25, 50, -1);
QueueSchedulingContext a2 = new QueueSchedulingContext(
- "R.b", 25, 30, -1, -1, -1);
+ "R.b", 25, 30, -1);
QueueSchedulingContext a3 = new QueueSchedulingContext(
- "R.c", 50, -1, -1, -1, -1);
+ "R.c", 50, -1, -1);
//Test for max capacity
AbstractQueue q = new JobQueue(rt, a1);
AbstractQueue q1 = new JobQueue(rt, a2);
AbstractQueue q2 = new JobQueue(rt, a3);
- scheduler.jobQueuesManager.addQueue((JobQueue)q);
- scheduler.jobQueuesManager.addQueue((JobQueue)q1);
- scheduler.jobQueuesManager.addQueue((JobQueue)q2);
+ scheduler.jobQueuesManager.addQueue((JobQueue) q);
+ scheduler.jobQueuesManager.addQueue((JobQueue) q1);
+ scheduler.jobQueuesManager.addQueue((JobQueue) q2);
scheduler.setRoot(rt);
rt.update(4, 4);
@@ -148,21 +147,35 @@
scheduler.updateContextInfoForTests();
// submit a job to the second queue
- taskTrackerManager.submitJobAndInit(JobStatus.PREP, 20, 0, "R.a", "u1");
+ taskTrackerManager.submitJobAndInit(JobStatus.PREP, 20, 20, "R.a", "u1");
- //Queue R.a should not get more than 2 slots
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
+ //Queue R.a should not get more than 2 slots
+ Map<String, String> expectedStrings = new HashMap<String, String>();
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP,
"attempt_test_0001_m_000001_0 on tt1");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE,
+ "attempt_test_0001_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ expectedStrings);
- checkAssignment(
- taskTrackerManager, scheduler, "tt2",
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP,
"attempt_test_0001_m_000002_0 on tt2");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE,
+ "attempt_test_0001_r_000002_0 on tt2");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ expectedStrings);
- //Now the queue has already reached its max limit no further tasks should
- // be given.
- List<Task> l = scheduler.assignTasks(taskTrackerManager.getTaskTracker("tt3"));
- assertNull(l);
+ assertNull(scheduler.assignTasks(
+ taskTrackerManager.getTaskTracker(
+ "tt3")));
}
@@ -172,20 +185,20 @@
//generate Queuecontext for the children
QueueSchedulingContext a1 = new QueueSchedulingContext(
- "a", 50, -1, -1, -1, -1);
+ "a", 50, -1, -1);
QueueSchedulingContext a2 = new QueueSchedulingContext(
- "b", -1, -1, -1, -1, -1);
+ "b", -1, -1, -1);
AbstractQueue rtChild1 = new ContainerQueue(rt, a1);
AbstractQueue rtChild2 = new ContainerQueue(rt, a2);
//Add further children to rtChild1.
QueueSchedulingContext b = new QueueSchedulingContext(
- "ab", 30, -1, -1, -1, -1);
+ "ab", 30, -1, -1);
QueueSchedulingContext c = new QueueSchedulingContext(
- "ac", -1, -1, -1, -1, -1);
+ "ac", -1, -1, -1);
QueueSchedulingContext d = new QueueSchedulingContext(
- "ad", 100, -1, -1, -1, -1);
+ "ad", 100, -1, -1);
AbstractQueue q0 = new JobQueue(rtChild1, b);
AbstractQueue q1 = new JobQueue(rtChild1, c);
@@ -224,9 +237,9 @@
//First level
QueueSchedulingContext sch =
- new QueueSchedulingContext("rt.sch", a, -1, -1, -1, -1);
+ new QueueSchedulingContext("rt.sch", a, -1, -1);
QueueSchedulingContext gta =
- new QueueSchedulingContext("rt.gta", b, -1, -1, -1, -1);
+ new QueueSchedulingContext("rt.gta", b, -1, -1);
AbstractQueue schq = new ContainerQueue(rt, sch);
@@ -238,9 +251,9 @@
//Create further children.
QueueSchedulingContext prod =
- new QueueSchedulingContext("rt.sch.prod", c, -1, -1, -1, -1);
+ new QueueSchedulingContext("rt.sch.prod", c, -1, -1);
QueueSchedulingContext misc =
- new QueueSchedulingContext("rt.sch.misc", d, -1, -1, -1, -1);
+ new QueueSchedulingContext("rt.sch.misc", d, -1, -1);
AbstractQueue prodq = new JobQueue(schq, prod);
AbstractQueue miscq = new JobQueue(schq, misc);
@@ -269,86 +282,160 @@
scheduler.updateContextInfoForTests();
// verify initial capacity distribution
- TaskSchedulingContext mapTsc
- = map.get("rt.gta").getQueueSchedulingContext().getMapTSC();
+ TaskSchedulingContext mapTsc
+ = map.get("rt.gta").getQueueSchedulingContext().getMapTSC();
assertEquals(mapTsc.getCapacity(), 3);
-
+
mapTsc = map.get("rt.sch").getQueueSchedulingContext().getMapTSC();
assertEquals(mapTsc.getCapacity(), 5);
-
+
mapTsc = map.get("rt.sch.prod").getQueueSchedulingContext().getMapTSC();
assertEquals(mapTsc.getCapacity(), 4);
mapTsc = map.get("rt.sch.misc").getQueueSchedulingContext().getMapTSC();
assertEquals(mapTsc.getCapacity(), 1);
- assertUsedCapacity(map,
- new String[] {"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
- new int[] { 0, 0, 0, 0 });
-
+ assertUsedCapacity(
+ map,
+ new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+ new int[]{0, 0, 0, 0});
+
//Only Allow job submission to leaf queue
- taskTrackerManager.submitJobAndInit(JobStatus.PREP, 4, 0, "rt.sch.prod",
- "u1");
+ taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, 4, 4, "rt.sch.prod",
+ "u1");
// submit a job to the second queue
- taskTrackerManager.submitJobAndInit(JobStatus.PREP, 4, 0, "rt.sch.misc",
- "u1");
+ taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, 4, 4, "rt.sch.misc",
+ "u1");
//submit a job in gta level queue
- taskTrackerManager.submitJobAndInit(JobStatus.PREP, 4, 0, "rt.gta", "u1");
+ taskTrackerManager.submitJobAndInit(JobStatus.PREP, 4, 4, "rt.gta", "u1");
int counter = 0;
+ Map<String, String> expectedStrings = new HashMap<String, String>();
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0001_m_000001_0 on tt1");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ expectedStrings);
- checkAssignment(taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
- assertUsedCapacity(map,
- new String[] {"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
- new int[] { 0, 1, 1, 0 });
-
- checkAssignment(taskTrackerManager, scheduler, "tt2",
- "attempt_test_0003_m_000001_0 on tt2");
- assertUsedCapacity(map,
- new String[] {"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
- new int[] { 1, 1, 1, 0 });
-
- checkAssignment(taskTrackerManager, scheduler, "tt3",
- "attempt_test_0002_m_000001_0 on tt3");
- assertUsedCapacity(map,
- new String[] {"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
- new int[] { 1, 2, 1, 1 });
-
- checkAssignment(taskTrackerManager, scheduler, "tt4",
- "attempt_test_0003_m_000002_0 on tt4");
- assertUsedCapacity(map,
- new String[] {"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
- new int[] { 2, 2, 1, 1 });
-
- checkAssignment(taskTrackerManager, scheduler, "tt5",
- "attempt_test_0001_m_000002_0 on tt5");
- assertUsedCapacity(map,
- new String[] {"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
- new int[] { 2, 3, 2, 1 });
-
- checkAssignment(taskTrackerManager, scheduler, "tt6",
- "attempt_test_0001_m_000003_0 on tt6");
- assertUsedCapacity(map,
- new String[] {"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
- new int[] { 2, 4, 3, 1 });
-
- checkAssignment(taskTrackerManager, scheduler, "tt7",
- "attempt_test_0003_m_000003_0 on tt7");
- assertUsedCapacity(map,
- new String[] {"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
- new int[] { 3, 4, 3, 1 });
-
- checkAssignment(taskTrackerManager, scheduler, "tt8",
- "attempt_test_0001_m_000004_0 on tt8");
- assertUsedCapacity(map,
- new String[] {"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
- new int[] { 3, 5, 4, 1 });
+ assertUsedCapacity(
+ map,
+ new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+ new int[]{0, 1, 1, 0});
+//============================================
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0003_m_000001_0 on tt2");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0003_r_000001_0 on tt2");
- checkAssignment(taskTrackerManager, scheduler, "tt9",
- "attempt_test_0002_m_000002_0 on tt9");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ expectedStrings);
+ assertUsedCapacity(
+ map,
+ new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+ new int[]{1, 1, 1, 0});
+//============================================
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0002_m_000001_0 on tt3");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0002_r_000001_0 on tt3");
+
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt3",
+ expectedStrings);
+ assertUsedCapacity(
+ map,
+ new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+ new int[]{1, 2, 1, 1});
+//============================================
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0003_m_000002_0 on tt4");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0003_r_000002_0 on tt4");
+
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt4",
+ expectedStrings);
+ assertUsedCapacity(
+ map,
+ new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+ new int[]{2, 2, 1, 1});
+//============================================
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0001_m_000002_0 on tt5");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0001_r_000002_0 on tt5");
+
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt5",
+ expectedStrings);
+ assertUsedCapacity(
+ map,
+ new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+ new int[]{2, 3, 2, 1});
+//============================================
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0001_m_000003_0 on tt6");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0001_r_000003_0 on tt6");
+
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt6",
+ expectedStrings);
+ assertUsedCapacity(
+ map,
+ new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+ new int[]{2, 4, 3, 1});
+//============================================
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0003_m_000003_0 on tt7");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0003_r_000003_0 on tt7");
+
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt7",
+ expectedStrings);
+ assertUsedCapacity(
+ map,
+ new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+ new int[]{3, 4, 3, 1});
+//============================================
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0001_m_000004_0 on tt8");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0001_r_000004_0 on tt8");
+
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt8",
+ expectedStrings);
+ assertUsedCapacity(
+ map,
+ new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+ new int[]{3, 5, 4, 1});
+//============================================
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0002_m_000002_0 on tt9");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0002_r_000002_0 on tt9");
+
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt9",
+ expectedStrings);
}
/**
@@ -373,75 +460,124 @@
scheduler.updateContextInfoForTests();
// verify capacities as per the setup.
- TaskSchedulingContext mapTSC
+ TaskSchedulingContext mapTSC
= map.get("rt.gta").getQueueSchedulingContext().getMapTSC();
assertEquals(mapTSC.getCapacity(), 2);
-
+
mapTSC = map.get("rt.sch").getQueueSchedulingContext().getMapTSC();
assertEquals(mapTSC.getCapacity(), 5);
-
+
mapTSC = map.get("rt.sch.prod").getQueueSchedulingContext().getMapTSC();
assertEquals(mapTSC.getCapacity(), 4);
-
+
mapTSC = map.get("rt.sch.misc").getQueueSchedulingContext().getMapTSC();
assertEquals(mapTSC.getCapacity(), 1);
-
- assertUsedCapacity(map,
- new String[] { "rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc" },
- new int[] { 0, 0, 0, 0 });
+
+ assertUsedCapacity(
+ map,
+ new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+ new int[]{0, 0, 0, 0});
// submit a job to the second queue
- taskTrackerManager.submitJobAndInit(JobStatus.PREP, 20, 0, "rt.sch.misc",
- "u1");
+ taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, 20, 20, "rt.sch.misc",
+ "u1");
//submit a job in gta level queue
- taskTrackerManager.submitJobAndInit(JobStatus.PREP, 20, 0, "rt.gta", "u1");
+ taskTrackerManager.submitJobAndInit(JobStatus.PREP, 20, 20, "rt.gta", "u1");
int counter = 0;
+ Map<String, String> expectedStrings = new HashMap<String, String>();
+ expectedStrings.put(MAP, "attempt_test_0001_m_000001_0 on tt1");
+ expectedStrings.put(REDUCE, "attempt_test_0001_r_000001_0 on tt1");
- checkAssignment(taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
- assertUsedCapacity(map,
- new String[] { "rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc" },
- new int[] { 0, 1, 0, 1 });
-
- checkAssignment(taskTrackerManager, scheduler, "tt2",
- "attempt_test_0002_m_000001_0 on tt2");
- assertUsedCapacity(map,
- new String[] { "rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc" },
- new int[] { 1, 1, 0, 1 });
-
- checkAssignment(taskTrackerManager, scheduler, "tt3",
- "attempt_test_0001_m_000002_0 on tt3");
- assertUsedCapacity(map,
- new String[] { "rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc" },
- new int[] { 1, 2, 0, 2 });
-
- checkAssignment(taskTrackerManager, scheduler, "tt4",
- "attempt_test_0001_m_000003_0 on tt4");
- assertUsedCapacity(map,
- new String[] { "rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc" },
- new int[] { 1, 3, 0, 3 });
-
- checkAssignment(taskTrackerManager, scheduler, "tt5",
- "attempt_test_0002_m_000002_0 on tt5");
- assertUsedCapacity(map,
- new String[] { "rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc" },
- new int[] { 2, 3, 0, 3 });
-
- checkAssignment(taskTrackerManager, scheduler, "tt6",
- "attempt_test_0001_m_000004_0 on tt6");
- assertUsedCapacity(map,
- new String[] { "rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc" },
- new int[] { 2, 4, 0, 4 });
-
- checkAssignment(taskTrackerManager, scheduler, "tt7",
- "attempt_test_0001_m_000005_0 on tt7");
- assertUsedCapacity(map,
- new String[] { "rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc" },
- new int[] { 2, 5, 0, 5 });
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ expectedStrings);
+ assertUsedCapacity(
+ map,
+ new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+ new int[]{0, 1, 0, 1});
+//============================================
+ expectedStrings.clear();
+ expectedStrings.put(MAP, "attempt_test_0002_m_000001_0 on tt2");
+ expectedStrings.put(REDUCE, "attempt_test_0002_r_000001_0 on tt2");
- checkAssignment(taskTrackerManager, scheduler, "tt8",
- "attempt_test_0001_m_000006_0 on tt8");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ expectedStrings);
+ assertUsedCapacity(
+ map,
+ new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+ new int[]{1, 1, 0, 1});
+//============================================
+ expectedStrings.clear();
+ expectedStrings.put(MAP, "attempt_test_0001_m_000002_0 on tt3");
+ expectedStrings.put(REDUCE, "attempt_test_0001_r_000002_0 on tt3");
+
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt3",
+ expectedStrings);
+ assertUsedCapacity(
+ map,
+ new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+ new int[]{1, 2, 0, 2});
+//============================================
+ expectedStrings.clear();
+ expectedStrings.put(MAP, "attempt_test_0001_m_000003_0 on tt4");
+ expectedStrings.put(REDUCE, "attempt_test_0001_r_000003_0 on tt4");
+
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt4",
+ expectedStrings);
+ assertUsedCapacity(
+ map,
+ new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+ new int[]{1, 3, 0, 3});
+//============================================
+ expectedStrings.clear();
+ expectedStrings.put(MAP, "attempt_test_0002_m_000002_0 on tt5");
+ expectedStrings.put(REDUCE, "attempt_test_0002_r_000002_0 on tt5");
+
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt5",
+ expectedStrings);
+ assertUsedCapacity(
+ map,
+ new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+ new int[]{2, 3, 0, 3});
+//============================================
+ expectedStrings.clear();
+ expectedStrings.put(MAP, "attempt_test_0001_m_000004_0 on tt6");
+ expectedStrings.put(REDUCE, "attempt_test_0001_r_000004_0 on tt6");
+
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt6",
+ expectedStrings);
+ assertUsedCapacity(
+ map,
+ new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+ new int[]{2, 4, 0, 4});
+//============================================
+ expectedStrings.clear();
+ expectedStrings.put(MAP, "attempt_test_0001_m_000005_0 on tt7");
+ expectedStrings.put(REDUCE, "attempt_test_0001_r_000005_0 on tt7");
+
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt7",
+ expectedStrings);
+ assertUsedCapacity(
+ map,
+ new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+ new int[]{2, 5, 0, 5});
+//============================================
+ expectedStrings.clear();
+ expectedStrings.put(MAP, "attempt_test_0001_m_000006_0 on tt8");
+ expectedStrings.put(REDUCE, "attempt_test_0001_r_000006_0 on tt8");
+
+
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt8",
+ expectedStrings);
}
// verify that the number of slots used for each queue
@@ -458,24 +594,4 @@
assertEquals(mapTSC.getNumSlotsOccupied(), expectedUsedSlots[i]);
}
}
-
- private void printOrderedQueueData(AbstractQueue rt) {
- //print data at all levels.
- List<AbstractQueue> aq = rt.getChildren();
- System.out.println();
- for (AbstractQueue a : aq) {
- System.out.println(
- " // " + a.getName() + "-> data " +
- a.getQueueSchedulingContext().getMapTSC().getCapacity() + " " +
- " " +
- a.getQueueSchedulingContext().getMapTSC().getNumSlotsOccupied());
- double f = ((double) a.getQueueSchedulingContext().getMapTSC()
- .getNumSlotsOccupied() /
- (double) a.getQueueSchedulingContext().getMapTSC().getCapacity());
- System.out.println(" // rating -> " + f);
- if (a.getChildren() != null) {
- printOrderedQueueData(a);
- }
- }
- }
}
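The hunks above replace the old single-task checkAssignment calls with
checkMultipleTaskAssignment, which verifies that a map attempt and a reduce
attempt are handed out in the same heartbeat. The repeated clear/put/put/check
sequence could be folded into one helper; a minimal sketch, assuming only the
CapacityTestUtils API exercised above (the helper itself is hypothetical, not
part of this commit):

    private void expectMapAndReduce(String tracker, String mapAttempt,
        String reduceAttempt) throws IOException {
      Map<String, String> expected = new HashMap<String, String>();
      expected.put(CapacityTestUtils.MAP, mapAttempt + " on " + tracker);
      expected.put(CapacityTestUtils.REDUCE, reduceAttempt + " on " + tracker);
      checkMultipleTaskAssignment(taskTrackerManager, scheduler, tracker, expected);
    }

    // usage, replacing one expectedStrings block:
    // expectMapAndReduce("tt1", "attempt_test_0001_m_000001_0",
    //     "attempt_test_0001_r_000001_0");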
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestRefreshOfQueues.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestRefreshOfQueues.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestRefreshOfQueues.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestRefreshOfQueues.java Tue Oct 27 15:43:58 2009
@@ -37,7 +37,7 @@
import org.apache.hadoop.mapred.CapacityTestUtils.ControlledInitializationPoller;
import org.apache.hadoop.mapred.CapacityTestUtils.FakeJobInProgress;
import org.apache.hadoop.mapred.CapacityTestUtils.FakeTaskTrackerManager;
-import static org.apache.hadoop.mapred.CapacityTestUtils.checkAssignment;
+import static org.apache.hadoop.mapred.CapacityTestUtils.*;
import org.junit.After;
import org.junit.Test;
@@ -104,10 +104,6 @@
props[i] = queues[i].getProperties();
props[i].setProperty(CapacitySchedulerConf.CAPACITY_PROPERTY,
String.valueOf(i + 10));
- props[i].setProperty(CapacitySchedulerConf.MAX_MAP_CAP_PROPERTY,
- String.valueOf(i + 20));
- props[i].setProperty(CapacitySchedulerConf.MAX_REDUCE_CAP_PROPERTY,
- String.valueOf(i + 25));
props[i].setProperty(CapacitySchedulerConf.MAX_CAPACITY_PROPERTY,
String.valueOf(i + 15));
props[i].setProperty(CapacitySchedulerConf.SUPPORTS_PRIORITY_PROPERTY,
@@ -136,8 +132,6 @@
allQueues.get(qName).getQueueSchedulingContext();
LOG.info("Context for queue " + qName + " is : " + qsc);
assertEquals(i + 10, qsc.getCapacityPercent(), 0);
- assertEquals(i + 20, qsc.getMapTSC().getMaxTaskLimit());
- assertEquals(i + 25, qsc.getReduceTSC().getMaxTaskLimit());
assertEquals(i + 15, qsc.getMaxCapacityPercent(), 0);
assertEquals(Boolean.valueOf(false),
Boolean.valueOf(qsc.supportsPriorities()));
@@ -149,10 +143,6 @@
props[i] = queues[i].getProperties();
props[i].setProperty(CapacitySchedulerConf.CAPACITY_PROPERTY,
String.valueOf(i + 20));
- props[i].setProperty(CapacitySchedulerConf.MAX_MAP_CAP_PROPERTY,
- String.valueOf(i + 30));
- props[i].setProperty(CapacitySchedulerConf.MAX_REDUCE_CAP_PROPERTY,
- String.valueOf(i + 35));
props[i].setProperty(CapacitySchedulerConf.MAX_CAPACITY_PROPERTY,
String.valueOf(i + 25));
props[i].setProperty(CapacitySchedulerConf.SUPPORTS_PRIORITY_PROPERTY,
@@ -183,8 +173,6 @@
assertEquals(qName, qsc.getQueueName());
LOG.info("Context for queue " + qName + " is : " + qsc);
assertEquals(i + 20, qsc.getCapacityPercent(), 0);
- assertEquals(i + 30, qsc.getMapTSC().getMaxTaskLimit());
- assertEquals(i + 35, qsc.getReduceTSC().getMaxTaskLimit());
assertEquals(i + 25, qsc.getMaxCapacityPercent(), 0);
assertEquals(Boolean.valueOf(false),
Boolean.valueOf(qsc.supportsPriorities()));
@@ -201,65 +189,111 @@
JobQueueInfo[] queues = TestQueueManagerRefresh.getSimpleQueueHierarchy();
queues[0].getProperties().setProperty(
- CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(100));
+ CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(100));
queues[1].getProperties().setProperty(
- CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
+ CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
queues[2].getProperties().setProperty(
- CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
+ CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
// write the configuration file
QueueManagerTestUtils.writeQueueConfigurationFile(
- queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
+ queueConfigFile.getAbsolutePath(), new JobQueueInfo[]{queues[0]});
setupAndStartSchedulerFramework(2, 2, 2);
FakeJobInProgress job1 =
- taskTrackerManager.submitJobAndInit(JobStatus.PREP, 2, 0,
- queues[1].getQueueName(), "user");
+ taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, 2, 2,
+ queues[1].getQueueName(), "user");
FakeJobInProgress job2 =
- taskTrackerManager.submitJobAndInit(JobStatus.PREP, 2, 0,
- queues[2].getQueueName(), "user");
-
- checkAssignment(taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
- checkAssignment(taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_m_000001_0 on tt1");
- checkAssignment(taskTrackerManager, scheduler, "tt2",
- "attempt_test_0002_m_000002_0 on tt2");
- checkAssignment(taskTrackerManager, scheduler, "tt2",
- "attempt_test_0001_m_000002_0 on tt2");
+ taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, 2, 2,
+ queues[2].getQueueName(), "user");
+
+ Map<String, String> expectedStrings = new HashMap<String, String>();
+ expectedStrings.put(MAP, "attempt_test_0001_m_000001_0 on tt1");
+ expectedStrings.put(REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ expectedStrings);
+//===========================================
+ expectedStrings.clear();
+ expectedStrings.put(MAP, "attempt_test_0002_m_000001_0 on tt1");
+ expectedStrings.put(REDUCE, "attempt_test_0002_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ expectedStrings);
+//============================================
+ expectedStrings.clear();
+ expectedStrings.put(MAP, "attempt_test_0002_m_000002_0 on tt2");
+ expectedStrings.put(REDUCE, "attempt_test_0002_r_000002_0 on tt2");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ expectedStrings);
+//============================================
+ expectedStrings.clear();
+ expectedStrings.put(MAP, "attempt_test_0001_m_000002_0 on tt2");
+ expectedStrings.put(REDUCE, "attempt_test_0001_r_000002_0 on tt2");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ expectedStrings);
taskTrackerManager.killJob(job1.getJobID());
taskTrackerManager.killJob(job2.getJobID());
// change configuration
queues[1].getProperties().setProperty(
- CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(25));
+ CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(25));
queues[2].getProperties().setProperty(
- CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(75));
+ CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(75));
// Re-write the configuration file
QueueManagerTestUtils.writeQueueConfigurationFile(
- queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
+ queueConfigFile.getAbsolutePath(), new JobQueueInfo[]{queues[0]});
- taskTrackerManager.getQueueManager().refreshQueues(null,
- scheduler.getQueueRefresher());
+ taskTrackerManager.getQueueManager().refreshQueues(
+ null,
+ scheduler.getQueueRefresher());
job1 =
- taskTrackerManager.submitJobAndInit(JobStatus.PREP, 2, 0,
- queues[1].getQueueName(), "user");
+ taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, 2, 2,
+ queues[1].getQueueName(), "user");
job2 =
- taskTrackerManager.submitJobAndInit(JobStatus.PREP, 4, 0,
- queues[2].getQueueName(), "user");
+ taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, 4, 4,
+ queues[2].getQueueName(), "user");
+
+ expectedStrings.clear();
+ expectedStrings.put(MAP, "attempt_test_0003_m_000001_0 on tt1");
+ expectedStrings.put(REDUCE, "attempt_test_0003_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ expectedStrings);
+
+
+ expectedStrings.clear();
+ expectedStrings.put(MAP, "attempt_test_0004_m_000001_0 on tt1");
+ expectedStrings.put(REDUCE, "attempt_test_0004_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ expectedStrings);
+
+
+ expectedStrings.clear();
+ expectedStrings.put(MAP, "attempt_test_0004_m_000002_0 on tt2");
+ expectedStrings.put(REDUCE, "attempt_test_0004_r_000002_0 on tt2");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ expectedStrings);
+
+ expectedStrings.clear();
+ expectedStrings.put(MAP, "attempt_test_0004_m_000003_0 on tt2");
+ expectedStrings.put(REDUCE, "attempt_test_0004_r_000003_0 on tt2");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ expectedStrings);
- checkAssignment(taskTrackerManager, scheduler, "tt1",
- "attempt_test_0003_m_000001_0 on tt1");
- checkAssignment(taskTrackerManager, scheduler, "tt1",
- "attempt_test_0004_m_000001_0 on tt1");
- checkAssignment(taskTrackerManager, scheduler, "tt2",
- "attempt_test_0004_m_000002_0 on tt2");
- checkAssignment(taskTrackerManager, scheduler, "tt2",
- "attempt_test_0004_m_000003_0 on tt2");
}
/**
@@ -336,60 +370,84 @@
JobQueueInfo[] queues = TestQueueManagerRefresh.getSimpleQueueHierarchy();
queues[0].getProperties().setProperty(
- CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(100));
+ CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(100));
queues[1].getProperties().setProperty(
- CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
+ CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
queues[2].getProperties().setProperty(
- CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
+ CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
queues[2].getProperties().setProperty(
- CapacitySchedulerConf.MINIMUM_USER_LIMIT_PERCENT_PROPERTY,
- String.valueOf(100));
+ CapacitySchedulerConf.MINIMUM_USER_LIMIT_PERCENT_PROPERTY,
+ String.valueOf(100));
// write the configuration file
QueueManagerTestUtils.writeQueueConfigurationFile(
- queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
+ queueConfigFile.getAbsolutePath(), new JobQueueInfo[]{queues[0]});
setupAndStartSchedulerFramework(1, 2, 2);
FakeJobInProgress job1 =
- taskTrackerManager.submitJobAndInit(JobStatus.PREP, 2, 0,
- queues[2].getQueueName(), "user1");
+ taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, 2, 2,
+ queues[2].getQueueName(), "user1");
FakeJobInProgress job2 =
- taskTrackerManager.submitJobAndInit(JobStatus.PREP, 2, 0,
- queues[2].getQueueName(), "user2");
+ taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, 2, 2,
+ queues[2].getQueueName(), "user2");
+
+ Map<String, String> expectedStrings = new HashMap<String, String>();
+ expectedStrings.put(MAP, "attempt_test_0001_m_000001_0 on tt1");
+ expectedStrings.put(REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ expectedStrings);
+
+ expectedStrings.clear();
+ expectedStrings.put(MAP, "attempt_test_0001_m_000002_0 on tt1");
+ expectedStrings.put(REDUCE, "attempt_test_0001_r_000002_0 on tt1");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ expectedStrings);
- checkAssignment(taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
- checkAssignment(taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000002_0 on tt1");
assertNull(scheduler.assignTasks(taskTrackerManager.getTaskTracker("tt1")));
taskTrackerManager.killJob(job1.getJobID());
taskTrackerManager.killJob(job2.getJobID());
// change configuration
queues[2].getProperties().setProperty(
- CapacitySchedulerConf.MINIMUM_USER_LIMIT_PERCENT_PROPERTY,
- String.valueOf(50));
+ CapacitySchedulerConf.MINIMUM_USER_LIMIT_PERCENT_PROPERTY,
+ String.valueOf(50));
// Re-write the configuration file
QueueManagerTestUtils.writeQueueConfigurationFile(
- queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
+ queueConfigFile.getAbsolutePath(), new JobQueueInfo[]{queues[0]});
- taskTrackerManager.getQueueManager().refreshQueues(null,
- scheduler.getQueueRefresher());
+ taskTrackerManager.getQueueManager().refreshQueues(
+ null,
+ scheduler.getQueueRefresher());
job1 =
- taskTrackerManager.submitJobAndInit(JobStatus.PREP, 2, 0,
- queues[1].getQueueName(), "user1");
+ taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, 2, 2,
+ queues[1].getQueueName(), "user1");
job2 =
- taskTrackerManager.submitJobAndInit(JobStatus.PREP, 2, 0,
- queues[2].getQueueName(), "user2");
-
- checkAssignment(taskTrackerManager, scheduler, "tt1",
- "attempt_test_0003_m_000001_0 on tt1");
- checkAssignment(taskTrackerManager, scheduler, "tt1",
- "attempt_test_0004_m_000001_0 on tt1");
+ taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, 2, 2,
+ queues[2].getQueueName(), "user2");
+
+ expectedStrings.clear();
+ expectedStrings.put(MAP, "attempt_test_0003_m_000001_0 on tt1");
+ expectedStrings.put(REDUCE, "attempt_test_0003_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ expectedStrings);
+
+ expectedStrings.clear();
+ expectedStrings.put(MAP, "attempt_test_0004_m_000001_0 on tt1");
+ expectedStrings.put(REDUCE, "attempt_test_0004_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ expectedStrings);
}
/**
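The refresh cycle these tests exercise is: mutate the queue properties,
rewrite the queue configuration file, then hand the scheduler's refresher to
the QueueManager. Condensed from the hunks above (a sketch; all names follow
the test code):

    // tighten one queue, loosen its sibling
    queues[1].getProperties().setProperty(
        CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(25));
    queues[2].getProperties().setProperty(
        CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(75));
    // persist the new hierarchy; the root queue carries its children
    QueueManagerTestUtils.writeQueueConfigurationFile(
        queueConfigFile.getAbsolutePath(), new JobQueueInfo[]{queues[0]});
    // push the change into the live scheduler
    taskTrackerManager.getQueueManager().refreshQueues(
        null, scheduler.getQueueRefresher());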
Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/data_join/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 27 15:43:58 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/contrib/data_join:713112
/hadoop/core/trunk/src/contrib/data_join:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/data_join:817879-818559
+/hadoop/mapreduce/trunk/src/contrib/data_join:817878-830225
Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/dynamic-scheduler/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 27 15:43:58 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/src/contrib/dynamic-scheduler:713112
/hadoop/core/trunk/src/contrib/dynamic-scheduler:784975-786373
-/hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler:817879-818559
+/hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler:817878-830225
Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/eclipse-plugin/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 27 15:43:58 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/core/src/contrib/eclipse-plugin:713112
/hadoop/core/trunk/src/contrib/eclipse-plugin:776175-784663
-/hadoop/mapreduce/trunk/src/contrib/eclipse-plugin:817879-818559
+/hadoop/mapreduce/trunk/src/contrib/eclipse-plugin:817878-830225
Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 27 15:43:58 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/contrib/fairscheduler:713112
/hadoop/core/trunk/src/contrib/fairscheduler:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/fairscheduler:817879-818559
+/hadoop/mapreduce/trunk/src/contrib/fairscheduler:817878-830225
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/ivy.xml?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/ivy.xml (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/ivy.xml Tue Oct 27 15:43:58 2009
@@ -54,5 +54,21 @@
name="paranamer-ant"
rev="${paranamer.version}"
conf="common->default"/>
+ <dependency org="org.mortbay.jetty"
+ name="jetty-util"
+ rev="${jetty-util.version}"
+ conf="common->master"/>
+ <dependency org="org.mortbay.jetty"
+ name="jetty"
+ rev="${jetty.version}"
+ conf="common->master"/>
+ <dependency org="org.mortbay.jetty"
+ name="jsp-api-2.1"
+ rev="${jetty.version}"
+ conf="common->master"/>
+ <dependency org="org.mortbay.jetty"
+ name="jsp-2.1"
+ rev="${jetty.version}"
+ conf="common->master"/>
</dependencies>
</ivy-module>
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java Tue Oct 27 15:43:58 2009
@@ -18,7 +18,9 @@
package org.apache.hadoop.mapred;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.io.PrintWriter;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
@@ -118,7 +120,12 @@
}
// Print out the normal response
response.setContentType("text/html");
- PrintWriter out = new PrintWriter(response.getOutputStream());
+
+ // Because the client may read arbitrarily slow, and we hold locks while
+ // the servlet outputs, we want to write to our own buffer which we know
+ // won't block.
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintWriter out = new PrintWriter(baos);
String hostname = StringUtils.simpleHostname(
jobTracker.getJobTrackerMachine());
out.print("<html><head>");
@@ -132,6 +139,11 @@
showJobs(out, advancedView);
out.print("</body></html>\n");
out.close();
+
+ // Flush our buffer to the real servlet output
+ OutputStream servletOut = response.getOutputStream();
+ baos.writeTo(servletOut);
+ servletOut.close();
}
/**
@@ -202,55 +214,57 @@
out.print("<th>Finished</th><th>Running</th><th>Fair Share</th>" +
(advancedView ? "<th>Weight</th>" : ""));
out.print("</tr>\n");
- Collection<JobInProgress> runningJobs = jobTracker.getRunningJobs();
- synchronized (scheduler) {
- for (JobInProgress job: runningJobs) {
- JobProfile profile = job.getProfile();
- JobInfo info = scheduler.infos.get(job);
- if (info == null) { // Job finished, but let's show 0's for info
- info = new JobInfo(null, null);
- }
- out.print("<tr>\n");
- out.printf("<td>%s</td>\n", DATE_FORMAT.format(
- new Date(job.getStartTime())));
- out.printf("<td><a href=\"jobdetails.jsp?jobid=%s\">%s</a></td>",
- profile.getJobID(), profile.getJobID());
- out.printf("<td>%s</td>\n", profile.getUser());
- out.printf("<td>%s</td>\n", profile.getJobName());
- if (JSPUtil.privateActionsAllowed()) {
- out.printf("<td>%s</td>\n", generateSelect(scheduler
- .getPoolManager().getPoolNames(), scheduler.getPoolManager()
- .getPoolName(job), "/scheduler?setPool=<CHOICE>&jobid="
- + profile.getJobID() + (advancedView ? "&advanced" : "")));
- out.printf("<td>%s</td>\n", generateSelect(Arrays
- .asList(new String[] { "VERY_LOW", "LOW", "NORMAL", "HIGH",
- "VERY_HIGH" }), job.getPriority().toString(),
- "/scheduler?setPriority=<CHOICE>&jobid=" + profile.getJobID()
- + (advancedView ? "&advanced" : "")));
- } else {
- out.printf("<td>%s</td>\n", scheduler.getPoolManager().getPoolName(job));
- out.printf("<td>%s</td>\n", job.getPriority().toString());
- }
- Pool pool = scheduler.getPoolManager().getPool(job);
- String mapShare = (pool.getSchedulingMode() == SchedulingMode.FAIR) ?
- String.format("%.1f", info.mapSchedulable.getFairShare()) : "NA";
- out.printf("<td>%d / %d</td><td>%d</td><td>%s</td>\n",
- job.finishedMaps(), job.desiredMaps(),
- info.mapSchedulable.getRunningTasks(),
- mapShare);
- if (advancedView) {
- out.printf("<td>%.1f</td>\n", info.mapSchedulable.getWeight());
- }
- String reduceShare = (pool.getSchedulingMode() == SchedulingMode.FAIR) ?
- String.format("%.1f", info.reduceSchedulable.getFairShare()) : "NA";
- out.printf("<td>%d / %d</td><td>%d</td><td>%s</td>\n",
- job.finishedReduces(), job.desiredReduces(),
- info.reduceSchedulable.getRunningTasks(),
- reduceShare);
- if (advancedView) {
- out.printf("<td>%.1f</td>\n", info.reduceSchedulable.getWeight());
+ synchronized (jobTracker) {
+ Collection<JobInProgress> runningJobs = jobTracker.getRunningJobs();
+ synchronized (scheduler) {
+ for (JobInProgress job: runningJobs) {
+ JobProfile profile = job.getProfile();
+ JobInfo info = scheduler.infos.get(job);
+ if (info == null) { // Job finished, but let's show 0's for info
+ info = new JobInfo(null, null);
+ }
+ out.print("<tr>\n");
+ out.printf("<td>%s</td>\n", DATE_FORMAT.format(
+ new Date(job.getStartTime())));
+ out.printf("<td><a href=\"jobdetails.jsp?jobid=%s\">%s</a></td>",
+ profile.getJobID(), profile.getJobID());
+ out.printf("<td>%s</td>\n", profile.getUser());
+ out.printf("<td>%s</td>\n", profile.getJobName());
+ if (JSPUtil.privateActionsAllowed()) {
+ out.printf("<td>%s</td>\n", generateSelect(scheduler
+ .getPoolManager().getPoolNames(), scheduler.getPoolManager()
+ .getPoolName(job), "/scheduler?setPool=<CHOICE>&jobid="
+ + profile.getJobID() + (advancedView ? "&advanced" : "")));
+ out.printf("<td>%s</td>\n", generateSelect(Arrays
+ .asList(new String[] { "VERY_LOW", "LOW", "NORMAL", "HIGH",
+ "VERY_HIGH" }), job.getPriority().toString(),
+ "/scheduler?setPriority=<CHOICE>&jobid=" + profile.getJobID()
+ + (advancedView ? "&advanced" : "")));
+ } else {
+ out.printf("<td>%s</td>\n", scheduler.getPoolManager().getPoolName(job));
+ out.printf("<td>%s</td>\n", job.getPriority().toString());
+ }
+ Pool pool = scheduler.getPoolManager().getPool(job);
+ String mapShare = (pool.getSchedulingMode() == SchedulingMode.FAIR) ?
+ String.format("%.1f", info.mapSchedulable.getFairShare()) : "NA";
+ out.printf("<td>%d / %d</td><td>%d</td><td>%s</td>\n",
+ job.finishedMaps(), job.desiredMaps(),
+ info.mapSchedulable.getRunningTasks(),
+ mapShare);
+ if (advancedView) {
+ out.printf("<td>%.1f</td>\n", info.mapSchedulable.getWeight());
+ }
+ String reduceShare = (pool.getSchedulingMode() == SchedulingMode.FAIR) ?
+ String.format("%.1f", info.reduceSchedulable.getFairShare()) : "NA";
+ out.printf("<td>%d / %d</td><td>%d</td><td>%s</td>\n",
+ job.finishedReduces(), job.desiredReduces(),
+ info.reduceSchedulable.getRunningTasks(),
+ reduceShare);
+ if (advancedView) {
+ out.printf("<td>%.1f</td>\n", info.reduceSchedulable.getWeight());
+ }
+ out.print("</tr>\n");
}
- out.print("</tr>\n");
}
}
out.print("</table>\n");
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Tue Oct 27 15:43:58 2009
@@ -64,8 +64,8 @@
public FakeJobInProgress(JobConf jobConf,
FakeTaskTrackerManager taskTrackerManager,
- String[][] mapInputLocations) throws IOException {
- super(new JobID("test", ++jobCounter), jobConf, null);
+ String[][] mapInputLocations, JobTracker jt) throws IOException {
+ super(new JobID("test", ++jobCounter), jobConf, jt);
this.taskTrackerManager = taskTrackerManager;
this.mapInputLocations = mapInputLocations;
this.startTime = System.currentTimeMillis();
@@ -536,7 +536,7 @@
if (pool != null)
jobConf.set(POOL_PROPERTY, pool);
JobInProgress job = new FakeJobInProgress(jobConf, taskTrackerManager,
- mapInputLocations);
+ mapInputLocations, UtilsForTests.getJobTracker());
job.getStatus().setRunState(state);
taskTrackerManager.submitJob(job);
job.startTime = clock.time;
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java Tue Oct 27 15:43:58 2009
@@ -200,10 +200,7 @@
}
// scan input dir contents
submitter.refreshFilePool();
- } catch (IOException e) {
- LOG.error("Startup failed", e);
- if (factory != null) factory.abort(); // abort pipeline
- } catch (InterruptedException e) {
+ } catch (Throwable e) {
LOG.error("Startup failed", e);
if (factory != null) factory.abort(); // abort pipeline
} finally {
@@ -214,8 +211,10 @@
if (factory != null) {
// wait for input exhaustion
factory.join();
- if (null != factory.error()) {
- throw factory.error();
+ final Throwable badTraceException = factory.error();
+ if (null != badTraceException) {
+ LOG.error("Error in trace", badTraceException);
+ throw new IOException("Error in trace", badTraceException);
}
// wait for pending tasks to be submitted
submitter.shutdown();
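Two error-handling changes land in this hunk: startup now aborts the pipeline
on any Throwable, not just IOException or InterruptedException, and a bad
trace detected on the factory thread is logged and rethrown wrapped in an
IOException, so run()'s signature holds while the original stack trace
survives as the cause. The idiom in general form (a sketch; worker and its
error() accessor are hypothetical stand-ins mirroring factory.error() above):

    worker.join();                      // wait for the background thread
    Throwable t = worker.error();       // failure captured on the worker thread
    if (t != null) {
      // IOException(String, Throwable) preserves the original failure as the cause
      throw new IOException("Worker failed", t);
    }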
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java Tue Oct 27 15:43:58 2009
@@ -530,6 +530,16 @@
}
}
}
+ @Override
+ protected void cleanup(Context context)
+ throws IOException, InterruptedException {
+ while (written < outBytes) {
+ final int len = (int) Math.min(outBytes - written, val.getCapacity());
+ fillBytes(val, len);
+ context.write(NullWritable.get(), val);
+ written += len;
+ }
+ }
}
static class GridmixRecordReader
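The cleanup() override added above guarantees that a task emits its full byte
budget even when the map()/reduce() invocations finish before written reaches
outBytes; the remainder is drained in records of at most val.getCapacity()
bytes each. For example (illustrative numbers), with a 10 MB budget, 7 MB
already written, and a 1 MB buffer, cleanup emits three 1 MB records.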
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java Tue Oct 27 15:43:58 2009
@@ -112,13 +112,13 @@
public MockJob(Configuration conf) {
this(conf.getInt(MIN_BYTES_IN, 1 << 20),
- conf.getInt(VAR_BYTES_IN, 10 << 20),
+ conf.getInt(VAR_BYTES_IN, 5 << 20),
conf.getInt(MIN_BYTES_OUT, 1 << 20),
- conf.getInt(VAR_BYTES_OUT, 10 << 20),
+ conf.getInt(VAR_BYTES_OUT, 5 << 20),
conf.getInt(MIN_REC_SIZE , 100),
conf.getInt(VAR_REC_SIZE , 1 << 15),
- conf.getInt(MAX_MAPS, 6),
- conf.getInt(MAX_REDS, 4));
+ conf.getInt(MAX_MAPS, 5),
+ conf.getInt(MAX_REDS, 3));
}
public MockJob(int min_bytes_in, int var_bytes_in,
@@ -126,7 +126,7 @@
int min_rec_size, int var_rec_size,
int max_maps, int max_reds) {
final Random r = new Random();
- name = String.format("MOCKJOB%04d", seq.getAndIncrement());
+ name = String.format("MOCKJOB%05d", seq.getAndIncrement());
submitTime = timestamp.addAndGet(TimeUnit.MILLISECONDS.convert(
r.nextInt(10), TimeUnit.SECONDS));
int iMapBTotal = 0, oMapBTotal = 0, iRedBTotal = 0, oRedBTotal = 0;
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java Tue Oct 27 15:43:58 2009
@@ -19,6 +19,9 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
@@ -30,12 +33,20 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskReport;
+import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.TaskInfo;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import static org.apache.hadoop.mapreduce.TaskCounter.*;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -64,9 +75,13 @@
public static void initCluster() throws IOException {
Configuration conf = new Configuration();
conf.setBoolean(JTConfig.JT_RETIREJOBS, false);
+ conf.setInt(JTConfig.JT_RETIREJOB_CACHE_SIZE, 1000);
+ conf.setBoolean(JTConfig.JT_PERSIST_JOBSTATUS, true);
+ conf.setInt(JTConfig.JT_PERSIST_JOBSTATUS_HOURS, 1);
dfsCluster = new MiniDFSCluster(conf, 3, true, null);
dfs = dfsCluster.getFileSystem();
- mrCluster = new MiniMRCluster(3, dfs.getUri().toString(), 1);
+ mrCluster = new MiniMRCluster(3, dfs.getUri().toString(), 1, null, null,
+ new JobConf(conf));
}
@AfterClass
@@ -81,6 +96,7 @@
static class TestMonitor extends JobMonitor {
+ static final long SLOPBYTES = 5 * 1024;
private final int expected;
private final BlockingQueue<Job> retiredJobs;
@@ -90,9 +106,142 @@
retiredJobs = new LinkedBlockingQueue<Job>();
}
- public void verify(ArrayList<JobStory> submitted) {
+ public void verify(ArrayList<JobStory> submitted) throws Exception {
final ArrayList<Job> succeeded = new ArrayList<Job>();
assertEquals("Bad job count", expected, retiredJobs.drainTo(succeeded));
+ final HashMap<String,JobStory> sub = new HashMap<String,JobStory>();
+ for (JobStory spec : submitted) {
+ sub.put(spec.getName(), spec);
+ }
+ for (Job job : succeeded) {
+ final String jobname = job.getJobName();
+ if ("GRIDMIX_GENDATA".equals(jobname)) {
+ final Path in = new Path("foo").makeQualified(dfs);
+ final Path out = new Path("/gridmix").makeQualified(dfs);
+ final ContentSummary generated = dfs.getContentSummary(in);
+ assertTrue("Mismatched data gen", // +/- 100k for logs
+ (GENDATA << 20) < generated.getLength() + GENSLOP ||
+ (GENDATA << 20) > generated.getLength() - GENSLOP);
+ FileStatus[] outstat = dfs.listStatus(out);
+ assertEquals("Mismatched job count", NJOBS, outstat.length);
+ continue;
+ }
+ final JobStory spec =
+ sub.get(job.getJobName().replace("GRIDMIX", "MOCKJOB"));
+ assertNotNull("No spec for " + job.getJobName(), spec);
+ assertNotNull("No counters for " + job.getJobName(), job.getCounters());
+
+ final int nMaps = spec.getNumberMaps();
+ final int nReds = spec.getNumberReduces();
+
+ System.out.println(jobname + ": " + nMaps + "/" + nReds);
+ final TaskReport[] mReports = job.getTaskReports(TaskType.MAP);
+ assertEquals("Mismatched map count", nMaps, mReports.length);
+ check(TaskType.MAP, job, spec, mReports,
+ 0, 1, nReds * SLOPBYTES, nReds + 1);
+
+ final TaskReport[] rReports = job.getTaskReports(TaskType.REDUCE);
+ assertEquals("Mismatched reduce count", nReds, rReports.length);
+ check(TaskType.REDUCE, job, spec, rReports,
+ nMaps * SLOPBYTES, nMaps + 1, 0, 1);
+ }
+ }
+
+ public void check(final TaskType type, Job job, JobStory spec,
+ final TaskReport[] runTasks,
+ long extraInputBytes, int extraInputRecords,
+ long extraOutputBytes, int extraOutputRecords) throws Exception {
+
+ long[] runInputRecords = new long[runTasks.length];
+ long[] runInputBytes = new long[runTasks.length];
+ long[] runOutputRecords = new long[runTasks.length];
+ long[] runOutputBytes = new long[runTasks.length];
+ long[] specInputRecords = new long[runTasks.length];
+ long[] specInputBytes = new long[runTasks.length];
+ long[] specOutputRecords = new long[runTasks.length];
+ long[] specOutputBytes = new long[runTasks.length];
+
+ for (int i = 0; i < runTasks.length; ++i) {
+ final TaskInfo specInfo;
+ final Counters counters = runTasks[i].getTaskCounters();
+ switch (type) {
+ case MAP:
+ runInputBytes[i] = counters.findCounter("FileSystemCounters",
+ "HDFS_BYTES_READ").getValue();
+ runInputRecords[i] =
+ (int)counters.findCounter(MAP_INPUT_RECORDS).getValue();
+ runOutputBytes[i] =
+ counters.findCounter(MAP_OUTPUT_BYTES).getValue();
+ runOutputRecords[i] =
+ (int)counters.findCounter(MAP_OUTPUT_RECORDS).getValue();
+
+ specInfo = spec.getTaskInfo(TaskType.MAP, i);
+ break;
+ case REDUCE:
+ runInputBytes[i] =
+ counters.findCounter(REDUCE_SHUFFLE_BYTES).getValue();
+ runInputRecords[i] =
+ (int)counters.findCounter(REDUCE_INPUT_RECORDS).getValue();
+ runOutputBytes[i] =
+ counters.findCounter("FileSystemCounters",
+ "HDFS_BYTES_WRITTEN").getValue();
+ runOutputRecords[i] =
+ (int)counters.findCounter(REDUCE_OUTPUT_RECORDS).getValue();
+
+ specInfo = spec.getTaskInfo(TaskType.REDUCE, i);
+ break;
+ default:
+ specInfo = null;
+ fail("Unexpected type: " + type);
+ }
+ specInputBytes[i] = specInfo.getInputBytes();
+ specInputRecords[i] = specInfo.getInputRecords();
+ specOutputRecords[i] = specInfo.getOutputRecords();
+ specOutputBytes[i] = specInfo.getOutputBytes();
+ System.out.printf(type + " SPEC: %9d -> %9d :: %5d -> %5d\n",
+ specInputBytes[i], specOutputBytes[i],
+ specInputRecords[i], specOutputRecords[i]);
+ System.out.printf(type + " RUN: %9d -> %9d :: %5d -> %5d\n",
+ runInputBytes[i], runOutputBytes[i],
+ runInputRecords[i], runOutputRecords[i]);
+ }
+
+ // Check input bytes
+ Arrays.sort(specInputBytes);
+ Arrays.sort(runInputBytes);
+ for (int i = 0; i < runTasks.length; ++i) {
+ assertTrue("Mismatched input bytes " +
+ specInputBytes[i] + "/" + runInputBytes[i],
+ runInputBytes[i] - specInputBytes[i] <= extraInputBytes);
+ }
+
+ // Check input records
+ Arrays.sort(specInputRecords);
+ Arrays.sort(runInputRecords);
+ for (int i = 0; i < runTasks.length; ++i) {
+ assertTrue("Mismatched input records " +
+ specInputRecords[i] + "/" + runInputRecords[i],
+ runInputRecords[i] - specInputRecords[i] <= extraInputRecords);
+ }
+
+ // Check output bytes
+ Arrays.sort(specOutputBytes);
+ Arrays.sort(runOutputBytes);
+ for (int i = 0; i < runTasks.length; ++i) {
+ assertTrue("Mismatched output bytes " +
+ specOutputBytes[i] + "/" + runOutputBytes[i],
+ runOutputBytes[i] - specOutputBytes[i] <= extraOutputBytes);
+ }
+
+ // Check output records
+ Arrays.sort(specOutputRecords);
+ Arrays.sort(runOutputRecords);
+ for (int i = 0; i < runTasks.length; ++i) {
+ assertTrue("Mismatched output records " +
+ specOutputRecords[i] + "/" + runOutputRecords[i],
+ runOutputRecords[i] - specOutputRecords[i] <= extraOutputRecords);
+ }
+
}
@Override
@@ -110,7 +259,7 @@
private DebugJobFactory factory;
private TestMonitor monitor;
- public void checkMonitor() {
+ public void checkMonitor() throws Exception {
monitor.verify(factory.getSubmitted());
}
@@ -146,12 +295,6 @@
int res = ToolRunner.run(conf, client, argv);
assertEquals("Client exited with nonzero status", 0, res);
client.checkMonitor();
- final ContentSummary generated = dfs.getContentSummary(in);
- assertTrue("Mismatched data gen", // +/- 100k for logs
- (GENDATA << 20) < generated.getLength() + GENSLOP ||
- (GENDATA << 20) > generated.getLength() - GENSLOP);
- FileStatus[] outstat = dfs.listStatus(out);
- assertEquals("Mismatched job count", NJOBS, outstat.length);
}
}
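
The helper above compares per-task counters from the run against the spec
values: both arrays are sorted, and each run value may exceed its positional
spec counterpart by at most the caller-supplied slop (run values below spec
always pass). A minimal, self-contained sketch of that comparison idiom; the
class and method names here are illustrative, not part of the patch:

    import java.util.Arrays;

    public class SlopCheck {
      /**
       * Returns true iff, after sorting both sides, no run value exceeds
       * its positional spec value by more than slop.
       */
      static boolean withinSlop(long[] spec, long[] run, long slop) {
        final long[] s = spec.clone();
        final long[] r = run.clone();
        Arrays.sort(s);   // compare distributions, not task-by-task pairings
        Arrays.sort(r);
        for (int i = 0; i < r.length; ++i) {
          if (r[i] - s[i] > slop) {
            return false;
          }
        }
        return true;
      }

      public static void main(String[] args) {
        long[] spec = { 100, 200, 300 };
        long[] run = { 105, 198, 310 };
        System.out.println(withinSlop(spec, run, 15)); // true: all deltas <= 15
      }
    }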
Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/index/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 27 15:43:58 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/contrib/index:713112
/hadoop/core/trunk/src/contrib/index:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/index:817879-818559
+/hadoop/mapreduce/trunk/src/contrib/index:817878-830225
Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/mrunit/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 27 15:43:58 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/contrib/mrunit:713112
/hadoop/core/trunk/src/contrib/mrunit:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/mrunit:817879-818559
+/hadoop/mapreduce/trunk/src/contrib/mrunit:817878-830225
Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 27 15:43:58 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/src/contrib/sqoop:713112
/hadoop/core/trunk/src/contrib/sqoop:784975-786373
-/hadoop/mapreduce/trunk/src/contrib/sqoop:817879-818559
+/hadoop/mapreduce/trunk/src/contrib/sqoop:817878-830225
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/build.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/build.xml?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/build.xml (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/build.xml Tue Oct 27 15:43:58 2009
@@ -152,4 +152,12 @@
<fail if="tests.failed">Tests failed!</fail>
</target>
+ <target name="doc">
+ <exec executable="make" failonerror="true">
+ <arg value="-C" />
+ <arg value="${basedir}/doc" />
+ <arg value="BUILDROOT=${build.dir}" />
+ </exec>
+ </target>
+
</project>
Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/doc/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Tue Oct 27 15:43:58 2009
@@ -0,0 +1,3 @@
+Sqoop-manpage.xml
+sqoop.1
+Sqoop-web.html
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/ivy.xml?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/ivy.xml (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/ivy.xml Tue Oct 27 15:43:58 2009
@@ -48,6 +48,10 @@
name="commons-httpclient"
rev="${commons-httpclient.version}"
conf="common->default"/>
+ <dependency org="commons-io"
+ name="commons-io"
+ rev="${commons-io.version}"
+ conf="common->default"/>
<dependency org="commons-cli"
name="commons-cli"
rev="${commons-cli.version}"
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java Tue Oct 27 15:43:58 2009
@@ -100,6 +100,8 @@
private String packageName; // package to prepend to auto-named classes.
private String className; // package+class to apply to individual table import.
private int numMappers;
+ private boolean useCompression;
+ private long directSplitSize; // In direct mode, open a new stream every X bytes.
private char inputFieldDelim;
private char inputRecordDelim;
@@ -136,6 +138,23 @@
this.tableName = table;
}
+ private boolean getBooleanProperty(Properties props, String propName, boolean defaultValue) {
+ String str = props.getProperty(propName,
+ Boolean.toString(defaultValue)).toLowerCase();
+ return "true".equals(str) || "yes".equals(str) || "1".equals(str);
+ }
+
+ private long getLongProperty(Properties props, String propName, long defaultValue) {
+ String str = props.getProperty(propName,
+ Long.toString(defaultValue)).toLowerCase();
+ try {
+ return Long.parseLong(str);
+ } catch (NumberFormatException nfe) {
+ LOG.warn("Could not parse integer value for config parameter " + propName);
+ return defaultValue;
+ }
+ }
+
private void loadFromProperties() {
File configFile = new File(DEFAULT_CONFIG_FILE);
if (!configFile.canRead()) {
@@ -164,15 +183,11 @@
this.className = props.getProperty("java.classname", this.className);
this.packageName = props.getProperty("java.packagename", this.packageName);
- String directImport = props.getProperty("direct.import",
- Boolean.toString(this.direct)).toLowerCase();
- this.direct = "true".equals(directImport) || "yes".equals(directImport)
- || "1".equals(directImport);
-
- String hiveImportStr = props.getProperty("hive.import",
- Boolean.toString(this.hiveImport)).toLowerCase();
- this.hiveImport = "true".equals(hiveImportStr) || "yes".equals(hiveImportStr)
- || "1".equals(hiveImportStr);
+ this.direct = getBooleanProperty(props, "direct.import", this.direct);
+ this.hiveImport = getBooleanProperty(props, "hive.import", this.hiveImport);
+ this.useCompression = getBooleanProperty(props, "compression", this.useCompression);
+ this.directSplitSize = getLongProperty(props, "direct.split.size",
+ this.directSplitSize);
} catch (IOException ioe) {
LOG.error("Could not read properties file " + DEFAULT_CONFIG_FILE + ": " + ioe.toString());
} finally {
@@ -231,6 +246,8 @@
this.areDelimsManuallySet = false;
this.numMappers = DEFAULT_NUM_MAPPERS;
+ this.useCompression = false;
+ this.directSplitSize = 0;
loadFromProperties();
}
@@ -272,6 +289,9 @@
System.out.println("--hive-import If set, then import the table into Hive.");
System.out.println(" (Uses Hive's default delimiters if none are set.)");
System.out.println("-m, --num-mappers (n) Use 'n' map tasks to import in parallel");
+ System.out.println("-z, --compress Enable compression");
+ System.out.println("--direct-split-size (n) Split the input stream every 'n' bytes");
+ System.out.println(" when importing in direct mode.");
System.out.println("");
System.out.println("Output line formatting options:");
System.out.println("--fields-terminated-by (char) Sets the field separator character");
@@ -437,7 +457,8 @@
this.password = "";
}
} else if (args[i].equals("--password")) {
- LOG.warn("Setting your password on the command-line is insecure. Consider using -P instead.");
+ LOG.warn("Setting your password on the command-line is insecure. "
+ + "Consider using -P instead.");
this.password = args[++i];
} else if (args[i].equals("-P")) {
this.password = securePasswordEntry();
@@ -449,12 +470,7 @@
this.hiveImport = true;
} else if (args[i].equals("--num-mappers") || args[i].equals("-m")) {
String numMappersStr = args[++i];
- try {
- this.numMappers = Integer.valueOf(numMappersStr);
- } catch (NumberFormatException nfe) {
- throw new InvalidOptionsException("Invalid argument; expected "
- + args[i - 1] + " (number).");
- }
+ this.numMappers = Integer.valueOf(numMappersStr);
} else if (args[i].equals("--fields-terminated-by")) {
this.outputFieldDelim = ImportOptions.toChar(args[++i]);
this.areDelimsManuallySet = true;
@@ -505,6 +521,10 @@
this.packageName = args[++i];
} else if (args[i].equals("--class-name")) {
this.className = args[++i];
+ } else if (args[i].equals("-z") || args[i].equals("--compress")) {
+ this.useCompression = true;
+ } else if (args[i].equals("--direct-split-size")) {
+ this.directSplitSize = Long.parseLong(args[++i]);
} else if (args[i].equals("--list-databases")) {
this.action = ControlAction.ListDatabases;
} else if (args[i].equals("--generate-only")) {
@@ -529,6 +549,9 @@
} catch (ArrayIndexOutOfBoundsException oob) {
throw new InvalidOptionsException("Error: " + args[--i] + " expected argument.\n"
+ "Try --help for usage.");
+ } catch (NumberFormatException nfe) {
+ throw new InvalidOptionsException("Error: " + args[--i] + " expected numeric argument.\n"
+ + "Try --help for usage.");
}
}
@@ -832,4 +855,18 @@
public boolean isOutputEncloseRequired() {
return this.outputMustBeEnclosed;
}
+
+ /**
+ * @return true if the user wants imported results to be compressed.
+ */
+ public boolean shouldUseCompression() {
+ return this.useCompression;
+ }
+
+ /**
+ * @return the file size to split by when using --direct mode.
+ */
+ public long getDirectSplitSize() {
+ return this.directSplitSize;
+ }
}
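
Taken together, the ImportOptions changes add two user-facing knobs:
-z/--compress toggles compressed output, and --direct-split-size (n) asks
direct mode to open a new stream every n bytes. A hedged sketch of how they
surface through the new accessors; it assumes a parse(String[]) entry point
wrapping the argument loop the hunks above edit, and "employees" is a
hypothetical table name:

    import org.apache.hadoop.sqoop.ImportOptions;

    public class OptionSketch {
      // Sketch only: parse(String[]) is assumed from the argument loop the
      // hunks above edit; it throws InvalidOptionsException on bad input.
      public static void main(String[] args) throws Exception {
        ImportOptions opts = new ImportOptions();
        opts.parse(new String[] {
            "--table", "employees",
            "-z",                               // sets useCompression
            "--direct-split-size", "33554432"   // split stream every 32 MB
        });
        System.out.println(opts.shouldUseCompression());  // true
        System.out.println(opts.getDirectSplitSize());    // 33554432
      }
    }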
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java Tue Oct 27 15:43:58 2009
@@ -39,6 +39,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.io.SplittableBufferedWriter;
import org.apache.hadoop.sqoop.util.DirectImportUtils;
import org.apache.hadoop.sqoop.util.Executor;
import org.apache.hadoop.sqoop.util.ImportError;
@@ -64,11 +65,11 @@
char to each record.
*/
static class PostgresqlStreamHandlerFactory implements StreamHandlerFactory {
- private final BufferedWriter writer;
+ private final SplittableBufferedWriter writer;
private final PerfCounters counters;
private final ImportOptions options;
- PostgresqlStreamHandlerFactory(final BufferedWriter w, final ImportOptions opts,
+ PostgresqlStreamHandlerFactory(final SplittableBufferedWriter w, final ImportOptions opts,
final PerfCounters ctrs) {
this.writer = w;
this.options = opts;
@@ -94,14 +95,14 @@
private static class PostgresqlStreamThread extends Thread {
public static final Log LOG = LogFactory.getLog(PostgresqlStreamThread.class.getName());
- private final BufferedWriter writer;
+ private final SplittableBufferedWriter writer;
private final InputStream stream;
private final ImportOptions options;
private final PerfCounters counters;
private boolean error;
- PostgresqlStreamThread(final InputStream is, final BufferedWriter w,
+ PostgresqlStreamThread(final InputStream is, final SplittableBufferedWriter w,
final ImportOptions opts, final PerfCounters ctrs) {
this.stream = is;
this.writer = w;
@@ -115,7 +116,7 @@
public void run() {
BufferedReader r = null;
- BufferedWriter w = this.writer;
+ SplittableBufferedWriter w = this.writer;
char recordDelim = this.options.getOutputRecordDelim();
@@ -131,6 +132,7 @@
w.write(inLine);
w.write(recordDelim);
+ w.allowSplit();
counters.addBytes(1 + inLine.length());
}
} catch (IOException ioe) {
@@ -394,8 +396,7 @@
}
// This writer will be closed by StreamHandlerFactory.
- OutputStream os = DirectImportUtils.createHdfsSink(conf, options, tableName);
- BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
+ SplittableBufferedWriter w = DirectImportUtils.createHdfsSink(conf, options, tableName);
// Actually start the psql dump.
p = Runtime.getRuntime().exec(args.toArray(new String[0]), envp.toArray(new String[0]));
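
The point of the BufferedWriter-to-SplittableBufferedWriter swap is the
allowSplit() call after each complete record: the underlying
SplittingOutputStream may rotate to a new output file, per
--direct-split-size, only at those marks, so a record is never cut across
files. A sketch of the protocol, using only the calls visible in this patch
(createHdfsSink, write, allowSplit, close); the method wrapper and its
parameters are illustrative:

    import java.io.BufferedReader;
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.sqoop.ImportOptions;
    import org.apache.hadoop.sqoop.io.SplittableBufferedWriter;
    import org.apache.hadoop.sqoop.util.DirectImportUtils;

    /** Sketch: copy records to HDFS, rotating files only between records. */
    static void copyRecords(BufferedReader r, Configuration conf,
        ImportOptions options, String tableName, char recordDelim)
        throws IOException {
      SplittableBufferedWriter w =
          DirectImportUtils.createHdfsSink(conf, options, tableName);
      try {
        String line;
        while ((line = r.readLine()) != null) {
          w.write(line);
          w.write(recordDelim);  // record is complete only after its delimiter
          w.allowSplit();        // stream may now rotate to the next file
        }
      } finally {
        w.close();               // the caller owns closing the writer
      }
    }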
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java Tue Oct 27 15:43:58 2009
@@ -40,6 +40,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.io.SplittableBufferedWriter;
import org.apache.hadoop.sqoop.lib.FieldFormatter;
import org.apache.hadoop.sqoop.lib.RecordParser;
import org.apache.hadoop.sqoop.util.DirectImportUtils;
@@ -64,10 +65,10 @@
* header and footer characters that are attached to each line in mysqldump.
*/
static class CopyingStreamHandlerFactory implements StreamHandlerFactory {
- private final BufferedWriter writer;
+ private final SplittableBufferedWriter writer;
private final PerfCounters counters;
- CopyingStreamHandlerFactory(final BufferedWriter w, final PerfCounters ctrs) {
+ CopyingStreamHandlerFactory(final SplittableBufferedWriter w, final PerfCounters ctrs) {
this.writer = w;
this.counters = ctrs;
}
@@ -91,13 +92,14 @@
private static class CopyingStreamThread extends Thread {
public static final Log LOG = LogFactory.getLog(CopyingStreamThread.class.getName());
- private final BufferedWriter writer;
+ private final SplittableBufferedWriter writer;
private final InputStream stream;
private final PerfCounters counters;
private boolean error;
- CopyingStreamThread(final InputStream is, final BufferedWriter w, final PerfCounters ctrs) {
+ CopyingStreamThread(final InputStream is, final SplittableBufferedWriter w,
+ final PerfCounters ctrs) {
this.writer = w;
this.stream = is;
this.counters = ctrs;
@@ -109,7 +111,7 @@
public void run() {
BufferedReader r = null;
- BufferedWriter w = this.writer;
+ SplittableBufferedWriter w = this.writer;
try {
r = new BufferedReader(new InputStreamReader(this.stream));
@@ -169,11 +171,11 @@
* output, and re-emit the text in the user's specified output format.
*/
static class ReparsingStreamHandlerFactory implements StreamHandlerFactory {
- private final BufferedWriter writer;
+ private final SplittableBufferedWriter writer;
private final ImportOptions options;
private final PerfCounters counters;
- ReparsingStreamHandlerFactory(final BufferedWriter w, final ImportOptions opts,
+ ReparsingStreamHandlerFactory(final SplittableBufferedWriter w, final ImportOptions opts,
final PerfCounters ctrs) {
this.writer = w;
this.options = opts;
@@ -199,14 +201,14 @@
private static class ReparsingStreamThread extends Thread {
public static final Log LOG = LogFactory.getLog(ReparsingStreamThread.class.getName());
- private final BufferedWriter writer;
+ private final SplittableBufferedWriter writer;
private final ImportOptions options;
private final InputStream stream;
private final PerfCounters counters;
private boolean error;
- ReparsingStreamThread(final InputStream is, final BufferedWriter w,
+ ReparsingStreamThread(final InputStream is, final SplittableBufferedWriter w,
final ImportOptions opts, final PerfCounters ctrs) {
this.writer = w;
this.options = opts;
@@ -234,7 +236,7 @@
public void run() {
BufferedReader r = null;
- BufferedWriter w = this.writer;
+ SplittableBufferedWriter w = this.writer;
try {
r = new BufferedReader(new InputStreamReader(this.stream));
@@ -292,6 +294,7 @@
}
w.write(outputRecordDelim);
+ w.allowSplit();
counters.addBytes(recordLen);
}
} catch (IOException ioe) {
@@ -444,8 +447,7 @@
}
// This writer will be closed by StreamHandlerFactory.
- OutputStream os = DirectImportUtils.createHdfsSink(conf, options, tableName);
- BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
+ SplittableBufferedWriter w = DirectImportUtils.createHdfsSink(conf, options, tableName);
// Actually start the mysqldump.
p = Runtime.getRuntime().exec(args.toArray(new String[0]));
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java Tue Oct 27 15:43:58 2009
@@ -28,6 +28,7 @@
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
@@ -104,10 +105,16 @@
job.setMapperClass(TextImportMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
+ if (options.shouldUseCompression()) {
+ FileOutputFormat.setCompressOutput(job, true);
+ FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
+ }
} else if (options.getFileLayout() == ImportOptions.FileLayout.SequenceFile) {
job.setOutputFormat(SequenceFileOutputFormat.class);
- SequenceFileOutputFormat.setCompressOutput(job, true);
- SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
+ if (options.shouldUseCompression()) {
+ SequenceFileOutputFormat.setCompressOutput(job, true);
+ SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
+ }
job.set(JobContext.OUTPUT_VALUE_CLASS, tableClassName);
} else {
LOG.warn("Unknown file layout specified: " + options.getFileLayout() + "; using text.");
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/AutoProgressMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/AutoProgressMapper.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/AutoProgressMapper.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/AutoProgressMapper.java Tue Oct 27 15:43:58 2009
@@ -68,6 +68,7 @@
private class ProgressThread extends Thread {
private volatile boolean keepGoing; // while this is true, thread runs.
+
private Context context;
private long startTimeMillis;
private long lastReportMillis;
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java Tue Oct 27 15:43:58 2009
@@ -29,6 +29,7 @@
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -104,11 +105,17 @@
job.setMapperClass(TextImportMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
+ if (options.shouldUseCompression()) {
+ FileOutputFormat.setCompressOutput(job, true);
+ FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
+ }
} else if (options.getFileLayout() == ImportOptions.FileLayout.SequenceFile) {
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setMapperClass(AutoProgressMapper.class);
- SequenceFileOutputFormat.setCompressOutput(job, true);
- SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
+ if (options.shouldUseCompression()) {
+ SequenceFileOutputFormat.setCompressOutput(job, true);
+ SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
+ }
job.getConfiguration().set("mapred.output.value.class", tableClassName);
} else {
LOG.warn("Unknown file layout specified: " + options.getFileLayout() + "; using text.");
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java Tue Oct 27 15:43:58 2009
@@ -20,7 +20,6 @@
import java.io.IOException;
import java.io.File;
-import java.io.OutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -29,6 +28,8 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.io.SplittingOutputStream;
+import org.apache.hadoop.sqoop.io.SplittableBufferedWriter;
import org.apache.hadoop.util.Shell;
/**
@@ -64,8 +65,8 @@
* The caller is responsible for calling the close() method on the returned
* stream.
*/
- public static OutputStream createHdfsSink(Configuration conf, ImportOptions options,
- String tableName) throws IOException {
+ public static SplittableBufferedWriter createHdfsSink(Configuration conf,
+ ImportOptions options, String tableName) throws IOException {
FileSystem fs = FileSystem.get(conf);
String warehouseDir = options.getWarehouseDir();
@@ -79,15 +80,11 @@
LOG.debug("Writing to filesystem: " + conf.get("fs.default.name"));
LOG.debug("Creating destination directory " + destDir);
fs.mkdirs(destDir);
- Path destFile = new Path(destDir, "data-00000");
- LOG.debug("Opening output file: " + destFile);
- if (fs.exists(destFile)) {
- Path canonicalDest = destFile.makeQualified(fs);
- throw new IOException("Destination file " + canonicalDest + " already exists");
- }
- // This OutputStream will be clsoed by the caller.
- return fs.create(destFile);
+ // This Writer will be closed by the caller.
+ return new SplittableBufferedWriter(
+ new SplittingOutputStream(conf, destDir, "data-", options.getDirectSplitSize(),
+ options.shouldUseCompression()));
}
}