You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by br...@apache.org on 2010/02/18 19:27:52 UTC

svn commit: r911510 - in /incubator/thrift/trunk/lib/java: src/org/apache/thrift/ src/org/apache/thrift/protocol/ src/org/apache/thrift/transport/ test/org/apache/thrift/test/

Author: bryanduxbury
Date: Thu Feb 18 18:27:51 2010
New Revision: 911510

URL: http://svn.apache.org/viewvc?rev=911510&view=rev
Log:
THRIFT-685. java: Direct buffer access to improve deserialization performance

This initial patch adds direct buffer access support to TDeserializer and TCompactProtocol, with the framework in place to be extended to other areas.

Added:
    incubator/thrift/trunk/lib/java/src/org/apache/thrift/transport/TMemoryInputTransport.java
Modified:
    incubator/thrift/trunk/lib/java/src/org/apache/thrift/TDeserializer.java
    incubator/thrift/trunk/lib/java/src/org/apache/thrift/protocol/TCompactProtocol.java
    incubator/thrift/trunk/lib/java/src/org/apache/thrift/protocol/TProtocol.java
    incubator/thrift/trunk/lib/java/src/org/apache/thrift/transport/TFramedTransport.java
    incubator/thrift/trunk/lib/java/src/org/apache/thrift/transport/TTransport.java
    incubator/thrift/trunk/lib/java/test/org/apache/thrift/test/TCompactProtocolTest.java

Modified: incubator/thrift/trunk/lib/java/src/org/apache/thrift/TDeserializer.java
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/java/src/org/apache/thrift/TDeserializer.java?rev=911510&r1=911509&r2=911510&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/java/src/org/apache/thrift/TDeserializer.java (original)
+++ incubator/thrift/trunk/lib/java/src/org/apache/thrift/TDeserializer.java Thu Feb 18 18:27:51 2010
@@ -19,7 +19,6 @@
 
 package org.apache.thrift;
 
-import java.io.ByteArrayInputStream;
 import java.io.UnsupportedEncodingException;
 
 import org.apache.thrift.protocol.TBinaryProtocol;
@@ -28,8 +27,7 @@
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.protocol.TProtocolUtil;
 import org.apache.thrift.protocol.TType;
