You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sz...@apache.org on 2013/01/31 22:39:44 UTC

svn commit: r1441206 - in /hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project: ./ conf/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/...

Author: szetszwo
Date: Thu Jan 31 21:39:42 2013
New Revision: 1441206

URL: http://svn.apache.org/viewvc?rev=1441206&view=rev
Log:
Merge r1440222 through r1441205 from trunk.

Modified:
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1440222-1441205

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt?rev=1441206&r1=1441205&r2=1441206&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt Thu Jan 31 21:39:42 2013
@@ -217,6 +217,9 @@ Release 2.0.3-alpha - Unreleased 
 
   OPTIMIZATIONS
 
+    MAPREDUCE-4893. Fixed MR ApplicationMaster to do optimal assignment of
+    containers to get maximum locality. (Bikas Saha via vinodkv)
+
   BUG FIXES
 
     MAPREDUCE-4607. Race condition in ReduceTask completion can result in Task
@@ -278,6 +281,9 @@ Release 2.0.3-alpha - Unreleased 
     MAPREDUCE-2264. Job status exceeds 100% in some cases. 
     (devaraj.k and sandyr via tucu)
 
+    MAPREDUCE-4969. TestKeyValueTextInputFormat test fails with Open JDK 7.
+    (Arpit Agarwal via suresh)
+
 Release 2.0.2-alpha - 2012-09-07 
 
   INCOMPATIBLE CHANGES

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1440222-1441205

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1440222-1441205

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1441206&r1=1441205&r2=1441206&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Thu Jan 31 21:39:42 2013
@@ -747,7 +747,7 @@ public class RMContainerAllocator extend
       addContainerReq(req);
     }
     
-    @SuppressWarnings("unchecked")
+    // this method will change the list of allocatedContainers.
     private void assign(List<Container> allocatedContainers) {
       Iterator<Container> it = allocatedContainers.iterator();
       LOG.info("Got allocated containers " + allocatedContainers.size());
@@ -788,84 +788,97 @@ public class RMContainerAllocator extend
                 + reduces.isEmpty()); 
             isAssignable = false;
           }
-        }          
+        } else {
+          LOG.warn("Container allocated at unwanted priority: " + priority + 
+              ". Returning to RM...");
+          isAssignable = false;
+        }
         
-        boolean blackListed = false;         
-        ContainerRequest assigned = null;
+        if(!isAssignable) {
+          // release container if we could not assign it 
+          containerNotAssigned(allocated);
+          it.remove();
+          continue;
+        }
         
