You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/11/21 01:25:45 UTC

svn commit: r477433 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/ipc/Client.java src/java/org/apache/hadoop/ipc/Server.java

Author: cutting
Date: Mon Nov 20 16:25:44 2006
New Revision: 477433

URL: http://svn.apache.org/viewvc?view=rev&rev=477433
Log:
HADOOP-677.  In IPC, permit a version header to be transmitted when connections are established.  Contributed by Owen.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=477433&r1=477432&r2=477433
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Nov 20 16:25:44 2006
@@ -106,6 +106,11 @@
 32. HADOOP-709.  Fix contrib/streaming to work with commands that
     contain control characters.  (Dhruba Borthakur via cutting)
 
+33. HADOOP-677.  In IPC, permit a version header to be transmitted
+    when connections are established.  This will permit us to change
+    the format of IPC requests back-compatibly in subsequent releases.
+    (omalley via cutting)
+
 
 Release 0.8.0 - 2006-11-03
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java?view=diff&rev=477433&r1=477432&r2=477433
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java Mon Nov 20 16:25:44 2006
@@ -31,6 +31,7 @@
 import java.io.BufferedOutputStream;
 import java.io.FilterInputStream;
 import java.io.FilterOutputStream;
+import java.io.OutputStream;
 
 import java.util.Hashtable;
 import java.util.Iterator;
@@ -43,6 +44,7 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.util.StringUtils;
 
 /** A client for an IPC service.  IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value.  A service runs on
@@ -52,6 +54,10 @@
  * @see Server
  */
 public class Client {
+  /** Should the client send the header on the connection? */
+  private static final boolean SEND_HEADER = false;
+  private static final byte CURRENT_VERSION = 0;
+  
   public static final Log LOG =
     LogFactory.getLog("org.apache.hadoop.ipc.Client");
   private Hashtable connections = new Hashtable();
@@ -155,7 +161,6 @@
           }
         }
       }
-
       socket.setSoTimeout(timeout);
       this.in = new DataInputStream
         (new BufferedInputStream
@@ -178,6 +183,10 @@
                }
              }
            }));
+      if (SEND_HEADER) {
+        out.write(Server.HEADER.array());
+        out.write(CURRENT_VERSION);
+      }
       notify();
     }
 
@@ -269,7 +278,7 @@
       } catch (EOFException eof) {
           // This is what happens when the remote side goes down
       } catch (Exception e) {
-        LOG.info(getName() + " caught: " + e, e);
+        LOG.info(StringUtils.stringifyException(e));
       } finally {
         //If there was no exception thrown in this method, then the only
         //way we reached here is by breaking out of the while loop (after
@@ -480,7 +489,8 @@
           Connection connection = getConnection(addresses[i]);
           connection.sendParam(call);             // send each parameter
         } catch (IOException e) {
-          LOG.info("Calling "+addresses[i]+" caught: " + e); // log errors
+          LOG.info("Calling "+addresses[i]+" caught: " + 
+                   StringUtils.stringifyException(e)); // log errors
           results.size--;                         //  wait for one fewer result
         }
       }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java?view=diff&rev=477433&r1=477432&r2=477433
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java Mon Nov 20 16:25:44 2006
@@ -59,6 +59,12 @@
  * @see Client
  */
 public abstract class Server {
+  
+  /**
+   * The first four bytes of Hadoop RPC connections
+   */
+  public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
+  
   /**
    * How much time should be allocated for actually running the handler?
    * Calls that are older than ipc.timeout * MAX_CALL_QUEUE_TIME
@@ -346,6 +352,7 @@
 
   /** Reads calls from a connection and queues them for handling. */
   private class Connection {
+    private boolean firstData = true;
     private SocketChannel channel;
     private SelectionKey key;
     private ByteBuffer data;
@@ -415,6 +422,23 @@
         if ( count < 0 || dataLengthBuffer.remaining() > 0 ) 
           return count;        
         dataLengthBuffer.flip(); 
+        // Is this a new style header?
+        if (firstData && HEADER.equals(dataLengthBuffer)) {
+          // If so, read the version
+          ByteBuffer versionBuffer = ByteBuffer.allocate(1);
+          count = channel.read(versionBuffer);
+          if (count < 0) {
+            return count;
+          }
+          // read the first length
+          dataLengthBuffer.clear();
+          count = channel.read(dataLengthBuffer);
+          if (count < 0 || dataLengthBuffer.remaining() > 0) {
+            return count;
+          }
+          dataLengthBuffer.flip();
+          firstData = false;
+        }
         dataLength = dataLengthBuffer.getInt();
         data = ByteBuffer.allocate(dataLength);
       }