You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by br...@apache.org on 2013/02/11 19:31:02 UTC

[3/3] git commit: FLUME-1896: Implement Thrift RpcClient

Updated Branches:
  refs/heads/trunk d203236b2 -> 60da3d860


FLUME-1896: Implement Thrift RpcClient

(Hari Shreedharan via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/60da3d86
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/60da3d86
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/60da3d86

Branch: refs/heads/trunk
Commit: 60da3d8606415202f966017a18084cb59d3e64d1
Parents: d203236
Author: Brock Noland <br...@apache.org>
Authored: Mon Feb 11 12:30:29 2013 -0600
Committer: Brock Noland <br...@apache.org>
Committed: Mon Feb 11 12:30:29 2013 -0600

----------------------------------------------------------------------
 .../flume-thrift-source/pom.xml                    |   90 +-
 .../flume/handlers/thrift/EventStatus.java         |    4 +-
 .../cloudera/flume/handlers/thrift/Priority.java   |    4 +-
 .../flume/handlers/thrift/ThriftFlumeEvent.java    |  405 +++--
 .../handlers/thrift/ThriftFlumeEventServer.java    |  495 +++--
 .../flume-thrift-source/src/main/thrift/aslv2      |   18 +
 flume-ng-sdk/pom.xml                               |   83 +
 .../main/java/org/apache/flume/api/HostInfo.java   |    2 +-
 .../flume/api/RpcClientConfigurationConstants.java |    7 +
 .../org/apache/flume/api/RpcClientFactory.java     |   63 +-
 .../java/org/apache/flume/api/ThriftRpcClient.java |  431 ++++
 .../main/java/org/apache/flume/thrift/Status.java  |   69 +
 .../org/apache/flume/thrift/ThriftFlumeEvent.java  |  580 +++++
 .../apache/flume/thrift/ThriftSourceProtocol.java  | 1813 +++++++++++++++
 flume-ng-sdk/src/main/thrift/aslv2                 |   18 +
 flume-ng-sdk/src/main/thrift/flume.thrift          |   38 +
 .../org/apache/flume/api/TestThriftRpcClient.java  |  213 ++
 .../org/apache/flume/api/ThriftTestingSource.java  |  227 ++
 flume-ng-sources/flume-scribe-source/pom.xml       |   90 +-
 .../org/apache/flume/source/scribe/LogEntry.java   |  219 ++-
 .../org/apache/flume/source/scribe/ResultCode.java |   14 +-
 .../org/apache/flume/source/scribe/Scribe.java     |  487 +++--
 .../apache/flume/source/scribe/ScribeSource.java   |   10 +-
 .../flume-scribe-source/src/main/thrift/aslv2      |   18 +
 .../src/main/thrift/scribe-source.thrift           |   34 +
 pom.xml                                            |   12 +-
 26 files changed, 4778 insertions(+), 666 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/60da3d86/flume-ng-legacy-sources/flume-thrift-source/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-legacy-sources/flume-thrift-source/pom.xml b/flume-ng-legacy-sources/flume-thrift-source/pom.xml
index b9667cd..2fb5bf1 100644
--- a/flume-ng-legacy-sources/flume-thrift-source/pom.xml
+++ b/flume-ng-legacy-sources/flume-thrift-source/pom.xml
@@ -36,41 +36,63 @@ limitations under the License.
       <activation>
         <activeByDefault>false</activeByDefault>
       </activation>
-      <properties>
-        <thrift.executable>${env.THRIFT_HOME}/bin/thrift</thrift.executable>
-      </properties>
       <build>
-       <plugins>
-         <plugin>
-           <groupId>org.apache.thrift.tools</groupId>
-           <artifactId>maven-thrift-plugin</artifactId>
-           <version>0.1.10</version>
-           <configuration>
-             <thriftExecutable>${thrift.executable}</thriftExecutable>
-           </configuration>
-           <executions>
-             <execution>
-               <id>thrift-sources</id>
-               <phase>generate-sources</phase>
-               <goals>
-                 <goal>compile</goal>
-               </goals>
-             </execution>
-           </executions>
-         </plugin>
-         <plugin>
-           <groupId>org.apache.maven.plugins</groupId>
-           <artifactId>maven-compiler-plugin</artifactId>
-           <version>2.3.2</version>
-           <configuration>
-             <source>1.6</source>
-             <target>1.6</target>
-             <excludes>
-               <exclude>**/com/**</exclude>
-             </excludes>
-           </configuration>
-         </plugin>
-       </plugins>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-antrun-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>compileThrift</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>run</goal>
+                </goals>
+                <configuration>
+                  <target>
+                    <echo file="target/compile-thrift.sh">
+                      LICENSE=src/main/thrift/aslv2
+                      THRIFT_DIR=src/main/thrift
+                      JAVA_DIR=target/generated-sources/thrift
+                      mkdir -p $JAVA_DIR 2&gt; /dev/null
+                      JSTATUS=$?
+                      if [ $JSTATUS -ne 0 ] ; then
+                        echo "Could not create $JAVA_DIR. Will not generate thrift files."
+                        exit $JSTATUS
+                      fi
+                      for THRIFT_FILE in `ls $THRIFT_DIR/*.thrift 2&gt; /dev/null`
+                      do
+                        thrift --gen java:hashcode -o $JAVA_DIR $THRIFT_FILE
+                      done
+                      SRC_DIR=$JAVA_DIR/gen-java/com/cloudera/flume/handlers/thrift
+                      DEST_DIR=src/main/java/com/cloudera/flume/handlers/thrift
+                      if [ ! -d $DEST_DIR ] ; then
+                        mkdir $DEST_DIR 2&gt; /dev/null
+                        STATUS=$?
+                        if [ $STATUS -ne 0 ] ; then
+                          echo "Could not create $DEST_DIR. Will not generate thrift files."
+                          exit $STATUS
+                        fi
+                      fi
+                      for JAVA_FILE in `ls $SRC_DIR/*.java 2&gt; /dev/null`
+                      do
+                        echo $JAVA_FILE
+                        cat $LICENSE > $JAVA_FILE.tmp
+                        cat $JAVA_FILE >> $JAVA_FILE.tmp
+                        mv $JAVA_FILE.tmp $JAVA_FILE
+                        cp $JAVA_FILE $DEST_DIR
+                      done
+                      rm -rf $JAVA_DIR
+                    </echo>
+                    <exec executable="sh" dir="${basedir}" failonerror="true">
+                      <arg line="target/compile-thrift.sh"/>
+                    </exec>
+                  </target>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
       </build>
     </profile>
     <profile>

http://git-wip-us.apache.org/repos/asf/flume/blob/60da3d86/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/EventStatus.java
----------------------------------------------------------------------
diff --git a/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/EventStatus.java b/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/EventStatus.java
index 327107a..549ea16 100644
--- a/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/EventStatus.java
+++ b/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/EventStatus.java
@@ -16,11 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 /**
- * Autogenerated by Thrift
+ * Autogenerated by Thrift Compiler (0.9.0)
  *
  * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
  */
 package com.cloudera.flume.handlers.thrift;
 

http://git-wip-us.apache.org/repos/asf/flume/blob/60da3d86/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/Priority.java
----------------------------------------------------------------------
diff --git a/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/Priority.java b/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/Priority.java
index d2495d2..6879817 100644
--- a/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/Priority.java
+++ b/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/Priority.java
@@ -16,11 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 /**
- * Autogenerated by Thrift
+ * Autogenerated by Thrift Compiler (0.9.0)
  *
  * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
  */
 package com.cloudera.flume.handlers.thrift;
 

http://git-wip-us.apache.org/repos/asf/flume/blob/60da3d86/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/ThriftFlumeEvent.java
----------------------------------------------------------------------
diff --git a/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/ThriftFlumeEvent.java b/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/ThriftFlumeEvent.java
index 2bb6cfd..6b79cc9 100644
--- a/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/ThriftFlumeEvent.java
+++ b/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/ThriftFlumeEvent.java
@@ -16,15 +16,24 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 /**
- * Autogenerated by Thrift
+ * Autogenerated by Thrift Compiler (0.9.0)
  *
  * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
  */
 package com.cloudera.flume.handlers.thrift;
 
 import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Map;
@@ -40,7 +49,6 @@ import java.util.Arrays;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@Deprecated
 public class ThriftFlumeEvent implements org.apache.thrift.TBase<ThriftFlumeEvent, ThriftFlumeEvent._Fields>, java.io.Serializable, Cloneable {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ThriftFlumeEvent");
 
@@ -51,16 +59,22 @@ public class ThriftFlumeEvent implements org.apache.thrift.TBase<ThriftFlumeEven
   private static final org.apache.thrift.protocol.TField HOST_FIELD_DESC = new org.apache.thrift.protocol.TField("host", org.apache.thrift.protocol.TType.STRING, (short)5);
   private static final org.apache.thrift.protocol.TField FIELDS_FIELD_DESC = new org.apache.thrift.protocol.TField("fields", org.apache.thrift.protocol.TType.MAP, (short)6);
 
-  public long timestamp;
+  private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+  static {
+    schemes.put(StandardScheme.class, new ThriftFlumeEventStandardSchemeFactory());
+    schemes.put(TupleScheme.class, new ThriftFlumeEventTupleSchemeFactory());
+  }
+
+  public long timestamp; // required
   /**
    * 
    * @see Priority
    */
-  public Priority priority;
-  public ByteBuffer body;
-  public long nanos;
-  public String host;
-  public Map<String,ByteBuffer> fields;
+  public Priority priority; // required
+  public ByteBuffer body; // required
+  public long nanos; // required
+  public String host; // required
+  public Map<String,ByteBuffer> fields; // required
 
   /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -142,8 +156,7 @@ public class ThriftFlumeEvent implements org.apache.thrift.TBase<ThriftFlumeEven
   // isset id assignments
   private static final int __TIMESTAMP_ISSET_ID = 0;
   private static final int __NANOS_ISSET_ID = 1;
-  private BitSet __isset_bit_vector = new BitSet(2);
-
+  private byte __isset_bitfield = 0;
   public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
     Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -191,8 +204,7 @@ public class ThriftFlumeEvent implements org.apache.thrift.TBase<ThriftFlumeEven
    * Performs a deep copy on <i>other</i>.
    */
   public ThriftFlumeEvent(ThriftFlumeEvent other) {
-    __isset_bit_vector.clear();
-    __isset_bit_vector.or(other.__isset_bit_vector);
+    __isset_bitfield = other.__isset_bitfield;
     this.timestamp = other.timestamp;
     if (other.isSetPriority()) {
       this.priority = other.priority;
@@ -250,16 +262,16 @@ public class ThriftFlumeEvent implements org.apache.thrift.TBase<ThriftFlumeEven
   }
 
   public void unsetTimestamp() {
-    __isset_bit_vector.clear(__TIMESTAMP_ISSET_ID);
+    __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __TIMESTAMP_ISSET_ID);
   }
 
   /** Returns true if field timestamp is set (has been assigned a value) and false otherwise */
   public boolean isSetTimestamp() {
-    return __isset_bit_vector.get(__TIMESTAMP_ISSET_ID);
+    return EncodingUtils.testBit(__isset_bitfield, __TIMESTAMP_ISSET_ID);
   }
 
   public void setTimestampIsSet(boolean value) {
-    __isset_bit_vector.set(__TIMESTAMP_ISSET_ID, value);
+    __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __TIMESTAMP_ISSET_ID, value);
   }
 
   /**
@@ -339,16 +351,16 @@ public class ThriftFlumeEvent implements org.apache.thrift.TBase<ThriftFlumeEven
   }
 
   public void unsetNanos() {
-    __isset_bit_vector.clear(__NANOS_ISSET_ID);
+    __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __NANOS_ISSET_ID);
   }
 
   /** Returns true if field nanos is set (has been assigned a value) and false otherwise */
   public boolean isSetNanos() {
-    return __isset_bit_vector.get(__NANOS_ISSET_ID);
+    return EncodingUtils.testBit(__isset_bitfield, __NANOS_ISSET_ID);
   }
 
   public void setNanosIsSet(boolean value) {
-    __isset_bit_vector.set(__NANOS_ISSET_ID, value);
+    __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __NANOS_ISSET_ID, value);
   }
 
   public String getHost() {
@@ -466,7 +478,7 @@ public class ThriftFlumeEvent implements org.apache.thrift.TBase<ThriftFlumeEven
   public Object getFieldValue(_Fields field) {
     switch (field) {
     case TIMESTAMP:
-      return new Long(getTimestamp());
+      return Long.valueOf(getTimestamp());
 
     case PRIORITY:
       return getPriority();
@@ -475,7 +487,7 @@ public class ThriftFlumeEvent implements org.apache.thrift.TBase<ThriftFlumeEven
       return getBody();
 
     case NANOS:
-      return new Long(getNanos());
+      return Long.valueOf(getNanos());
 
     case HOST:
       return getHost();
@@ -693,122 +705,11 @@ public class ThriftFlumeEvent implements org.apache.thrift.TBase<ThriftFlumeEven
   }
 
   public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
-    org.apache.thrift.protocol.TField field;
-    iprot.readStructBegin();
-    while (true)
-    {
-      field = iprot.readFieldBegin();
-      if (field.type == org.apache.thrift.protocol.TType.STOP) { 
-        break;
-      }
-      switch (field.id) {
-        case 1: // TIMESTAMP
-          if (field.type == org.apache.thrift.protocol.TType.I64) {
-            this.timestamp = iprot.readI64();
-            setTimestampIsSet(true);
-          } else { 
-            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
-          }
-          break;
-        case 2: // PRIORITY
-          if (field.type == org.apache.thrift.protocol.TType.I32) {
-            this.priority = Priority.findByValue(iprot.readI32());
-          } else { 
-            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
-          }
-          break;
-        case 3: // BODY
-          if (field.type == org.apache.thrift.protocol.TType.STRING) {
-            this.body = iprot.readBinary();
-          } else { 
-            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
-          }
-          break;
-        case 4: // NANOS
-          if (field.type == org.apache.thrift.protocol.TType.I64) {
-            this.nanos = iprot.readI64();
-            setNanosIsSet(true);
-          } else { 
-            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
-          }
-          break;
-        case 5: // HOST
-          if (field.type == org.apache.thrift.protocol.TType.STRING) {
-            this.host = iprot.readString();
-          } else { 
-            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
-          }
-          break;
-        case 6: // FIELDS
-          if (field.type == org.apache.thrift.protocol.TType.MAP) {
-            {
-              org.apache.thrift.protocol.TMap _map0 = iprot.readMapBegin();
-              this.fields = new HashMap<String,ByteBuffer>(2*_map0.size);
-              for (int _i1 = 0; _i1 < _map0.size; ++_i1)
-              {
-                String _key2;
-                ByteBuffer _val3;
-                _key2 = iprot.readString();
-                _val3 = iprot.readBinary();
-                this.fields.put(_key2, _val3);
-              }
-              iprot.readMapEnd();
-            }
-          } else { 
-            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
-          }
-          break;
-        default:
-          org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
-      }
-      iprot.readFieldEnd();
-    }
-    iprot.readStructEnd();
-
-    // check for required fields of primitive type, which can't be checked in the validate method
-    validate();
+    schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
   }
 
   public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
-    validate();
-
-    oprot.writeStructBegin(STRUCT_DESC);
-    oprot.writeFieldBegin(TIMESTAMP_FIELD_DESC);
-    oprot.writeI64(this.timestamp);
-    oprot.writeFieldEnd();
-    if (this.priority != null) {
-      oprot.writeFieldBegin(PRIORITY_FIELD_DESC);
-      oprot.writeI32(this.priority.getValue());
-      oprot.writeFieldEnd();
-    }
-    if (this.body != null) {
-      oprot.writeFieldBegin(BODY_FIELD_DESC);
-      oprot.writeBinary(this.body);
-      oprot.writeFieldEnd();
-    }
-    oprot.writeFieldBegin(NANOS_FIELD_DESC);
-    oprot.writeI64(this.nanos);
-    oprot.writeFieldEnd();
-    if (this.host != null) {
-      oprot.writeFieldBegin(HOST_FIELD_DESC);
-      oprot.writeString(this.host);
-      oprot.writeFieldEnd();
-    }
-    if (this.fields != null) {
-      oprot.writeFieldBegin(FIELDS_FIELD_DESC);
-      {
-        oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, this.fields.size()));
-        for (Map.Entry<String, ByteBuffer> _iter4 : this.fields.entrySet())
-        {
-          oprot.writeString(_iter4.getKey());
-          oprot.writeBinary(_iter4.getValue());
-        }
-        oprot.writeMapEnd();
-      }
-      oprot.writeFieldEnd();
-    }
-    oprot.writeFieldStop();
-    oprot.writeStructEnd();
+    schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
   }
 
   @Override
