You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2007/10/18 17:23:12 UTC

svn commit: r586003 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/mapred/pipes/

Author: acmurthy
Date: Thu Oct 18 08:23:11 2007
New Revision: 586003

URL: http://svn.apache.org/viewvc?rev=586003&view=rev
Log:
HADOOP-2070.  Added a flush method to pipes' DownwardProtocol and call that before waiting for the application to finish to ensure all buffered data is flushed. Contributed by Owen O'Malley.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/DownwardProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesReducer.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=586003&r1=586002&r2=586003&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Oct 18 08:23:11 2007
@@ -325,6 +325,10 @@
     very uneven splits for applications like distcp that count on them.
     (omalley)
 
+    HADOOP-2070.  Added a flush method to pipes' DownwardProtocol and call
+    that before waiting for the application to finish to ensure all buffered
+    data is flushed. (Owen O'Malley via acmurthy)
+
   IMPROVEMENTS
 
     HADOOP-1908. Restructure data node code so that block sending and 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java?rev=586003&r1=586002&r2=586003&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java Thu Oct 18 08:23:11 2007
@@ -51,7 +51,7 @@
   private Process process;
   private Socket clientSocket;
   private OutputHandler<K2, V2> handler;
-  private BinaryProtocol<K1, V1, K2, V2> downlink;
+  private DownwardProtocol<K1, V1> downlink;
 
   /**
    * Start the child process to handle the task for us.
@@ -109,6 +109,7 @@
    * @throws Throwable
    */
   boolean waitForFinish() throws Throwable {
+    downlink.flush();
     return handler.waitForFinish();
   }
 
@@ -121,6 +122,7 @@
     LOG.info("Aborting because of " + StringUtils.stringifyException(t));
     try {
       downlink.abort();
+      downlink.flush();
     } catch (IOException e) {
       // IGNORE cleanup problems
     }
@@ -141,7 +143,7 @@
   void cleanup() throws IOException {
     serverSocket.close();
     try {
-      downlink.closeConnection();
+      downlink.close();
     } catch (InterruptedException ie) {
       Thread.currentThread().interrupt();
     }      

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java?rev=586003&r1=586002&r2=586003&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java Thu Oct 18 08:23:11 2007
@@ -226,7 +226,7 @@
    * @throws IOException
    * @throws InterruptedException
    */
-  public void closeConnection() throws IOException, InterruptedException {
+  public void close() throws IOException, InterruptedException {
     LOG.debug("closing connection");
     stream.close();
     uplink.closeConnection();
@@ -291,15 +291,18 @@
     writeObject(value);
   }
 
-  public void close() throws IOException {
+  public void endOfInput() throws IOException {
     WritableUtils.writeVInt(stream, MessageType.CLOSE.code);
     LOG.debug("Sent close command");
-    stream.flush();
   }
   
   public void abort() throws IOException {
     WritableUtils.writeVInt(stream, MessageType.ABORT.code);
     LOG.debug("Sent abort command");
+  }
+
+  public void flush() throws IOException {
+    stream.flush();
   }
 
   /**

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/DownwardProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/DownwardProtocol.java?rev=586003&r1=586002&r2=586003&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/DownwardProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/DownwardProtocol.java Thu Oct 18 08:23:11 2007
@@ -97,11 +97,21 @@
    * input.
    * @throws IOException
    */
-  void close() throws IOException;
+  void endOfInput() throws IOException;
   
   /**
    * The task should stop as soon as possible, because something has gone wrong.
    * @throws IOException
    */
   void abort() throws IOException;
+  
+  /**
+   * Flush the data through any buffers.
+   */
+  void flush() throws IOException;
+  
+  /**
+   * Close the connection.
+   */
+  void close() throws IOException, InterruptedException;
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java?rev=586003&r1=586002&r2=586003&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java Thu Oct 18 08:23:11 2007
@@ -76,7 +76,7 @@
           // map pair to output
           downlink.mapItem(key, value);
         }
-        downlink.close();
+        downlink.endOfInput();
       }
       application.waitForFinish();
     } catch (Throwable t) {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesReducer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesReducer.java?rev=586003&r1=586002&r2=586003&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesReducer.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesReducer.java Thu Oct 18 08:23:11 2007
@@ -94,8 +94,9 @@
     }
     try {
       if (isOk) {
-        application.getDownlink().close();
+        application.getDownlink().endOfInput();
       } else {
+        // send the abort to the application and let it clean up
         application.getDownlink().abort();
       }
       LOG.info("waiting for finish");