-        if (isAssignable) {
-          // do not assign if allocated container is on a  
-          // blacklisted host
-          String allocatedHost = allocated.getNodeId().getHost();
-          blackListed = isNodeBlacklisted(allocatedHost);
-          if (blackListed) {
-            // we need to request for a new container 
-            // and release the current one
-            LOG.info("Got allocated container on a blacklisted "
-                + " host "+allocatedHost
-                +". Releasing container " + allocated);
-
-            // find the request matching this allocated container 
-            // and replace it with a new one 
-            ContainerRequest toBeReplacedReq = 
-                getContainerReqToReplace(allocated);
-            if (toBeReplacedReq != null) {
-              LOG.info("Placing a new container request for task attempt " 
-                  + toBeReplacedReq.attemptID);
-              ContainerRequest newReq = 
-                  getFilteredContainerRequest(toBeReplacedReq);
-              decContainerReq(toBeReplacedReq);
-              if (toBeReplacedReq.attemptID.getTaskId().getTaskType() ==
-                  TaskType.MAP) {
-                maps.put(newReq.attemptID, newReq);
-              }
-              else {
-                reduces.put(newReq.attemptID, newReq);
-              }
-              addContainerReq(newReq);
+        // do not assign if allocated container is on a  
+        // blacklisted host
+        String allocatedHost = allocated.getNodeId().getHost();
+        if (isNodeBlacklisted(allocatedHost)) {
+          // we need to request a new container
+          // and release the current one
+          LOG.info("Got allocated container on a blacklisted "
+              + " host "+allocatedHost
+              +". Releasing container " + allocated);
+
+          // find the request matching this allocated container 
+          // and replace it with a new one 
+          ContainerRequest toBeReplacedReq = 
+              getContainerReqToReplace(allocated);
+          if (toBeReplacedReq != null) {
+            LOG.info("Placing a new container request for task attempt " 
+                + toBeReplacedReq.attemptID);
+            ContainerRequest newReq = 
+                getFilteredContainerRequest(toBeReplacedReq);
+            decContainerReq(toBeReplacedReq);
+            if (toBeReplacedReq.attemptID.getTaskId().getTaskType() ==
+                TaskType.MAP) {
+              maps.put(newReq.attemptID, newReq);
             }
             else {
-              LOG.info("Could not map allocated container to a valid request."
-                  + " Releasing allocated container " + allocated);
+              reduces.put(newReq.attemptID, newReq);
             }
+            addContainerReq(newReq);
           }
           else {
-            assigned = assign(allocated);
-            if (assigned != null) {
-              // Update resource requests
-              decContainerReq(assigned);
-
-              // send the container-assigned event to task attempt
-              eventHandler.handle(new TaskAttemptContainerAssignedEvent(
-                  assigned.attemptID, allocated, applicationACLs));
-
-              assignedRequests.add(allocated, assigned.attemptID);
-
-              if (LOG.isDebugEnabled()) {
-                LOG.info("Assigned container (" + allocated + ") "
-                    + " to task " + assigned.attemptID + " on node "
-                    + allocated.getNodeId().toString());
-              }
-            }
-            else {
-              //not assigned to any request, release the container
-              LOG.info("Releasing unassigned and invalid container " 
-                  + allocated + ". RM has gone crazy, someone go look!"
-                  + " Hey RM, if you are so rich, go donate to non-profits!");
-            }
+            LOG.info("Could not map allocated container to a valid request."
+                + " Releasing allocated container " + allocated);
           }
+          
+          // release container if we could not assign it 
+          containerNotAssigned(allocated);
+          it.remove();
+          continue;
         }
-        
-        // release container if it was blacklisted 
-        // or if we could not assign it 
-        if (blackListed || assigned == null) {
-          containersReleased++;
-          release(allocated.getId());
-        }
+      }
+
+      assignContainers(allocatedContainers);
+       
+      // release container if we could not assign it 
+      it = allocatedContainers.iterator();
+      while (it.hasNext()) {
+        Container allocated = it.next();
+        LOG.info("Releasing unassigned and invalid container " 
+            + allocated + ". RM may have assignment issues");
+        containerNotAssigned(allocated);
+      }
+    }
+    
+    @SuppressWarnings("unchecked")
+    private void containerAssigned(Container allocated, 
+                                    ContainerRequest assigned) {
+      // Update resource requests
+      decContainerReq(assigned);
+
+      // send the container-assigned event to task attempt
+      eventHandler.handle(new TaskAttemptContainerAssignedEvent(
+          assigned.attemptID, allocated, applicationACLs));
+
+      assignedRequests.add(allocated, assigned.attemptID);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.info("Assigned container (" + allocated + ") "
+            + " to task " + assigned.attemptID + " on node "
+            + allocated.getNodeId().toString());
       }
     }
     
-    private ContainerRequest assign(Container allocated) {
+    private void containerNotAssigned(Container allocated) {
+      containersReleased++;
+      release(allocated.getId());      
+    }
+    
+    private ContainerRequest assignWithoutLocality(Container allocated) {
       ContainerRequest assigned = null;
       
       Priority priority = allocated.getPriority();
@@ -877,18 +890,24 @@ public class RMContainerAllocator extend
           LOG.debug("Assigning container " + allocated + " to reduce");
         }
         assigned = assignToReduce(allocated);
-      } else if (PRIORITY_MAP.equals(priority)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Assigning container " + allocated + " to map");
-        }
-        assigned = assignToMap(allocated);
-      } else {
-        LOG.warn("Container allocated at unwanted priority: " + priority + 
-            ". Returning to RM...");
       }
         
       return assigned;
     }