@@ -861,6 +762,7 @@ public class ThriftFlumeEvent implements org.apache.thrift.TBase<ThriftFlumeEven
 
   public void validate() throws org.apache.thrift.TException {
     // check for required fields
+    // check for sub-struct validity
   }
 
   private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
@@ -874,12 +776,245 @@ public class ThriftFlumeEvent implements org.apache.thrift.TBase<ThriftFlumeEven
   private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
     try {
       // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
-      __isset_bit_vector = new BitSet(1);
+      __isset_bitfield = 0;
       read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
     } catch (org.apache.thrift.TException te) {
       throw new java.io.IOException(te);
     }
   }
 
+  private static class ThriftFlumeEventStandardSchemeFactory implements SchemeFactory {
+    public ThriftFlumeEventStandardScheme getScheme() {
+      return new ThriftFlumeEventStandardScheme();
+    }
+  }
+
+  private static class ThriftFlumeEventStandardScheme extends StandardScheme<ThriftFlumeEvent> {
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot, ThriftFlumeEvent struct) throws org.apache.thrift.TException {
+      org.apache.thrift.protocol.TField schemeField;
+      iprot.readStructBegin();
+      while (true)
+      {
+        schemeField = iprot.readFieldBegin();
+        if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+          break;
+        }
+        switch (schemeField.id) {
+          case 1: // TIMESTAMP
+            if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+              struct.timestamp = iprot.readI64();
+              struct.setTimestampIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 2: // PRIORITY
+            if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
+              struct.priority = Priority.findByValue(iprot.readI32());
+              struct.setPriorityIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 3: // BODY
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.body = iprot.readBinary();
+              struct.setBodyIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 4: // NANOS
+            if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+              struct.nanos = iprot.readI64();
+              struct.setNanosIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 5: // HOST
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.host = iprot.readString();
+              struct.setHostIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 6: // FIELDS
+            if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
+              {
+                org.apache.thrift.protocol.TMap _map0 = iprot.readMapBegin();
+                struct.fields = new HashMap<String,ByteBuffer>(2*_map0.size);
+                for (int _i1 = 0; _i1 < _map0.size; ++_i1)
+                {
+                  String _key2; // required
+                  ByteBuffer _val3; // required
+                  _key2 = iprot.readString();
+                  _val3 = iprot.readBinary();
+                  struct.fields.put(_key2, _val3);
+                }
+                iprot.readMapEnd();
+              }
+              struct.setFieldsIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          default:
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+        }
+        iprot.readFieldEnd();
+      }
+      iprot.readStructEnd();
+
+      // check for required fields of primitive type, which can't be checked in the validate method
+      struct.validate();
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot, ThriftFlumeEvent struct) throws org.apache.thrift.TException {
+      struct.validate();
+
+      oprot.writeStructBegin(STRUCT_DESC);
+      oprot.writeFieldBegin(TIMESTAMP_FIELD_DESC);
+      oprot.writeI64(struct.timestamp);
+      oprot.writeFieldEnd();
+      if (struct.priority != null) {
+        oprot.writeFieldBegin(PRIORITY_FIELD_DESC);
+        oprot.writeI32(struct.priority.getValue());
+        oprot.writeFieldEnd();
+      }
+      if (struct.body != null) {
+        oprot.writeFieldBegin(BODY_FIELD_DESC);
+        oprot.writeBinary(struct.body);
+        oprot.writeFieldEnd();
+      }
+      oprot.writeFieldBegin(NANOS_FIELD_DESC);
+      oprot.writeI64(struct.nanos);
+      oprot.writeFieldEnd();
+      if (struct.host != null) {
+        oprot.writeFieldBegin(HOST_FIELD_DESC);
+        oprot.writeString(struct.host);
+        oprot.writeFieldEnd();
+      }
+      if (struct.fields != null) {
+        oprot.writeFieldBegin(FIELDS_FIELD_DESC);
+        {
+          oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, struct.fields.size()));
+          for (Map.Entry<String, ByteBuffer> _iter4 : struct.fields.entrySet())
+          {
+            oprot.writeString(_iter4.getKey());
+            oprot.writeBinary(_iter4.getValue());
+          }
+          oprot.writeMapEnd();
+        }
+        oprot.writeFieldEnd();
+      }
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+
+  }
+
+  private static class ThriftFlumeEventTupleSchemeFactory implements SchemeFactory {
+    public ThriftFlumeEventTupleScheme getScheme() {
+      return new ThriftFlumeEventTupleScheme();
+    }
+  }
+
+  private static class ThriftFlumeEventTupleScheme extends TupleScheme<ThriftFlumeEvent> {
+
+    @Override
+    public void write(org.apache.thrift.protocol.TProtocol prot, ThriftFlumeEvent struct) throws org.apache.thrift.TException {
+      TTupleProtocol oprot = (TTupleProtocol) prot;
+      BitSet optionals = new BitSet();
+      if (struct.isSetTimestamp()) {
+        optionals.set(0);
+      }
+      if (struct.isSetPriority()) {
+        optionals.set(1);
+      }
+      if (struct.isSetBody()) {
+        optionals.set(2);
+      }
+      if (struct.isSetNanos()) {
+        optionals.set(3);
+      }
+      if (struct.isSetHost()) {
+        optionals.set(4);
+      }
+      if (struct.isSetFields()) {
+        optionals.set(5);
+      }
+      oprot.writeBitSet(optionals, 6);
+      if (struct.isSetTimestamp()) {
+        oprot.writeI64(struct.timestamp);
+      }
+      if (struct.isSetPriority()) {
+        oprot.writeI32(struct.priority.getValue());
+      }
+      if (struct.isSetBody()) {
+        oprot.writeBinary(struct.body);
+      }
+      if (struct.isSetNanos()) {
+        oprot.writeI64(struct.nanos);
+      }
+      if (struct.isSetHost()) {
+        oprot.writeString(struct.host);
+      }
+      if (struct.isSetFields()) {
+        {
+          oprot.writeI32(struct.fields.size());
+          for (Map.Entry<String, ByteBuffer> _iter5 : struct.fields.entrySet())
+          {
+            oprot.writeString(_iter5.getKey());
+            oprot.writeBinary(_iter5.getValue());
+          }
+        }
+      }
+    }
+
+    @Override
+    public void read(org.apache.thrift.protocol.TProtocol prot, ThriftFlumeEvent struct) throws org.apache.thrift.TException {
+      TTupleProtocol iprot = (TTupleProtocol) prot;
+      BitSet incoming = iprot.readBitSet(6);
+      if (incoming.get(0)) {
+        struct.timestamp = iprot.readI64();
+        struct.setTimestampIsSet(true);
+      }
+      if (incoming.get(1)) {
+        struct.priority = Priority.findByValue(iprot.readI32());
+        struct.setPriorityIsSet(true);
+      }
+      if (incoming.get(2)) {
+        struct.body = iprot.readBinary();
+        struct.setBodyIsSet(true);
+      }
+      if (incoming.get(3)) {
+        struct.nanos = iprot.readI64();
+        struct.setNanosIsSet(true);
+      }
+      if (incoming.get(4)) {
+        struct.host = iprot.readString();
+        struct.setHostIsSet(true);
+      }
+      if (incoming.get(5)) {
+        {
+          org.apache.thrift.protocol.TMap _map6 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, iprot.readI32());
+          struct.fields = new HashMap<String,ByteBuffer>(2*_map6.size);
+          for (int _i7 = 0; _i7 < _map6.size; ++_i7)
+          {
+            String _key8; // required
+            ByteBuffer _val9; // required
+            _key8 = iprot.readString();
+            _val9 = iprot.readBinary();
+            struct.fields.put(_key8, _val9);
+          }
+        }
+        struct.setFieldsIsSet(true);
+      }
+    }
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/60da3d86/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/ThriftFlumeEventServer.java
----------------------------------------------------------------------
diff --git a/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/ThriftFlumeEventServer.java b/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/ThriftFlumeEventServer.java
index 0f2ad2d..208838a 100644
--- a/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/ThriftFlumeEventServer.java
+++ b/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/ThriftFlumeEventServer.java
@@ -16,15 +16,24 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 /**
- * Autogenerated by Thrift
+ * Autogenerated by Thrift Compiler (0.9.0)
  *
  * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
  */
 package com.cloudera.flume.handlers.thrift;
 
 import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Map;
