You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2007/10/18 17:23:12 UTC
svn commit: r586003 - in /lucene/hadoop/trunk: ./
src/java/org/apache/hadoop/mapred/pipes/
Author: acmurthy
Date: Thu Oct 18 08:23:11 2007
New Revision: 586003
URL: http://svn.apache.org/viewvc?rev=586003&view=rev
Log:
HADOOP-2070. Added a flush method to pipes' DownwardProtocol and call that before waiting for the application to finish to ensure all buffered data is flushed. Contributed by Owen O'Malley.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/DownwardProtocol.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesReducer.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=586003&r1=586002&r2=586003&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Oct 18 08:23:11 2007
@@ -325,6 +325,10 @@
very uneven splits for applications like distcp that count on them.
(omalley)
+ HADOOP-2070. Added a flush method to pipes' DownwardProtocol and call
+ that before waiting for the application to finish to ensure all buffered
+ data is flushed. (Owen O'Malley via acmurthy)
+
IMPROVEMENTS
HADOOP-1908. Restructure data node code so that block sending and
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java?rev=586003&r1=586002&r2=586003&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java Thu Oct 18 08:23:11 2007
@@ -51,7 +51,7 @@
private Process process;
private Socket clientSocket;
private OutputHandler<K2, V2> handler;
- private BinaryProtocol<K1, V1, K2, V2> downlink;
+ private DownwardProtocol<K1, V1> downlink;
/**
* Start the child process to handle the task for us.
@@ -109,6 +109,7 @@
* @throws Throwable
*/
boolean waitForFinish() throws Throwable {
+ downlink.flush();
return handler.waitForFinish();
}
@@ -121,6 +122,7 @@
LOG.info("Aborting because of " + StringUtils.stringifyException(t));
try {
downlink.abort();
+ downlink.flush();
} catch (IOException e) {
// IGNORE cleanup problems
}
@@ -141,7 +143,7 @@
void cleanup() throws IOException {
serverSocket.close();
try {
- downlink.closeConnection();
+ downlink.close();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java?rev=586003&r1=586002&r2=586003&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java Thu Oct 18 08:23:11 2007
@@ -226,7 +226,7 @@
* @throws IOException
* @throws InterruptedException
*/
- public void closeConnection() throws IOException, InterruptedException {
+ public void close() throws IOException, InterruptedException {
LOG.debug("closing connection");
stream.close();
uplink.closeConnection();
@@ -291,15 +291,18 @@
writeObject(value);
}
- public void close() throws IOException {
+ public void endOfInput() throws IOException {
WritableUtils.writeVInt(stream, MessageType.CLOSE.code);
LOG.debug("Sent close command");
- stream.flush();
}
public void abort() throws IOException {
WritableUtils.writeVInt(stream, MessageType.ABORT.code);
LOG.debug("Sent abort command");
+ }
+
+ public void flush() throws IOException {
+ stream.flush();
}
/**
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/DownwardProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/DownwardProtocol.java?rev=586003&r1=586002&r2=586003&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/DownwardProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/DownwardProtocol.java Thu Oct 18 08:23:11 2007
@@ -97,11 +97,21 @@
* input.
* @throws IOException
*/
- void close() throws IOException;
+ void endOfInput() throws IOException;
/**
* The task should stop as soon as possible, because something has gone wrong.
* @throws IOException
*/
void abort() throws IOException;
+
+ /**
+ * Flush the data through any buffers.
+ */
+ void flush() throws IOException;
+
+ /**
+ * Close the connection.
+ */
+ void close() throws IOException, InterruptedException;
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java?rev=586003&r1=586002&r2=586003&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java Thu Oct 18 08:23:11 2007
@@ -76,7 +76,7 @@
// map pair to output
downlink.mapItem(key, value);
}
- downlink.close();
+ downlink.endOfInput();
}
application.waitForFinish();
} catch (Throwable t) {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesReducer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesReducer.java?rev=586003&r1=586002&r2=586003&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesReducer.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesReducer.java Thu Oct 18 08:23:11 2007
@@ -94,8 +94,9 @@
}
try {
if (isOk) {
- application.getDownlink().close();
+ application.getDownlink().endOfInput();
} else {
+ // send the abort to the application and let it clean up
application.getDownlink().abort();
}
LOG.info("waiting for finish");