+        
+    private void assignContainers(List<Container> allocatedContainers) {
+      Iterator<Container> it = allocatedContainers.iterator();
+      while (it.hasNext()) {
+        Container allocated = it.next();
+        ContainerRequest assigned = assignWithoutLocality(allocated);
+        if (assigned != null) {
+          containerAssigned(allocated, assigned);
+          it.remove();
+        }
+      }
+
+      assignMapsWithLocality(allocatedContainers);
+    }
     
     private ContainerRequest getContainerReqToReplace(Container allocated) {
       LOG.info("Finding containerReq for allocated container: " + allocated);
@@ -959,11 +978,15 @@ public class RMContainerAllocator extend
     }
     
     @SuppressWarnings("unchecked")
-    private ContainerRequest assignToMap(Container allocated) {
-    //try to assign to maps if present 
-      //first by host, then by rack, followed by *
-      ContainerRequest assigned = null;
-      while (assigned == null && maps.size() > 0) {
+    private void assignMapsWithLocality(List<Container> allocatedContainers) {
+      // first pass: try to match each allocated container to a node-local map
+      Iterator<Container> it = allocatedContainers.iterator();
+      while(it.hasNext() && maps.size() > 0){
+        Container allocated = it.next();        
+        Priority priority = allocated.getPriority();
+        assert PRIORITY_MAP.equals(priority);
+        // "if (maps.containsKey(tId))" below should almost always be true,
+        // hence this while loop would almost always have O(1) complexity.
         String host = allocated.getNodeId().getHost();
         LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
         while (list != null && list.size() > 0) {
@@ -972,7 +995,9 @@ public class RMContainerAllocator extend
           }
           TaskAttemptId tId = list.removeFirst();
           if (maps.containsKey(tId)) {
-            assigned = maps.remove(tId);
+            ContainerRequest assigned = maps.remove(tId);
+            containerAssigned(allocated, assigned);
+            it.remove();
             JobCounterUpdateEvent jce =
               new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
             jce.addCounterUpdate(JobCounter.DATA_LOCAL_MAPS, 1);
@@ -984,39 +1009,56 @@ public class RMContainerAllocator extend
             break;
           }
         }
-        if (assigned == null) {
-          String rack = RackResolver.resolve(host).getNetworkLocation();
-          list = mapsRackMapping.get(rack);
-          while (list != null && list.size() > 0) {
-            TaskAttemptId tId = list.removeFirst();
-            if (maps.containsKey(tId)) {
-              assigned = maps.remove(tId);
-              JobCounterUpdateEvent jce =
-                new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
-              jce.addCounterUpdate(JobCounter.RACK_LOCAL_MAPS, 1);
-              eventHandler.handle(jce);
-              rackLocalAssigned++;
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Assigned based on rack match " + rack);
-              }
-              break;
-            }
-          }
-          if (assigned == null && maps.size() > 0) {
-            TaskAttemptId tId = maps.keySet().iterator().next();
-            assigned = maps.remove(tId);
+      }
+      
+      // try to match all rack local
+      it = allocatedContainers.iterator();
+      while(it.hasNext() && maps.size() > 0){
+        Container allocated = it.next();
+        Priority priority = allocated.getPriority();
+        assert PRIORITY_MAP.equals(priority);
+        // "if (maps.containsKey(tId))" below should almost always be true,
+        // hence this while loop would almost always have O(1) complexity.
+        String host = allocated.getNodeId().getHost();
+        String rack = RackResolver.resolve(host).getNetworkLocation();
+        LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack);
+        while (list != null && list.size() > 0) {
+          TaskAttemptId tId = list.removeFirst();
+          if (maps.containsKey(tId)) {
+            ContainerRequest assigned = maps.remove(tId);
+            containerAssigned(allocated, assigned);
+            it.remove();
             JobCounterUpdateEvent jce =
               new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
-            jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1);
+            jce.addCounterUpdate(JobCounter.RACK_LOCAL_MAPS, 1);
             eventHandler.handle(jce);
+            rackLocalAssigned++;
             if (LOG.isDebugEnabled()) {
-              LOG.debug("Assigned based on * match");
+              LOG.debug("Assigned based on rack match " + rack);
             }
             break;
           }
         }
       }
-      return assigned;
+      
+      // assign remaining
+      it = allocatedContainers.iterator();
+      while(it.hasNext() && maps.size() > 0){
+        Container allocated = it.next();
+        Priority priority = allocated.getPriority();
+        assert PRIORITY_MAP.equals(priority);
+        TaskAttemptId tId = maps.keySet().iterator().next();
+        ContainerRequest assigned = maps.remove(tId);
+        containerAssigned(allocated, assigned);
+        it.remove();
+        JobCounterUpdateEvent jce =
+          new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
+        jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1);
+        eventHandler.handle(jce);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Assigned based on * match");
+        }
+      }
     }
   }
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1441206&r1=1441205&r2=1441206&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Thu Jan 31 21:39:42 2013
@@ -190,6 +190,92 @@ public class TestRMContainerAllocator {
     checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 },
         assigned, false);
   }