@@ -40,7 +49,6 @@ import java.util.Arrays;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@Deprecated
 public class ThriftFlumeEventServer {
 
   public interface Iface {
@@ -59,7 +67,7 @@ public class ThriftFlumeEventServer {
 
   }
 
-  public static class Client implements org.apache.thrift.TServiceClient, Iface {
+  public static class Client extends org.apache.thrift.TServiceClient implements Iface {
     public static class Factory implements org.apache.thrift.TServiceClientFactory<Client> {
       public Factory() {}
       public Client getClient(org.apache.thrift.protocol.TProtocol prot) {
@@ -72,28 +80,11 @@ public class ThriftFlumeEventServer {
 
     public Client(org.apache.thrift.protocol.TProtocol prot)
     {
-      this(prot, prot);
-    }
-
-    public Client(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot)
-    {
-      iprot_ = iprot;
-      oprot_ = oprot;
-    }
-
-    protected org.apache.thrift.protocol.TProtocol iprot_;
-    protected org.apache.thrift.protocol.TProtocol oprot_;
-
-    protected int seqid_;
-
-    public org.apache.thrift.protocol.TProtocol getInputProtocol()
-    {
-      return this.iprot_;
+      super(prot, prot);
     }
 
-    public org.apache.thrift.protocol.TProtocol getOutputProtocol()
-    {
-      return this.oprot_;
+    public Client(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
+      super(iprot, oprot);
     }
 
     public void append(ThriftFlumeEvent evt) throws org.apache.thrift.TException
@@ -103,12 +94,9 @@ public class ThriftFlumeEventServer {
 
     public void send_append(ThriftFlumeEvent evt) throws org.apache.thrift.TException
     {
-      oprot_.writeMessageBegin(new org.apache.thrift.protocol.TMessage("append", org.apache.thrift.protocol.TMessageType.CALL, ++seqid_));
       append_args args = new append_args();
       args.setEvt(evt);
-      args.write(oprot_);
-      oprot_.writeMessageEnd();
-      oprot_.getTransport().flush();
+      sendBase("append", args);
     }
 
     public void close() throws org.apache.thrift.TException
@@ -119,27 +107,14 @@ public class ThriftFlumeEventServer {
 
     public void send_close() throws org.apache.thrift.TException
     {
-      oprot_.writeMessageBegin(new org.apache.thrift.protocol.TMessage("close", org.apache.thrift.protocol.TMessageType.CALL, ++seqid_));
       close_args args = new close_args();
-      args.write(oprot_);
-      oprot_.writeMessageEnd();
-      oprot_.getTransport().flush();
+      sendBase("close", args);
     }
 
     public void recv_close() throws org.apache.thrift.TException
     {
-      org.apache.thrift.protocol.TMessage msg = iprot_.readMessageBegin();
-      if (msg.type == org.apache.thrift.protocol.TMessageType.EXCEPTION) {
-        org.apache.thrift.TApplicationException x = org.apache.thrift.TApplicationException.read(iprot_);
-        iprot_.readMessageEnd();
-        throw x;
-      }
-      if (msg.seqid != seqid_) {
-        throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.BAD_SEQUENCE_ID, "close failed: out of sequence response");
-      }
       close_result result = new close_result();
-      result.read(iprot_);
-      iprot_.readMessageEnd();
+      receiveBase(result, "close");
       return;
     }
 
@@ -163,9 +138,9 @@ public class ThriftFlumeEventServer {
 
     public void append(ThriftFlumeEvent evt, org.apache.thrift.async.AsyncMethodCallback<append_call> resultHandler) throws org.apache.thrift.TException {
       checkReady();
-      append_call method_call = new append_call(evt, resultHandler, this, protocolFactory, transport);
-      this.currentMethod = method_call;
-      manager.call(method_call);
+      append_call method_call = new append_call(evt, resultHandler, this, ___protocolFactory, ___transport);
+      this.___currentMethod = method_call;
+      ___manager.call(method_call);
     }
 
     public static class append_call extends org.apache.thrift.async.TAsyncMethodCall {
@@ -194,9 +169,9 @@ public class ThriftFlumeEventServer {
 
     public void close(org.apache.thrift.async.AsyncMethodCallback<close_call> resultHandler) throws org.apache.thrift.TException {
       checkReady();
-      close_call method_call = new close_call(resultHandler, this, protocolFactory, transport);
-      this.currentMethod = method_call;
-      manager.call(method_call);
+      close_call method_call = new close_call(resultHandler, this, ___protocolFactory, ___transport);
+      this.___currentMethod = method_call;
+      ___manager.call(method_call);
     }
 
     public static class close_call extends org.apache.thrift.async.TAsyncMethodCall {
@@ -223,85 +198,59 @@ public class ThriftFlumeEventServer {
 
   }
 
-  public static class Processor implements org.apache.thrift.TProcessor {
+  public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor {
     private static final Logger LOGGER = LoggerFactory.getLogger(Processor.class.getName());
-    public Processor(Iface iface)
-    {
-      iface_ = iface;
-      processMap_.put("append", new append());
-      processMap_.put("close", new close());
+    public Processor(I iface) {
+      super(iface, getProcessMap(new HashMap<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>()));
     }
 
-    protected static interface ProcessFunction {
-      public void process(int seqid, org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException;
+    protected Processor(I iface, Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> processMap) {
+      super(iface, getProcessMap(processMap));
     }
 
-    private Iface iface_;
-    protected final HashMap<String,ProcessFunction> processMap_ = new HashMap<String,ProcessFunction>();
+    private static <I extends Iface> Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> getProcessMap(Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> processMap) {
+      processMap.put("append", new append());
+      processMap.put("close", new close());
+      return processMap;
+    }
 
-    public boolean process(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException
-    {
-      org.apache.thrift.protocol.TMessage msg = iprot.readMessageBegin();
-      ProcessFunction fn = processMap_.get(msg.name);
-      if (fn == null) {
-        org.apache.thrift.protocol.TProtocolUtil.skip(iprot, org.apache.thrift.protocol.TType.STRUCT);
-        iprot.readMessageEnd();
-        org.apache.thrift.TApplicationException x = new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'");
-        oprot.writeMessageBegin(new org.apache.thrift.protocol.TMessage(msg.name, org.apache.thrift.protocol.TMessageType.EXCEPTION, msg.seqid));
-        x.write(oprot);
-        oprot.writeMessageEnd();
-        oprot.getTransport().flush();
+    public static class append<I extends Iface> extends org.apache.thrift.ProcessFunction<I, append_args> {
+      public append() {
+        super("append");
+      }
+
+      public append_args getEmptyArgsInstance() {
+        return new append_args();
+      }
+
+      protected boolean isOneway() {
         return true;
       }
-      fn.process(msg.seqid, iprot, oprot);
-      return true;
-    }
 
-    private class append implements ProcessFunction {
-      public void process(int seqid, org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException
-      {
-        append_args args = new append_args();
-        try {
-          args.read(iprot);
-        } catch (org.apache.thrift.protocol.TProtocolException e) {
-          iprot.readMessageEnd();
-          org.apache.thrift.TApplicationException x = new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.PROTOCOL_ERROR, e.getMessage());
-          oprot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("append", org.apache.thrift.protocol.TMessageType.EXCEPTION, seqid));
-          x.write(oprot);
-          oprot.writeMessageEnd();
-          oprot.getTransport().flush();
-          return;
-        }
-        iprot.readMessageEnd();
-        iface_.append(args.evt);
-        return;
+      public org.apache.thrift.TBase getResult(I iface, append_args args) throws org.apache.thrift.TException {
+        iface.append(args.evt);
+        return null;
       }
     }
 
-    private class close implements ProcessFunction {
-      public void process(int seqid, org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException
-      {
-        close_args args = new close_args();
-        try {
-          args.read(iprot);
-        } catch (org.apache.thrift.protocol.TProtocolException e) {
-          iprot.readMessageEnd();
-          org.apache.thrift.TApplicationException x = new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.PROTOCOL_ERROR, e.getMessage());
-          oprot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("close", org.apache.thrift.protocol.TMessageType.EXCEPTION, seqid));
-          x.write(oprot);
-          oprot.writeMessageEnd();
-          oprot.getTransport().flush();
-          return;
-        }
-        iprot.readMessageEnd();
-        close_result result = new close_result();
-        iface_.close();
-        oprot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("close", org.apache.thrift.protocol.TMessageType.REPLY, seqid));
-        result.write(oprot);
-        oprot.writeMessageEnd();
-        oprot.getTransport().flush();
+    public static class close<I extends Iface> extends org.apache.thrift.ProcessFunction<I, close_args> {
+      public close() {
+        super("close");
       }
 
+      public close_args getEmptyArgsInstance() {
+        return new close_args();
+      }
+
+      protected boolean isOneway() {
+        return false;
+      }
+
+      public close_result getResult(I iface, close_args args) throws org.apache.thrift.TException {
+        close_result result = new close_result();
+        iface.close();
+        return result;
+      }
     }
 
   }
@@ -311,7 +260,13 @@ public class ThriftFlumeEventServer {
 
     private static final org.apache.thrift.protocol.TField EVT_FIELD_DESC = new org.apache.thrift.protocol.TField("evt", org.apache.thrift.protocol.TType.STRUCT, (short)1);
 
-    public ThriftFlumeEvent evt;
+    private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+    static {
+      schemes.put(StandardScheme.class, new append_argsStandardSchemeFactory());
+      schemes.put(TupleScheme.class, new append_argsTupleSchemeFactory());
+    }
+
+    public ThriftFlumeEvent evt; // required
 
     /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
     public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -372,7 +327,6 @@ public class ThriftFlumeEventServer {
     }
 
     // isset id assignments
-
     public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
     static {
       Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -532,45 +486,11 @@ public class ThriftFlumeEventServer {
     }
 
     public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
-      org.apache.thrift.protocol.TField field;
-      iprot.readStructBegin();
-      while (true)
-      {
-        field = iprot.readFieldBegin();
-        if (field.type == org.apache.thrift.protocol.TType.STOP) { 
-          break;
-        }
-        switch (field.id) {
-          case 1: // EVT
-            if (field.type == org.apache.thrift.protocol.TType.STRUCT) {
-              this.evt = new ThriftFlumeEvent();
-              this.evt.read(iprot);
-            } else { 
-              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
-            }
-            break;
-          default:
-            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
-        }
-        iprot.readFieldEnd();
-      }
-      iprot.readStructEnd();
-
-      // check for required fields of primitive type, which can't be checked in the validate method
-      validate();
+      schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
     }
 
     public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
-      validate();
-
-      oprot.writeStructBegin(STRUCT_DESC);
-      if (this.evt != null) {
-        oprot.writeFieldBegin(EVT_FIELD_DESC);
-        this.evt.write(oprot);
-        oprot.writeFieldEnd();
-      }
-      oprot.writeFieldStop();
-      oprot.writeStructEnd();
+      schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
     }
 
     @Override
@@ -591,6 +511,10 @@ public class ThriftFlumeEventServer {
 
     public void validate() throws org.apache.thrift.TException {
       // check for required fields
+      // check for sub-struct validity
+      if (evt != null) {
+        evt.validate();
+      }
     }
 
     private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
@@ -609,12 +533,104 @@ public class ThriftFlumeEventServer {
       }
     }
 
+    private static class append_argsStandardSchemeFactory implements SchemeFactory {
+      public append_argsStandardScheme getScheme() {
+        return new append_argsStandardScheme();
+      }
+    }
+
+    private static class append_argsStandardScheme extends StandardScheme<append_args> {
+
+      public void read(org.apache.thrift.protocol.TProtocol iprot, append_args struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TField schemeField;
+        iprot.readStructBegin();
+        while (true)
+        {
+          schemeField = iprot.readFieldBegin();
+          if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+            break;
+          }
+          switch (schemeField.id) {
+            case 1: // EVT
+              if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
+                struct.evt = new ThriftFlumeEvent();
+                struct.evt.read(iprot);
+                struct.setEvtIsSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
+            default:
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+          }
+          iprot.readFieldEnd();
+        }
+        iprot.readStructEnd();
+
+        // check for required fields of primitive type, which can't be checked in the validate method
+        struct.validate();
+      }
+
+      public void write(org.apache.thrift.protocol.TProtocol oprot, append_args struct) throws org.apache.thrift.TException {
+        struct.validate();
+
+        oprot.writeStructBegin(STRUCT_DESC);
+        if (struct.evt != null) {
+          oprot.writeFieldBegin(EVT_FIELD_DESC);
+          struct.evt.write(oprot);
+          oprot.writeFieldEnd();
+        }
+        oprot.writeFieldStop();
+        oprot.writeStructEnd();
+      }
+
+    }
+
+    private static class append_argsTupleSchemeFactory implements SchemeFactory {
+      public append_argsTupleScheme getScheme() {
+        return new append_argsTupleScheme();
+      }
+    }
+
+    private static class append_argsTupleScheme extends TupleScheme<append_args> {
+
+      @Override
+      public void write(org.apache.thrift.protocol.TProtocol prot, append_args struct) throws org.apache.thrift.TException {
+        TTupleProtocol oprot = (TTupleProtocol) prot;
+        BitSet optionals = new BitSet();
+        if (struct.isSetEvt()) {
+          optionals.set(0);
+        }
+        oprot.writeBitSet(optionals, 1);
+        if (struct.isSetEvt()) {
+          struct.evt.write(oprot);
+        }
+      }
+
+      @Override
+      public void read(org.apache.thrift.protocol.TProtocol prot, append_args struct) throws org.apache.thrift.TException {
+        TTupleProtocol iprot = (TTupleProtocol) prot;
+        BitSet incoming = iprot.readBitSet(1);
+        if (incoming.get(0)) {
+          struct.evt = new ThriftFlumeEvent();
+          struct.evt.read(iprot);
+          struct.setEvtIsSet(true);
+        }
+      }
+    }
+
   }
 
   public static class close_args implements org.apache.thrift.TBase<close_args, close_args._Fields>, java.io.Serializable, Cloneable   {
     private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("close_args");
 
 
+    private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+    static {
+      schemes.put(StandardScheme.class, new close_argsStandardSchemeFactory());
+      schemes.put(TupleScheme.class, new close_argsTupleSchemeFactory());
+    }
+
 
     /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
     public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -756,32 +772,11 @@ public class ThriftFlumeEventServer {
     }
 
     public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
-      org.apache.thrift.protocol.TField field;
-      iprot.readStructBegin();
-      while (true)
-      {
-        field = iprot.readFieldBegin();
-        if (field.type == org.apache.thrift.protocol.TType.STOP) { 
-          break;
-        }
-        switch (field.id) {
-          default:
-            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
-        }
-        iprot.readFieldEnd();
-      }
-      iprot.readStructEnd();
-
-      // check for required fields of primitive type, which can't be checked in the validate method
-      validate();
+      schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
     }
 
     public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
-      validate();
-
-      oprot.writeStructBegin(STRUCT_DESC);
-      oprot.writeFieldStop();
-      oprot.writeStructEnd();
+      schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
     }
 
     @Override
@@ -795,6 +790,7 @@ public class ThriftFlumeEventServer {
 
     public void validate() throws org.apache.thrift.TException {
       // check for required fields
+      // check for sub-struct validity
     }
 
     private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
@@ -813,12 +809,76 @@ public class ThriftFlumeEventServer {
       }
     }
 
+    private static class close_argsStandardSchemeFactory implements SchemeFactory {
+      public close_argsStandardScheme getScheme() {
+        return new close_argsStandardScheme();
+      }
+    }
+
+    private static class close_argsStandardScheme extends StandardScheme<close_args> {
+
+      public void read(org.apache.thrift.protocol.TProtocol iprot, close_args struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TField schemeField;
+        iprot.readStructBegin();
+        while (true)
+        {
+          schemeField = iprot.readFieldBegin();
+          if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+            break;
+          }
+          switch (schemeField.id) {
+            default:
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+          }
+          iprot.readFieldEnd();
+        }
+        iprot.readStructEnd();
+
+        // check for required fields of primitive type, which can't be checked in the validate method
+        struct.validate();
+      }
+
+      public void write(org.apache.thrift.protocol.TProtocol oprot, close_args struct) throws org.apache.thrift.TException {
+        struct.validate();
+
+        oprot.writeStructBegin(STRUCT_DESC);
+        oprot.writeFieldStop();
+        oprot.writeStructEnd();
+      }
+
+    }
+
+    private static class close_argsTupleSchemeFactory implements SchemeFactory {
+      public close_argsTupleScheme getScheme() {
+        return new close_argsTupleScheme();
+      }
+    }
+
+    private static class close_argsTupleScheme extends TupleScheme<close_args> {
+
+      @Override
+      public void write(org.apache.thrift.protocol.TProtocol prot, close_args struct) throws org.apache.thrift.TException {
+        TTupleProtocol oprot = (TTupleProtocol) prot;
+      }
+
+      @Override
+      public void read(org.apache.thrift.protocol.TProtocol prot, close_args struct) throws org.apache.thrift.TException {
+        TTupleProtocol iprot = (TTupleProtocol) prot;
+      }
+    }
+
   }
 
   public static class close_result implements org.apache.thrift.TBase<close_result, close_result._Fields>, java.io.Serializable, Cloneable   {
     private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("close_result");
 
 
+    private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+    static {
+      schemes.put(StandardScheme.class, new close_resultStandardSchemeFactory());
+      schemes.put(TupleScheme.class, new close_resultTupleSchemeFactory());
+    }
+
 
     /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
     public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -960,32 +1020,12 @@ public class ThriftFlumeEventServer {
     }
 
     public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
-      org.apache.thrift.protocol.TField field;
-      iprot.readStructBegin();
-      while (true)
-      {
-        field = iprot.readFieldBegin();
-        if (field.type == org.apache.thrift.protocol.TType.STOP) { 
-          break;
-        }
-        switch (field.id) {
-          default:
-            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
-        }
-        iprot.readFieldEnd();
-      }
-      iprot.readStructEnd();
-
-      // check for required fields of primitive type, which can't be checked in the validate method
-      validate();
+      schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
     }
 
     public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
-      oprot.writeStructBegin(STRUCT_DESC);
-
-      oprot.writeFieldStop();
-      oprot.writeStructEnd();
-    }
+      schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+      }
 
     @Override
     public String toString() {
@@ -998,6 +1038,7 @@ public class ThriftFlumeEventServer {
 
     public void validate() throws org.apache.thrift.TException {
       // check for required fields
+      // check for sub-struct validity
     }
 
     private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
@@ -1016,6 +1057,64 @@ public class ThriftFlumeEventServer {
       }
     }
 
+    private static class close_resultStandardSchemeFactory implements SchemeFactory {
+      public close_resultStandardScheme getScheme() {
+        return new close_resultStandardScheme();
+      }
+    }
+
+    private static class close_resultStandardScheme extends StandardScheme<close_result> {
+
+      public void read(org.apache.thrift.protocol.TProtocol iprot, close_result struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TField schemeField;
+        iprot.readStructBegin();
+        while (true)
+        {
+          schemeField = iprot.readFieldBegin();
+          if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+            break;
+          }
+          switch (schemeField.id) {
+            default:
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+          }
+          iprot.readFieldEnd();
+        }
+        iprot.readStructEnd();
+
+        // check for required fields of primitive type, which can't be checked in the validate method
+        struct.validate();
+      }
+
+      public void write(org.apache.thrift.protocol.TProtocol oprot, close_result struct) throws org.apache.thrift.TException {
+        struct.validate();
+
+        oprot.writeStructBegin(STRUCT_DESC);
+        oprot.writeFieldStop();
+        oprot.writeStructEnd();
+      }
+
+    }
+
+    private static class close_resultTupleSchemeFactory implements SchemeFactory {
+      public close_resultTupleScheme getScheme() {
+        return new close_resultTupleScheme();
+      }
+    }
+
+    private static class close_resultTupleScheme extends TupleScheme<close_result> {
+
+      @Override
+      public void write(org.apache.thrift.protocol.TProtocol prot, close_result struct) throws org.apache.thrift.TException {
+        TTupleProtocol oprot = (TTupleProtocol) prot;
+      }
+
+      @Override
+      public void read(org.apache.thrift.protocol.TProtocol prot, close_result struct) throws org.apache.thrift.TException {
+        TTupleProtocol iprot = (TTupleProtocol) prot;
+      }
+    }
+
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/60da3d86/flume-ng-legacy-sources/flume-thrift-source/src/main/thrift/aslv2
----------------------------------------------------------------------
diff --git a/flume-ng-legacy-sources/flume-thrift-source/src/main/thrift/aslv2 b/flume-ng-legacy-sources/flume-thrift-source/src/main/thrift/aslv2
new file mode 100644
index 0000000..7243427
--- /dev/null
+++ b/flume-ng-legacy-sources/flume-thrift-source/src/main/thrift/aslv2
@@ -0,0 +1,18 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */

http://git-wip-us.apache.org/repos/asf/flume/blob/60da3d86/flume-ng-sdk/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/pom.xml b/flume-ng-sdk/pom.xml
index ab066d5..3709339 100644
--- a/flume-ng-sdk/pom.xml
+++ b/flume-ng-sdk/pom.xml
@@ -28,6 +28,72 @@ limitations under the License.
   <name>Flume NG SDK</name>
   <description>Flume Software Development Kit: Stable public API for integration with Flume 1.x</description>
 
+  <profiles>
+    <profile>
+      <id>compileThrift</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-antrun-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>compileThrift</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>run</goal>
+                </goals>
+                <configuration>
+                  <target>
+                    <echo file="target/compile-thrift.sh">
+                      LICENSE=src/main/thrift/aslv2
+                      THRIFT_DIR=src/main/thrift
+                      JAVA_DIR=target/generated-sources/thrift
+                      mkdir -p $JAVA_DIR 2&gt; /dev/null
+                      JSTATUS=$?
+                      if [ $JSTATUS -ne 0 ] ; then
+                        echo "Could not create $JAVA_DIR. Will not generate thrift files."
+                        exit $JSTATUS
+                      fi
+                      for THRIFT_FILE in `ls $THRIFT_DIR/*.thrift 2&gt; /dev/null`
+                      do
+                        thrift --gen java:hashcode -o $JAVA_DIR $THRIFT_FILE
+                      done
+                      SRC_DIR=$JAVA_DIR/gen-java/org/apache/flume/thrift/
+                      DEST_DIR=src/main/java/org/apache/flume/thrift
+                      if [ ! -d $DEST_DIR ] ; then
+                        mkdir $DEST_DIR 2&gt; /dev/null
+                        STATUS=$?
+                        if [ $STATUS -ne 0 ] ; then
+                          echo "Could not create $DEST_DIR. Will not generate thrift files."
+                          exit $STATUS
+                        fi
+                      fi
+                      for JAVA_FILE in `ls $SRC_DIR/*.java 2&gt; /dev/null`
+                      do
+                        echo $JAVA_FILE
+                        cat $LICENSE > $JAVA_FILE.tmp
+                        cat $JAVA_FILE >> $JAVA_FILE.tmp
+                        mv $JAVA_FILE.tmp $JAVA_FILE
+                        cp $JAVA_FILE $DEST_DIR
+                      done
+                      rm -rf $JAVA_DIR
+                    </echo>
+                    <exec executable="sh" dir="${basedir}" failonerror="true">
+                      <arg line="target/compile-thrift.sh"/>
+                    </exec>
+                  </target>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
   <build>
     <plugins>
 
@@ -64,6 +130,18 @@ limitations under the License.
         </executions>
       </plugin>
 
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
     </plugins>
   </build>
 
@@ -105,5 +183,10 @@ limitations under the License.
       <artifactId>netty</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libthrift</artifactId>
+    </dependency>
+
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/flume/blob/60da3d86/flume-ng-sdk/src/main/java/org/apache/flume/api/HostInfo.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/HostInfo.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/HostInfo.java
index 7388a45..8a81208 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/HostInfo.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/HostInfo.java
@@ -63,7 +63,7 @@ public class HostInfo {
     return referenceName + "{" + hostName + ":" + portNumber + "}";
   }
 
-  static List<HostInfo> getHostInfoList(Properties properties) {
+  public static List<HostInfo> getHostInfoList(Properties properties) {
     List<HostInfo> hosts = new ArrayList<HostInfo>();
     String hostNames = properties.getProperty(
         RpcClientConfigurationConstants.CONFIG_HOSTS);

http://git-wip-us.apache.org/repos/asf/flume/blob/60da3d86/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
index ab4c3de..1e642d8 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
@@ -119,6 +119,13 @@ public final class RpcClientConfigurationConstants {
   public static final String CONFIG_BACKOFF = "backoff";
   public static final String DEFAULT_BACKOFF = "false";
 
+  /**
+   * Maximum number of connections each Thrift Rpc client can open to a given
+   * host.
+   */
+  public static final String CONFIG_CONNECTION_POOL_SIZE = "maxConnections";
+  public static final int DEFAULT_CONNECTION_POOL_SIZE = 5;
+
   private RpcClientConfigurationConstants() {
     // disable explicit object creation
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/60da3d86/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java
index 3c93921..00d9cf0 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java
@@ -170,11 +170,72 @@ public class RpcClientFactory {
     return client;
   }
 
+  /**
+   * Build an {@linkplain RpcClient} that talks Thrift to a single next hop.
+   * A ThriftSource must be listening on the given host and port.
+   * @param hostname - The hostname of the next hop. Must not be null.
+   * @param port - The port on which the ThriftSource is listening. Must not
+   * be null.
+   * @param batchSize - batch size of each transaction. Must not be null.
+   * @return an {@linkplain RpcClient} which uses thrift configured with the
+   * given parameters.
+   * @throws NullPointerException if any argument is null.
+   */
+  public static RpcClient getThriftInstance(String hostname, Integer port,
+    Integer batchSize) {
+    // Arguments are checked in declaration order so that the first missing
+    // one is the one reported.
+    if (hostname == null) {
+      throw new NullPointerException("hostname must not be null");
+    }
+    if (port == null) {
+      throw new NullPointerException("port must not be null");
+    }
+    if (batchSize == null) {
+      throw new NullPointerException("batchSize must not be null");
+    }
+
+    // The client is configured through the same property layout used for
+    // file based configuration: one host alias plus its address entry.
+    final String hostAlias = "h1";
+    final String hostAddress = hostname + ":" + port.intValue();
+
+    final Properties clientProps = new Properties();
+    clientProps.setProperty(
+      RpcClientConfigurationConstants.CONFIG_HOSTS, hostAlias);
+    clientProps.setProperty(
+      RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + hostAlias,
+      hostAddress);
+    clientProps.setProperty(
+      RpcClientConfigurationConstants.CONFIG_BATCH_SIZE,
+      batchSize.toString());
+
+    final ThriftRpcClient thriftClient = new ThriftRpcClient();
+    thriftClient.configure(clientProps);
+    return thriftClient;
+  }
+
+  /**
+   * Build an {@linkplain RpcClient} that talks Thrift to a single next hop,
+   * using the default batch size. A ThriftSource must be listening on the
+   * given host and port. See {@linkplain RpcClientConfigurationConstants}
+   * for the default.
+   * @param hostname - The hostname of the next hop.
+   * @param port - The port on which the ThriftSource is listening
+   * @return - An {@linkplain RpcClient} which uses thrift configured with the
+   * given parameters.
+   */
+  public static RpcClient getThriftInstance(String hostname, Integer port) {
+    final Integer defaultBatchSize =
+      RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE;
+    return getThriftInstance(hostname, port, defaultBatchSize);
+  }
+
+  /**
+   * Return an {@linkplain RpcClient} that uses Thrift for communicating with
+   * the next hop. The client type entry of the supplied properties is
+   * overwritten with the Thrift client class before the client is created,
+   * so the object passed in is modified by this call.
+   * @param props - Configuration for the client. Must not be null; its
+   * client type property is set by this method.
+   * @return - An {@linkplain RpcClient} which uses thrift configured with the
+   * given parameters.
+   */
+  public static RpcClient getThriftInstance(Properties props) {
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE,
+      ClientType.THRIFT.clientClassName);
+    return getInstance(props);
+  }
+
   public static enum ClientType {
     OTHER(null),
     DEFAULT(NettyAvroRpcClient.class.getCanonicalName()),
     DEFAULT_FAILOVER(FailoverRpcClient.class.getCanonicalName()),
-    DEFAULT_LOADBALANCE(LoadBalancingRpcClient.class.getCanonicalName());
+    DEFAULT_LOADBALANCE(LoadBalancingRpcClient.class.getCanonicalName()),
+    THRIFT(ThriftRpcClient.class.getCanonicalName());
 
 
     private final String clientClassName;

http://git-wip-us.apache.org/repos/asf/flume/blob/60da3d86/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java
new file mode 100644
index 0000000..cf45ab9
--- /dev/null
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.api;
+
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.thrift.Status;
+import org.apache.flume.thrift.ThriftFlumeEvent;
+import org.apache.flume.thrift.ThriftSourceProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.transport.TFastFramedTransport;
+import org.apache.thrift.transport.TSocket;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Queue;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class ThriftRpcClient extends AbstractRpcClient {
+  private static final Logger LOGGER =
+    LoggerFactory.getLogger(ThriftRpcClient.class);
+
+  // Maximum number of events sent in one appendBatch RPC; set in configure().
+  private int batchSize;
+  // Milliseconds to wait for an RPC to complete; set in configure().
+  private long requestTimeout;
+  // Guards reads and writes of connState.
+  private final Lock stateLock;
+  private State connState;
+  // Host and port of the first configured host; set in configure().
+  private String hostname;
+  private int port;
+  // Created by a successful configure(); null until then.
+  private ConnectionPoolManager connectionManager;
+  // Runs the RPCs so that the calling thread can wait with a timeout.
+  private final ExecutorService callTimeoutPool;
+  // Source of the numeric suffix in the names of callTimeoutPool threads.
+  private final AtomicLong threadCounter;
+  private int connectionPoolSize;
+  // Supplies the hash code of each ClientWrapper.
+  private final Random random = new Random();
+
+  /**
+   * Creates a client in the INIT state. The client cannot send events until
+   * it has been configured.
+   */
+  public ThriftRpcClient() {
+    connState = State.INIT;
+    stateLock = new ReentrantLock(true);
+    threadCounter = new AtomicLong(0);
+
+    // A cached thread pool is acceptable here: the threads only exist so
+    // that calls can be timed out, and the work they do is IO bound.
+    final ThreadFactory namedThreads = new ThreadFactory() {
+      @Override
+      public Thread newThread(Runnable task) {
+        final long sequence = threadCounter.incrementAndGet();
+        final Thread worker = new Thread(task);
+        worker.setName("Flume Thrift RPC thread - " + sequence);
+        return worker;
+      }
+    };
+    callTimeoutPool = Executors.newCachedThreadPool(namedThreads);
+  }
+
+
+  /**
+   * @return the batch size read from the configuration; appendBatch sends
+   * at most this many events per RPC.
+   */
+  @Override
+  public int getBatchSize() {
+    return batchSize;
+  }
+
+  /**
+   * Sends a single event to the next hop and waits, for at most the
+   * configured request timeout, for the server to acknowledge it.
+   *
+   * @param event the event to deliver
+   * @throws EventDeliveryException if the client is not active, the call
+   * times out, the server answers with a status other than OK, or the call
+   * fails with any other checked exception
+   */
+  @Override
+  public void append(Event event) throws EventDeliveryException {
+    // Checked before the try block so that this exception reaches the caller
+    // as is instead of being wrapped by the generic failure handling below.
+    if (!isActive()) {
+      throw new EventDeliveryException("Client was closed due to error. " +
+        "Please create a new client");
+    }
+    // Thrift IPC client is not thread safe, so every call works on a
+    // connection that is checked out of the pool for the duration of the call.
+    ClientWrapper client = null;
+    boolean destroyedClient = false;
+    try {
+      client = connectionManager.checkout();
+      final ThriftFlumeEvent thriftEvent = new ThriftFlumeEvent(event
+        .getHeaders(), ByteBuffer.wrap(event.getBody()));
+      doAppend(client, thriftEvent).get(requestTimeout, TimeUnit.MILLISECONDS);
+    } catch (Throwable e) {
+      EventDeliveryException rejection = serverRejection(e);
+      if (rejection != null) {
+        // The server answered, so the connection is still usable; the finally
+        // block hands it back to the pool.
+        throw rejection;
+      }
+      // If destroy throws, we still don't want to reuse the client, so mark it
+      // as destroyed before we actually do.
+      destroyedClient = true;
+      throw destroyAndTranslate(client, e);
+    } finally {
+      if (client != null && !destroyedClient) {
+        connectionManager.checkIn(client);
+      }
+    }
+  }
+
+  /**
+   * Sends the given events to the next hop in RPCs of at most
+   * {@link #getBatchSize()} events each, waiting for at most the configured
+   * request timeout for every RPC. All RPCs of one invocation use the same
+   * pooled connection.
+   *
+   * @param events the events to deliver
+   * @throws EventDeliveryException if the client is not active, a call times
+   * out, the server answers with a status other than OK, or a call fails
+   * with any other checked exception
+   */
+  @Override
+  public void appendBatch(List<Event> events) throws EventDeliveryException {
+    // Checked before the try block so that this exception reaches the caller
+    // as is instead of being wrapped by the generic failure handling below.
+    if (!isActive()) {
+      throw new EventDeliveryException("Client was closed " +
+        "due to error or is not yet configured.");
+    }
+    // Thrift IPC client is not thread safe, so every call works on a
+    // connection that is checked out of the pool for the duration of the call.
+    ClientWrapper client = null;
+    boolean destroyedClient = false;
+    try {
+      client = connectionManager.checkout();
+      final List<ThriftFlumeEvent> thriftFlumeEvents = new ArrayList
+        <ThriftFlumeEvent>();
+      Iterator<Event> eventsIter = events.iterator();
+      while (eventsIter.hasNext()) {
+        thriftFlumeEvents.clear();
+        for (int i = 0; i < batchSize && eventsIter.hasNext(); i++) {
+          Event event = eventsIter.next();
+          thriftFlumeEvents.add(new ThriftFlumeEvent(event.getHeaders(),
+            ByteBuffer.wrap(event.getBody())));
+        }
+        if (!thriftFlumeEvents.isEmpty()) {
+          doAppendBatch(client, thriftFlumeEvents).get(requestTimeout,
+            TimeUnit.MILLISECONDS);
+        }
+      }
+    } catch (Throwable e) {
+      EventDeliveryException rejection = serverRejection(e);
+      if (rejection != null) {
+        // The server answered, so the connection is still usable; the finally
+        // block hands it back to the pool.
+        throw rejection;
+      }
+      // If destroy throws, we still don't want to reuse the client, so mark it
+      // as destroyed before we actually do.
+      destroyedClient = true;
+      throw destroyAndTranslate(client, e);
+    } finally {
+      if (client != null && !destroyedClient) {
+        connectionManager.checkIn(client);
+      }
+    }
+  }
+
+  /**
+   * Returns the delivery failure reported by the RPC task itself (the server
+   * answered with a status other than OK), or null for any other failure.
+   */
+  private static EventDeliveryException serverRejection(Throwable failure) {
+    if (failure instanceof ExecutionException
+      && failure.getCause() instanceof EventDeliveryException) {
+      return (EventDeliveryException) failure.getCause();
+    }
+    return null;
+  }
+
+  /**
+   * Discards the connection a failed call was using and converts the failure
+   * into the exception to report. Errors and runtime exceptions are rethrown
+   * unchanged after the connection has been discarded.
+   *
+   * @param client the connection used by the failed call, or null if none
+   * had been checked out yet
+   * @param failure what went wrong
+   * @return the exception the caller should throw
+   */
+  private EventDeliveryException destroyAndTranslate(ClientWrapper client,
+    Throwable failure) {
+    if (failure instanceof InterruptedException) {
+      // Keep the interruption visible to the code that called us.
+      Thread.currentThread().interrupt();
+    }
+    if (client != null) {
+      connectionManager.destroy(client);
+    }
+    if (failure instanceof Error) {
+      throw (Error) failure;
+    } else if (failure instanceof RuntimeException) {
+      throw (RuntimeException) failure;
+    }
+    // Future.get(timeout, unit) reports an expired wait by throwing
+    // TimeoutException itself; it is not wrapped in an ExecutionException.
+    Throwable timeout = null;
+    if (failure instanceof TimeoutException) {
+      timeout = failure;
+    } else if (failure instanceof ExecutionException
+      && failure.getCause() instanceof TimeoutException) {
+      timeout = failure.getCause();
+    }
+    if (timeout != null) {
+      return new EventDeliveryException("Append call timeout", timeout);
+    }
+    return new EventDeliveryException("Failed to send event. ", failure);
+  }
+
+  /**
+   * Submits a single-event append over the given connection to the call
+   * pool. The returned future fails with an EventDeliveryException when the
+   * server answers with a status other than OK.
+   */
+  private Future<Void> doAppend(final ClientWrapper client,
+    final ThriftFlumeEvent e) throws Exception {
+
+    final Callable<Void> rpc = new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        final Status reply = client.client.append(e);
+        if (reply == Status.OK) {
+          return null;
+        }
+        throw new EventDeliveryException("Failed to deliver events. Server " +
+          "returned status : " + reply.name());
+      }
+    };
+    return callTimeoutPool.submit(rpc);
+  }
+
+  /**
+   * Submits a batch append over the given connection to the call pool. The
+   * returned future fails with an EventDeliveryException when the server
+   * answers with a status other than OK.
+   */
+  private Future<Void> doAppendBatch(final ClientWrapper client,
+    final List<ThriftFlumeEvent> e) throws Exception {
+
+    final Callable<Void> rpc = new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        final Status reply = client.client.appendBatch(e);
+        if (reply == Status.OK) {
+          return null;
+        }
+        throw new EventDeliveryException("Failed to deliver events. Server " +
+          "returned status : " + reply.name());
+      }
+    };
+    return callTimeoutPool.submit(rpc);
+  }
+
+  /**
+   * @return true only while the client is in the READY state; the state is
+   * read under the state lock.
+   */
+  @Override
+  public boolean isActive() {
+    final boolean ready;
+    stateLock.lock();
+    try {
+      ready = State.READY == connState;
+    } finally {
+      stateLock.unlock();
+    }
+    return ready;
+  }
+
+  /**
+   * Marks this client as dead, closes the pooled connections and shuts down
+   * the call pool, waiting up to five seconds for running calls to finish.
+   * May be called on a client that was never configured or whose
+   * configuration failed.
+   *
+   * @throws FlumeException if closing fails with a checked exception,
+   * including being interrupted while waiting for the call pool
+   */
+  @Override
+  public void close() throws FlumeException {
+    // Acquired outside the try block so that the finally block never unlocks
+    // a lock this thread does not hold.
+    stateLock.lock();
+    try {
+      connState = State.DEAD;
+      // The pool only exists once configure() has succeeded.
+      if (connectionManager != null) {
+        connectionManager.closeAll();
+      }
+      callTimeoutPool.shutdown();
+      if(!callTimeoutPool.awaitTermination(5, TimeUnit.SECONDS)) {
+        callTimeoutPool.shutdownNow();
+      }
+    } catch (InterruptedException ex) {
+      // Do not leave call threads running, and keep the interruption visible
+      // to the caller.
+      callTimeoutPool.shutdownNow();
+      Thread.currentThread().interrupt();
+      throw new FlumeException("Failed to close RPC client. ", ex);
+    } catch (Throwable ex) {
+      if(ex instanceof Error) {
+        throw (Error) ex;
+      } else if (ex instanceof RuntimeException) {
+        throw (RuntimeException) ex;
+      }
+      throw new FlumeException("Failed to close RPC client. ", ex);
+    } finally {
+      stateLock.unlock();
+    }
+  }
+
+  /**
+   * Reads the target host, batch size, request timeout and connection pool
+   * size from the given properties and moves the client to the READY state.
+   * Only the first configured host is used. A batch size or connection pool
+   * size below 1, or a request timeout below 1000 ms, is replaced by its
+   * default with a warning. Any failure leaves the client in the DEAD state.
+   *
+   * @param properties the client configuration
+   * @throws FlumeException if the client is already configured, no host is
+   * configured, or configuration fails with a checked exception
+   */
+  @Override
+  protected void configure(Properties properties) throws FlumeException {
+    if (isActive()) {
+      throw new FlumeException("Attempting to re-configured an already " +
+        "configured client!");
+    }
+    stateLock.lock();
+    try {
+      List<HostInfo> hosts = HostInfo.getHostInfoList(properties);
+      if (hosts.isEmpty()) {
+        // Fail with a readable message instead of an index error.
+        throw new FlumeException("No host specified for Thrift RPC client.");
+      }
+      HostInfo host = hosts.get(0);
+      hostname = host.getHostName();
+      port = host.getPortNumber();
+      batchSize = Integer.parseInt(properties.getProperty(
+        RpcClientConfigurationConstants.CONFIG_BATCH_SIZE,
+        RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE.toString()));
+      if (batchSize < 1) {
+        // A batch size below 1 would keep appendBatch from ever consuming
+        // an event, so it would never return.
+        LOGGER.warn("Batch size specified is less than 1. " +
+          "Using default value instead.");
+        batchSize = RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE;
+      }
+      requestTimeout = Long.parseLong(properties.getProperty(
+        RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT,
+        String.valueOf(
+          RpcClientConfigurationConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS)));
+      if (requestTimeout < 1000) {
+        LOGGER.warn("Request timeout specified less than 1s. " +
+          "Using default value instead.");
+        requestTimeout =
+          RpcClientConfigurationConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS;
+      }
+      connectionPoolSize = Integer.parseInt(properties.getProperty(
+        RpcClientConfigurationConstants.CONFIG_CONNECTION_POOL_SIZE,
+        String.valueOf(RpcClientConfigurationConstants
+          .DEFAULT_CONNECTION_POOL_SIZE)));
+      if(connectionPoolSize < 1) {
+        LOGGER.warn("Connection Pool Size specified is less than 1. " +
+          "Using default value instead.");
+        connectionPoolSize = RpcClientConfigurationConstants
+          .DEFAULT_CONNECTION_POOL_SIZE;
+      }
+      connectionManager = new ConnectionPoolManager(connectionPoolSize);
+      connState = State.READY;
+    } catch (Throwable ex) {
+      //Failed to configure, kill the client.
+      connState = State.DEAD;
+      if(ex instanceof Error) {
+        throw (Error) ex;
+      } else if (ex instanceof RuntimeException) {
+        throw (RuntimeException) ex;
+      }
+      throw new FlumeException("Error while configuring RpcClient. ", ex);
+    } finally {
+      stateLock.unlock();
+    }
+  }
+
+  // Life cycle of the client: INIT is assigned by the constructor, READY at
+  // the end of a successful configure(), DEAD when configure() fails or
+  // close() is called.
+  private static enum State {
+    INIT, READY, DEAD
+  }
+
+  /**
+   * Pairs a Thrift client with the transport it talks over, so that the
+   * connection can be closed when the client is discarded.
+   */
+  private class ClientWrapper {
+    public final ThriftSourceProtocol.Client client;
+    public final TFastFramedTransport transport;
+    private final int hashCode;
+
+    /**
+     * Opens a framed transport to the configured host and port and creates
+     * a compact protocol client on top of it.
+     */
+    public ClientWrapper() throws Exception{
+      final TSocket socket = new TSocket(hostname, port);
+      transport = new TFastFramedTransport(socket);
+      transport.open();
+      final TCompactProtocol protocol = new TCompactProtocol(transport);
+      client = new ThriftSourceProtocol.Client(protocol);
+      // Equality is identity (see equals), so any value that stays fixed for
+      // the lifetime of the wrapper is a valid hash code.
+      hashCode = random.nextInt();
+    }
+
+    /**
+     * Two wrappers are equal only if they are the same object: there is
+     * only one wrapper for any given client.
+     */
+    @Override
+    public boolean equals(Object o) {
+      return this == o;
+    }
+
+    @Override
+    public int hashCode() {
+      return hashCode;
+    }
+  }
+
+  /**
+   * Bounded pool of connections to the configured host. Connections are
+   * created on demand up to the maximum pool size; once the maximum is
+   * reached, checkout blocks until a connection is checked in or destroyed.
+   * All bookkeeping is done while holding poolLock.
+   */
+  private class ConnectionPoolManager {
+    private final Queue<ClientWrapper> availableClients;
+    private final Set<ClientWrapper> checkedOutClients;
+    private final int maxPoolSize;
+    // Number of live connections: available plus checked out.
+    private int currentPoolSize;
+    // Set by closeAll(); a closed pool hands out no more connections.
+    private boolean closed;
+    private final Lock poolLock;
+    // Signalled whenever a connection or free capacity becomes available,
+    // and when the pool is closed.
+    private final Condition availableClientsCondition;
+
+    public ConnectionPoolManager(int poolSize) {
+      this.maxPoolSize = poolSize;
+      availableClients = new LinkedList<ClientWrapper>();
+      checkedOutClients = new HashSet<ClientWrapper>();
+      poolLock = new ReentrantLock();
+      availableClientsCondition = poolLock.newCondition();
+      currentPoolSize = 0;
+      closed = false;
+    }
+
+    /**
+     * Returns an idle connection, opens a new one if the pool is below its
+     * maximum size, or otherwise blocks until a connection is checked in or
+     * capacity is freed by destroy.
+     *
+     * @throws IllegalStateException if the pool has been closed
+     * @throws Exception if a new connection cannot be opened or the wait is
+     * interrupted
+     */
+    public ClientWrapper checkout() throws Exception {
+      poolLock.lock();
+      try {
+        // Both conditions are evaluated again after every wake-up: the pool
+        // may have gained an idle connection or free capacity.
+        while (true) {
+          if (closed) {
+            throw new IllegalStateException(
+              "Connection pool has been closed.");
+          }
+          ClientWrapper pooled = availableClients.poll();
+          if (pooled != null) {
+            checkedOutClients.add(pooled);
+            return pooled;
+          }
+          if (currentPoolSize < maxPoolSize) {
+            ClientWrapper created = new ClientWrapper();
+            currentPoolSize++;
+            checkedOutClients.add(created);
+            return created;
+          }
+          availableClientsCondition.await();
+        }
+      } finally {
+        poolLock.unlock();
+      }
+    }
+
+    /**
+     * Returns a healthy connection to the pool and wakes one waiter. After
+     * closeAll the connection has already been closed and is dropped.
+     */
+    public void checkIn(ClientWrapper client) {
+      poolLock.lock();
+      try {
+        if (closed) {
+          return;
+        }
+        availableClients.add(client);
+        checkedOutClients.remove(client);
+        availableClientsCondition.signal();
+      } finally {
+        poolLock.unlock();
+      }
+    }
+
+    /**
+     * Removes a checked out connection from the pool and closes its
+     * transport. The freed capacity is announced to one waiter so that it
+     * can open a replacement connection.
+     */
+    public void destroy(ClientWrapper client) {
+      poolLock.lock();
+      try {
+        // Only count connections the pool still tracks; closeAll may have
+        // released this one already.
+        if (checkedOutClients.remove(client)) {
+          currentPoolSize--;
+          availableClientsCondition.signal();
+        }
+      } finally {
+        poolLock.unlock();
+      }
+      client.transport.close();
+    }
+
+    /**
+     * Closes every connection, idle or checked out, empties the pool and
+     * wakes all threads blocked in checkout so that they fail instead of
+     * waiting forever.
+     */
+    public void closeAll() {
+      poolLock.lock();
+      try {
+        closed = true;
+        for (ClientWrapper c : availableClients) {
+          c.transport.close();
+        }
+      /*
+       * Be cruel and close even the checked out clients. The threads writing
+       * using these will now get an exception.
+       */
+        for (ClientWrapper c : checkedOutClients) {
+          c.transport.close();
+        }
+        availableClients.clear();
+        checkedOutClients.clear();
+        currentPoolSize = 0;
+        availableClientsCondition.signalAll();
+      } finally {
+        poolLock.unlock();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/60da3d86/flume-ng-sdk/src/main/java/org/apache/flume/thrift/Status.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/thrift/Status.java b/flume-ng-sdk/src/main/java/org/apache/flume/thrift/Status.java
new file mode 100644
index 0000000..c05f4e3
--- /dev/null
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/thrift/Status.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.flume.thrift;
+
+
+import java.util.Map;
+import java.util.HashMap;
+import org.apache.thrift.TEnum;
+
+public enum Status implements org.apache.thrift.TEnum {
+  OK(0),
+  FAILED(1),
+  ERROR(2),
+  UNKNOWN(3);
+
+  private final int value;
+
+  private Status(int value) {
+    this.value = value;
+  }
+
+  /**
+   * Get the integer value of this enum value, as defined in the Thrift IDL.
+   */
+  public int getValue() {
+    return value;
+  }
+
+  /**
+   * Find the enum constant by its integer value, as defined in the Thrift IDL.
+   * @return null if the value is not found.
+   */
+  public static Status findByValue(int value) { 
+    switch (value) {
+      case 0:
+        return OK;
+      case 1:
+        return FAILED;
+      case 2:
+        return ERROR;
+      case 3:
+        return UNKNOWN;
+      default:
+        return null;
+    }
+  }
+}