-import org.apache.thrift.transport.TIOStreamTransport;
-import org.apache.thrift.TFieldIdEnum;
+import org.apache.thrift.transport.TMemoryInputTransport;
 
 /**
  * Generic utility for easily deserializing objects from a byte array or Java
@@ -37,7 +35,8 @@
  *
  */
 public class TDeserializer {
-  private final TProtocolFactory protocolFactory_;
+  private final TProtocol protocol_;
+  private final TMemoryInputTransport trans_;
 
   /**
    * Create a new TDeserializer that uses the TBinaryProtocol by default.
@@ -53,7 +52,8 @@
    * @param protocolFactory Factory to create a protocol
    */
   public TDeserializer(TProtocolFactory protocolFactory) {
-    protocolFactory_ = protocolFactory;
+    trans_ = new TMemoryInputTransport(null);
+    protocol_ = protocolFactory.getProtocol(trans_);
   }
 
   /**
@@ -63,10 +63,8 @@
    * @param bytes The array to read from
    */
   public void deserialize(TBase base, byte[] bytes) throws TException {
-    base.read(
-        protocolFactory_.getProtocol(
-          new TIOStreamTransport(
-            new ByteArrayInputStream(bytes))));
+    trans_.reset(bytes);
+    base.read(protocol_);
   }
 
   /**
@@ -103,17 +101,15 @@
       return;
     }
 
-    TProtocol iprot = protocolFactory_.getProtocol(
-        new TIOStreamTransport(
-          new ByteArrayInputStream(bytes))); 
+    trans_.reset(bytes);
 
     // index into field ID path being currently searched for
     int curPathIndex = 0;
 
-    iprot.readStructBegin();
+    protocol_.readStructBegin();
 
     while (curPathIndex < fieldIdPath.length) {
-      TField field = iprot.readFieldBegin();
+      TField field = protocol_.readFieldBegin();
       // we can stop searching if we either see a stop or we go past the field 
       // id we're looking for (since fields should now be serialized in asc
       // order).
@@ -123,19 +119,19 @@
 
       if (field.id != fieldIdPath[curPathIndex].getThriftFieldId()) {
         // Not the field we're looking for. Skip field.
-        TProtocolUtil.skip(iprot, field.type);
-        iprot.readFieldEnd();
+        TProtocolUtil.skip(protocol_, field.type);
+        protocol_.readFieldEnd();
       } else {
         // This field is the next step in the path. Step into field.
         curPathIndex++;
         if (curPathIndex < fieldIdPath.length) {
-          iprot.readStructBegin();
+          protocol_.readStructBegin();
         }
       }
     }
 
     // when this line is reached, iprot will be positioned at the start of tb.
-    tb.read(iprot);
+    tb.read(protocol_);
   }
 
   /**

Modified: incubator/thrift/trunk/lib/java/src/org/apache/thrift/protocol/TCompactProtocol.java
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/java/src/org/apache/thrift/protocol/TCompactProtocol.java?rev=911510&r1=911509&r2=911510&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/java/src/org/apache/thrift/protocol/TCompactProtocol.java (original)
+++ incubator/thrift/trunk/lib/java/src/org/apache/thrift/protocol/TCompactProtocol.java Thu Feb 18 18:27:51 2010
@@ -467,12 +467,12 @@
    */
   public TField readFieldBegin() throws TException {
     byte type = readByte();
-    
+
     // if it's a stop, then we can return immediately, as the struct is over.
-    if ((type & 0x0f) == TType.STOP) {
+    if (type == TType.STOP) {
       return TSTOP;
     }
-    
+
     short fieldId;
 
     // mask off the 4 MSB of the type header. it could contain a field id delta.
@@ -484,7 +484,7 @@
       // has a delta. add the delta to the last read field id.
       fieldId = (short)(lastFieldId_ + modifier);
     }
-    
+
     TField field = new TField("", getTType((byte)(type & 0x0f)), fieldId);
 
     // if this happens to be a boolean field, the value is encoded in the type
@@ -554,8 +554,15 @@
    * Read a single byte off the wire. Nothing interesting here.
    */
   public byte readByte() throws TException {
-    trans_.readAll(byteRawBuf, 0, 1);
-    return byteRawBuf[0];
+    byte b;
+    if (trans_.getBytesRemainingInBuffer() > 0) {
+      b = trans_.getBuffer()[trans_.getBufferPosition()];
+      trans_.consumeBuffer(1);
+    } else {
+      trans_.readAll(byteRawBuf, 0, 1);
+      b = byteRawBuf[0];
+    }
+    return b;
   }
 
   /**
@@ -592,8 +599,20 @@
    * Reads a byte[] (via readBinary), and then UTF-8 decodes it.
    */
   public String readString() throws TException {
+    int length = readVarint32();
+
+    if (length == 0) {
+      return "";
+    }
+
     try {
-      return new String(readBinary(), "UTF-8");
+      if (trans_.getBytesRemainingInBuffer() >= length) {
+        String str = new String(trans_.getBuffer(), trans_.getBufferPosition(), length, "UTF-8");
+        trans_.consumeBuffer(length);
+        return str;
+      } else {
+        return new String(readBinary(length), "UTF-8");
+      }
     } catch (UnsupportedEncodingException e) {
       throw new TException("UTF-8 not supported!");
     }
@@ -611,6 +630,16 @@
     return buf;
   }
 
+  /**
+   * Read a byte[] of a known length from the wire. 
+   */
+  private byte[] readBinary(int length) throws TException {
+    if (length == 0) return new byte[0];
+
+    byte[] buf = new byte[length];
+    trans_.readAll(buf, 0, length);
+    return buf;
+  }
 
   //
   // These methods are here for the struct to call, but don't have any wire 
@@ -692,7 +721,8 @@
   //
 
   private boolean isBoolType(byte b) {
-    return (b & 0x0f) == Types.BOOLEAN_TRUE || (b & 0x0f) == Types.BOOLEAN_FALSE;
+    int lowerNibble = b & 0x0f;
+    return lowerNibble == Types.BOOLEAN_TRUE || lowerNibble == Types.BOOLEAN_FALSE;
   }
 
   /**

Modified: incubator/thrift/trunk/lib/java/src/org/apache/thrift/protocol/TProtocol.java
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/java/src/org/apache/thrift/protocol/TProtocol.java?rev=911510&r1=911509&r2=911510&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/java/src/org/apache/thrift/protocol/TProtocol.java (original)
+++ incubator/thrift/trunk/lib/java/src/org/apache/thrift/protocol/TProtocol.java Thu Feb 18 18:27:51 2010
@@ -142,5 +142,4 @@
   public abstract String readString() throws TException;
 
   public abstract byte[] readBinary() throws TException;
-
 }

Modified: incubator/thrift/trunk/lib/java/src/org/apache/thrift/transport/TFramedTransport.java
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/java/src/org/apache/thrift/transport/TFramedTransport.java?rev=911510&r1=911509&r2=911510&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/java/src/org/apache/thrift/transport/TFramedTransport.java (original)
+++ incubator/thrift/trunk/lib/java/src/org/apache/thrift/transport/TFramedTransport.java Thu Feb 18 18:27:51 2010
@@ -24,8 +24,8 @@
 import org.apache.thrift.TByteArrayOutputStream;
 
 /**
- * Socket implementation of the TTransport interface. To be commented soon!
- *
+ * TFramedTransport is a buffered TTransport that ensures a fully read message
+ * every time by preceding messages with a 4-byte frame size.
  */
 public class TFramedTransport extends TTransport {
 

Added: incubator/thrift/trunk/lib/java/src/org/apache/thrift/transport/TMemoryInputTransport.java
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/java/src/org/apache/thrift/transport/TMemoryInputTransport.java?rev=911510&view=auto
==============================================================================
--- incubator/thrift/trunk/lib/java/src/org/apache/thrift/transport/TMemoryInputTransport.java (added)
+++ incubator/thrift/trunk/lib/java/src/org/apache/thrift/transport/TMemoryInputTransport.java Thu Feb 18 18:27:51 2010
@@ -0,0 +1,61 @@
+package org.apache.thrift.transport;
+
+public class TMemoryInputTransport extends TTransport {
+
+  private byte[] buf_;
+  private int pos_;
+
+  public TMemoryInputTransport(byte[] buf) {
+    reset(buf);
+  }
+
+  public void reset(byte[] buf) {
+    buf_ = buf;
+    pos_ = 0;
+  }
+
+  @Override
+  public void close() {}
+
+  @Override
+  public boolean isOpen() {
+    return true;
+  }
+
+  @Override
+  public void open() throws TTransportException {}
+
+  @Override
+  public int read(byte[] buf, int off, int len) throws TTransportException {
+    int bytesRemaining = getBytesRemainingInBuffer();
+    int amtToRead = (len > bytesRemaining ? bytesRemaining : len);
+    if (amtToRead > 0) {
+      System.arraycopy(buf_, pos_, buf, off, amtToRead);
+      consumeBuffer(amtToRead);
+    }
+    return amtToRead;
+  }
+
+  @Override
+  public void write(byte[] buf, int off, int len) throws TTransportException {
+    throw new UnsupportedOperationException("No writing allowed!");
+  }
+
+  @Override
+  public byte[] getBuffer() {
+    return buf_;
+  }
+
+  public int getBufferPosition() {
+    return pos_;
+  }
+
+  public int getBytesRemainingInBuffer() {
+    return buf_.length - pos_;
+  }
+
+  public void consumeBuffer(int len) {
+    pos_ += len;
+  }
+
+}

Modified: incubator/thrift/trunk/lib/java/src/org/apache/thrift/transport/TTransport.java
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/java/src/org/apache/thrift/transport/TTransport.java?rev=911510&r1=911509&r2=911510&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/java/src/org/apache/thrift/transport/TTransport.java (original)
+++ incubator/thrift/trunk/lib/java/src/org/apache/thrift/transport/TTransport.java Thu Feb 18 18:27:51 2010
@@ -118,4 +118,37 @@
    */
   public void flush()
     throws TTransportException {}
+
+  /**
+   * Access the transport's underlying buffer directly. If this is not a
+   * buffered transport, return null.
+   * @return the underlying buffer, or null if this transport is not buffered
+   */
+  public byte[] getBuffer() {
+    return null;
+  }
+
+  /**
+   * Return the index within the underlying buffer that specifies the next spot
+   * that should be read from.
+   * @return
+   */
+  public int getBufferPosition() {
+    return 0;
+  }
+
+  /**
+   * Get the number of bytes remaining in the underlying buffer. Returns -1 if
+   * this is a non-buffered transport.
+   * @return
+   */
+  public int getBytesRemainingInBuffer() {
+    return -1;
+  }
+
+  /**
+   * Consume len bytes from the underlying buffer.
+   * @param len the number of bytes to consume
+   */
+  public void consumeBuffer(int len) {}
 }

Modified: incubator/thrift/trunk/lib/java/test/org/apache/thrift/test/TCompactProtocolTest.java
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/java/test/org/apache/thrift/test/TCompactProtocolTest.java?rev=911510&r1=911509&r2=911510&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/java/test/org/apache/thrift/test/TCompactProtocolTest.java (original)
+++ incubator/thrift/trunk/lib/java/test/org/apache/thrift/test/TCompactProtocolTest.java Thu Feb 18 18:27:51 2010
@@ -24,7 +24,9 @@
 import java.util.List;
 
 import org.apache.thrift.TBase;
+import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TField;
@@ -138,6 +140,8 @@
     testMessage();
     
     testServerRequest();
+    
+    testTDeserializer();
   }
   
   public static void testNakedByte() throws Exception {
@@ -367,23 +371,17 @@
       }
 
       public int primitiveMethod() throws TException {
-        // TODO Auto-generated method stub
         return 0;
       }
 
       public CompactProtoTestStruct structMethod() throws TException {
-        // TODO Auto-generated method stub
         return null;
       }
 
       public void voidMethod() throws TException {
-        // TODO Auto-generated method stub
-        
       }
 
       public void methodWithDefaultArgs(int something) throws TException {
-        // TODO Auto-generated method stub
-        
       }
     };
     
@@ -452,4 +450,17 @@
     public abstract void writeMethod(TProtocol proto) throws TException;
     public abstract void readMethod(TProtocol proto) throws TException;
   }
+  
+  private static void testTDeserializer() throws TException {
+    TSerializer ser = new TSerializer(new TCompactProtocol.Factory());
+    byte[] bytes = ser.serialize(Fixtures.compactProtoTestStruct);
+    
+    TDeserializer deser = new TDeserializer(new TCompactProtocol.Factory());
+    CompactProtoTestStruct cpts = new CompactProtoTestStruct();
+    deser.deserialize(cpts, bytes);
+    
+    if (!Fixtures.compactProtoTestStruct.equals(cpts)) {
+      throw new RuntimeException(Fixtures.compactProtoTestStruct + " and " + cpts + " do not match!");
+    }
+  }
 }
\ No newline at end of file