+  
+  @Test 
+  public void testMapNodeLocality() throws Exception {
+    // test checks that ordering of allocated containers list from the RM does 
+    // not affect the map->container assignment done by the AM. If there is a 
+    // node local container available for a map then it should be assigned to 
+    // that container and not a rack-local container that happened to be seen 
+    // earlier in the allocated containers list from the RM.
+    // Regression test for MAPREDUCE-4893
+    LOG.info("Running testMapNodeLocality");
+
+    Configuration conf = new Configuration();
+    MyResourceManager rm = new MyResourceManager(conf);
+    rm.start();
+    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+        .getDispatcher();
+
+    // Submit the application
+    RMApp app = rm.submitApp(1024);
+    dispatcher.await();
+
+    MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+    amNodeManager.nodeHeartbeat(true);
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+        .getAppAttemptId();
+    rm.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, 
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+    MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+        appAttemptId, mockJob);
+
+    // add resources to scheduler
+    MockNM nodeManager1 = rm.registerNode("h1:1234", 3072); // can assign 2 maps 
+    rm.registerNode("h2:1234", 10240); // won't heartbeat on node-local node
+    MockNM nodeManager3 = rm.registerNode("h3:1234", 1536); // assign 1 map
+    dispatcher.await();
+
+    // create the container requests for maps
+    ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
+        new String[] { "h1" });
+    allocator.sendRequest(event1);
+    ContainerRequestEvent event2 = createReq(jobId, 2, 1024,
+        new String[] { "h1" });
+    allocator.sendRequest(event2);
+    ContainerRequestEvent event3 = createReq(jobId, 3, 1024,
+        new String[] { "h2" });
+    allocator.sendRequest(event3);
+
+    // this tells the scheduler about the requests
+    // as nodes are not added, no allocations
+    List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+    // update resources in scheduler
+    // Node heartbeat from rack-local first. This makes node h3 the first in the
+    // list of allocated containers but it should not be assigned to task1.
+    nodeManager3.nodeHeartbeat(true);
+    // Node heartbeat from node-local next. This allocates 2 node local 
+    // containers for task1 and task2. These should be matched with those tasks.
+    nodeManager1.nodeHeartbeat(true);
+    dispatcher.await();
+
+    assigned = allocator.schedule();
+    dispatcher.await();
+    checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 },
+        assigned, false);
+    // remove the rack-local assignment that should have happened for task3
+    for(TaskAttemptContainerAssignedEvent event : assigned) {
+      if(event.getTaskAttemptID().equals(event3.getAttemptID())) {
+        assigned.remove(event);
+        Assert.assertTrue(
+                    event.getContainer().getNodeId().getHost().equals("h3"));
+        break;
+      }
+    }
+    checkAssignments(new ContainerRequestEvent[] { event1, event2},
+        assigned, true);
+  }
 
   @Test
   public void testResource() throws Exception {
@@ -1202,7 +1288,7 @@ public class TestRMContainerAllocator {
     if (checkHostMatch) {
       Assert.assertTrue("Not assigned to requested host", Arrays.asList(
           request.getHosts()).contains(
-          assigned.getContainer().getNodeId().toString()));
+          assigned.getContainer().getNodeId().getHost()));
     }
   }
 

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1440222-1441205

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java?rev=1441206&r1=1441205&r2=1441206&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java Thu Jan 31 21:39:42 2013
@@ -136,32 +136,47 @@ public class TestKeyValueTextInputFormat
   }
   
   public void testUTF8() throws Exception {
-    LineReader in = makeStream("abcd\u20acbdcd\u20ac");
-    Text line = new Text();
-    in.readLine(line);
-    assertEquals("readLine changed utf8 characters", 
-                 "abcd\u20acbdcd\u20ac", line.toString());
-    in = makeStream("abc\u200axyz");
-    in.readLine(line);
-    assertEquals("split on fake newline", "abc\u200axyz", line.toString());
+    LineReader in = null;
+
+    try {
+      in = makeStream("abcd\u20acbdcd\u20ac");
+      Text line = new Text();
+      in.readLine(line);
+      assertEquals("readLine changed utf8 characters",
+                   "abcd\u20acbdcd\u20ac", line.toString());
+      in = makeStream("abc\u200axyz");
+      in.readLine(line);
+      assertEquals("split on fake newline", "abc\u200axyz", line.toString());
+    } finally {
+      if (in != null) {
+        in.close();
+      }
+    }
   }
 
   public void testNewLines() throws Exception {
-    LineReader in = makeStream("a\nbb\n\nccc\rdddd\r\neeeee");
-    Text out = new Text();
-    in.readLine(out);
-    assertEquals("line1 length", 1, out.getLength());
-    in.readLine(out);
-    assertEquals("line2 length", 2, out.getLength());
-    in.readLine(out);
-    assertEquals("line3 length", 0, out.getLength());
-    in.readLine(out);
-    assertEquals("line4 length", 3, out.getLength());
-    in.readLine(out);
-    assertEquals("line5 length", 4, out.getLength());
-    in.readLine(out);
-    assertEquals("line5 length", 5, out.getLength());
-    assertEquals("end of file", 0, in.readLine(out));
+    LineReader in = null;
+    try {
+      in = makeStream("a\nbb\n\nccc\rdddd\r\neeeee");
+      Text out = new Text();
+      in.readLine(out);
+      assertEquals("line1 length", 1, out.getLength());
+      in.readLine(out);
+      assertEquals("line2 length", 2, out.getLength());
+      in.readLine(out);
+      assertEquals("line3 length", 0, out.getLength());
+      in.readLine(out);
+      assertEquals("line4 length", 3, out.getLength());
+      in.readLine(out);
+      assertEquals("line5 length", 4, out.getLength());
+      in.readLine(out);
+      assertEquals("line5 length", 5, out.getLength());
+      assertEquals("end of file", 0, in.readLine(out));
+    } finally {
+      if (in != null) {
+        in.close();
+      }
+    }
   }
   
   private static void writeFile(FileSystem fs, Path name, 
@@ -183,14 +198,21 @@ public class TestKeyValueTextInputFormat
                                       InputSplit split, 
                                       JobConf job) throws IOException {
     List<Text> result = new ArrayList<Text>();
-    RecordReader<Text, Text> reader = format.getRecordReader(split, job,
-                                                 voidReporter);
-    Text key = reader.createKey();
-    Text value = reader.createValue();
-    while (reader.next(key, value)) {
-      result.add(value);
-      value = reader.createValue();
-    }
+    RecordReader<Text, Text> reader = null;
+
+    try {
+      reader = format.getRecordReader(split, job, voidReporter);
+      Text key = reader.createKey();
+      Text value = reader.createValue();
+      while (reader.next(key, value)) {
+        result.add(value);
+        value = (Text) reader.createValue();
+      }   
+    } finally {
+      if (reader != null) {
+        reader.close();
+      }   
+    }   
     return result;
   }