You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/11/21 01:25:45 UTC
svn commit: r477433 - in /lucene/hadoop/trunk: CHANGES.txt
src/java/org/apache/hadoop/ipc/Client.java
src/java/org/apache/hadoop/ipc/Server.java
Author: cutting
Date: Mon Nov 20 16:25:44 2006
New Revision: 477433
URL: http://svn.apache.org/viewvc?view=rev&rev=477433
Log:
HADOOP-677. In IPC, permit a version header to be transmitted when connections are established. Contributed by Owen.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=477433&r1=477432&r2=477433
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Nov 20 16:25:44 2006
@@ -106,6 +106,11 @@
32. HADOOP-709. Fix contrib/streaming to work with commands that
contain control characters. (Dhruba Borthakur via cutting)
+33. HADOOP-677. In IPC, permit a version header to be transmitted
+ when connections are established. This will permit us to change
+ the format of IPC requests back-compatibly in subsequent releases.
+ (omalley via cutting)
+
Release 0.8.0 - 2006-11-03
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java?view=diff&rev=477433&r1=477432&r2=477433
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java Mon Nov 20 16:25:44 2006
@@ -31,6 +31,7 @@
import java.io.BufferedOutputStream;
import java.io.FilterInputStream;
import java.io.FilterOutputStream;
+import java.io.OutputStream;
import java.util.Hashtable;
import java.util.Iterator;
@@ -43,6 +44,7 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.util.StringUtils;
/** A client for an IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
@@ -52,6 +54,10 @@
* @see Server
*/
public class Client {
+ /** Should the client send the header on the connection? */
+ private static final boolean SEND_HEADER = false;
+ private static final byte CURRENT_VERSION = 0;
+
public static final Log LOG =
LogFactory.getLog("org.apache.hadoop.ipc.Client");
private Hashtable connections = new Hashtable();
@@ -155,7 +161,6 @@
}
}
}
-
socket.setSoTimeout(timeout);
this.in = new DataInputStream
(new BufferedInputStream
@@ -178,6 +183,10 @@
}
}
}));
+ if (SEND_HEADER) {
+ out.write(Server.HEADER.array());
+ out.write(CURRENT_VERSION);
+ }
notify();
}
@@ -269,7 +278,7 @@
} catch (EOFException eof) {
// This is what happens when the remote side goes down
} catch (Exception e) {
- LOG.info(getName() + " caught: " + e, e);
+ LOG.info(StringUtils.stringifyException(e));
} finally {
//If there was no exception thrown in this method, then the only
//way we reached here is by breaking out of the while loop (after
@@ -480,7 +489,8 @@
Connection connection = getConnection(addresses[i]);
connection.sendParam(call); // send each parameter
} catch (IOException e) {
- LOG.info("Calling "+addresses[i]+" caught: " + e); // log errors
+ LOG.info("Calling "+addresses[i]+" caught: " +
+ StringUtils.stringifyException(e)); // log errors
results.size--; // wait for one fewer result
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java?view=diff&rev=477433&r1=477432&r2=477433
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java Mon Nov 20 16:25:44 2006
@@ -59,6 +59,12 @@
* @see Client
*/
public abstract class Server {
+
+ /**
+ * The first four bytes of Hadoop RPC connections
+ */
+ public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
+
/**
* How much time should be allocated for actually running the handler?
* Calls that are older than ipc.timeout * MAX_CALL_QUEUE_TIME
@@ -346,6 +352,7 @@
/** Reads calls from a connection and queues them for handling. */
private class Connection {
+ private boolean firstData = true;
private SocketChannel channel;
private SelectionKey key;
private ByteBuffer data;
@@ -415,6 +422,23 @@
if ( count < 0 || dataLengthBuffer.remaining() > 0 )
return count;
dataLengthBuffer.flip();
+ // Is this a new style header?
+ if (firstData && HEADER.equals(dataLengthBuffer)) {
+ // If so, read the version
+ ByteBuffer versionBuffer = ByteBuffer.allocate(1);
+ count = channel.read(versionBuffer);
+ if (count < 0) {
+ return count;
+ }
+ // read the first length
+ dataLengthBuffer.clear();
+ count = channel.read(dataLengthBuffer);
+ if (count < 0 || dataLengthBuffer.remaining() > 0) {
+ return count;
+ }
+ dataLengthBuffer.flip();
+ firstData = false;
+ }
dataLength = dataLengthBuffer.getInt();
data = ByteBuffer.allocate(dataLength);
}