You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by br...@apache.org on 2013/02/11 19:31:02 UTC
[3/3] git commit: FLUME-1896: Implement Thrift RpcClient
Updated Branches:
refs/heads/trunk d203236b2 -> 60da3d860
FLUME-1896: Implement Thrift RpcClient
(Hari Shreedharan via Brock Noland)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/60da3d86
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/60da3d86
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/60da3d86
Branch: refs/heads/trunk
Commit: 60da3d8606415202f966017a18084cb59d3e64d1
Parents: d203236
Author: Brock Noland <br...@apache.org>
Authored: Mon Feb 11 12:30:29 2013 -0600
Committer: Brock Noland <br...@apache.org>
Committed: Mon Feb 11 12:30:29 2013 -0600
----------------------------------------------------------------------
.../flume-thrift-source/pom.xml | 90 +-
.../flume/handlers/thrift/EventStatus.java | 4 +-
.../cloudera/flume/handlers/thrift/Priority.java | 4 +-
.../flume/handlers/thrift/ThriftFlumeEvent.java | 405 +++--
.../handlers/thrift/ThriftFlumeEventServer.java | 495 +++--
.../flume-thrift-source/src/main/thrift/aslv2 | 18 +
flume-ng-sdk/pom.xml | 83 +
.../main/java/org/apache/flume/api/HostInfo.java | 2 +-
.../flume/api/RpcClientConfigurationConstants.java | 7 +
.../org/apache/flume/api/RpcClientFactory.java | 63 +-
.../java/org/apache/flume/api/ThriftRpcClient.java | 431 ++++
.../main/java/org/apache/flume/thrift/Status.java | 69 +
.../org/apache/flume/thrift/ThriftFlumeEvent.java | 580 +++++
.../apache/flume/thrift/ThriftSourceProtocol.java | 1813 +++++++++++++++
flume-ng-sdk/src/main/thrift/aslv2 | 18 +
flume-ng-sdk/src/main/thrift/flume.thrift | 38 +
.../org/apache/flume/api/TestThriftRpcClient.java | 213 ++
.../org/apache/flume/api/ThriftTestingSource.java | 227 ++
flume-ng-sources/flume-scribe-source/pom.xml | 90 +-
.../org/apache/flume/source/scribe/LogEntry.java | 219 ++-
.../org/apache/flume/source/scribe/ResultCode.java | 14 +-
.../org/apache/flume/source/scribe/Scribe.java | 487 +++--
.../apache/flume/source/scribe/ScribeSource.java | 10 +-
.../flume-scribe-source/src/main/thrift/aslv2 | 18 +
.../src/main/thrift/scribe-source.thrift | 34 +
pom.xml | 12 +-
26 files changed, 4778 insertions(+), 666 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/60da3d86/flume-ng-legacy-sources/flume-thrift-source/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-legacy-sources/flume-thrift-source/pom.xml b/flume-ng-legacy-sources/flume-thrift-source/pom.xml
index b9667cd..2fb5bf1 100644
--- a/flume-ng-legacy-sources/flume-thrift-source/pom.xml
+++ b/flume-ng-legacy-sources/flume-thrift-source/pom.xml
@@ -36,41 +36,63 @@ limitations under the License.
<activation>
<activeByDefault>false</activeByDefault>
</activation>
- <properties>
- <thrift.executable>${env.THRIFT_HOME}/bin/thrift</thrift.executable>
- </properties>
<build>
- <plugins>
- <plugin>
- <groupId>org.apache.thrift.tools</groupId>
- <artifactId>maven-thrift-plugin</artifactId>
- <version>0.1.10</version>
- <configuration>
- <thriftExecutable>${thrift.executable}</thriftExecutable>
- </configuration>
- <executions>
- <execution>
- <id>thrift-sources</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>compile</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>2.3.2</version>
- <configuration>
- <source>1.6</source>
- <target>1.6</target>
- <excludes>
- <exclude>**/com/**</exclude>
- </excludes>
- </configuration>
- </plugin>
- </plugins>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>compileThrift</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <configuration>
+ <target>
+ <echo file="target/compile-thrift.sh">
+ LICENSE=src/main/thrift/aslv2
+ THRIFT_DIR=src/main/thrift
+ JAVA_DIR=target/generated-sources/thrift
+ mkdir -p $JAVA_DIR 2> /dev/null
+ JSTATUS=$?
+ if [ $JSTATUS -ne 0 ] ; then
+ echo "Could not create $JAVA_DIR. Will not generate thrift files."
+ exit $JSTATUS
+ fi
+ for THRIFT_FILE in `ls $THRIFT_DIR/*.thrift 2> /dev/null`
+ do
+ thrift --gen java:hashcode -o $JAVA_DIR $THRIFT_FILE
+ done
+ SRC_DIR=$JAVA_DIR/gen-java/com/cloudera/flume/handlers/thrift
+ DEST_DIR=src/main/java/com/cloudera/flume/handlers/thrift
+ if [ ! -d $DEST_DIR ] ; then
+ mkdir $DEST_DIR 2> /dev/null
+ STATUS=$?
+ if [ $STATUS -ne 0 ] ; then
+ echo "Could not create $DEST_DIR. Will not generate thrift files."
+ exit $STATUS
+ fi
+ fi
+ for JAVA_FILE in `ls $SRC_DIR/*.java 2> /dev/null`
+ do
+ echo $JAVA_FILE
+ cat $LICENSE > $JAVA_FILE.tmp
+ cat $JAVA_FILE >> $JAVA_FILE.tmp
+ mv $JAVA_FILE.tmp $JAVA_FILE
+ cp $JAVA_FILE $DEST_DIR
+ done
+ rm -rf $JAVA_DIR
+ </echo>
+ <exec executable="sh" dir="${basedir}" failonerror="true">
+ <arg line="target/compile-thrift.sh"/>
+ </exec>
+ </target>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
</build>
</profile>
<profile>
http://git-wip-us.apache.org/repos/asf/flume/blob/60da3d86/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/EventStatus.java
----------------------------------------------------------------------
diff --git a/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/EventStatus.java b/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/EventStatus.java
index 327107a..549ea16 100644
--- a/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/EventStatus.java
+++ b/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/EventStatus.java
@@ -16,11 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-
/**
- * Autogenerated by Thrift
+ * Autogenerated by Thrift Compiler (0.9.0)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
*/
package com.cloudera.flume.handlers.thrift;
http://git-wip-us.apache.org/repos/asf/flume/blob/60da3d86/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/Priority.java
----------------------------------------------------------------------
diff --git a/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/Priority.java b/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/Priority.java
index d2495d2..6879817 100644
--- a/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/Priority.java
+++ b/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/Priority.java
@@ -16,11 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-
/**
- * Autogenerated by Thrift
+ * Autogenerated by Thrift Compiler (0.9.0)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
*/
package com.cloudera.flume.handlers.thrift;
http://git-wip-us.apache.org/repos/asf/flume/blob/60da3d86/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/ThriftFlumeEvent.java
----------------------------------------------------------------------
diff --git a/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/ThriftFlumeEvent.java b/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/ThriftFlumeEvent.java
index 2bb6cfd..6b79cc9 100644
--- a/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/ThriftFlumeEvent.java
+++ b/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/ThriftFlumeEvent.java
@@ -16,15 +16,24 @@
* specific language governing permissions and limitations
* under the License.
*/
-
/**
- * Autogenerated by Thrift
+ * Autogenerated by Thrift Compiler (0.9.0)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
*/
package com.cloudera.flume.handlers.thrift;
import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
@@ -40,7 +49,6 @@ import java.util.Arrays;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@Deprecated
public class ThriftFlumeEvent implements org.apache.thrift.TBase<ThriftFlumeEvent, ThriftFlumeEvent._Fields>, java.io.Serializable, Cloneable {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ThriftFlumeEvent");
@@ -51,16 +59,22 @@ public class ThriftFlumeEvent implements org.apache.thrift.TBase<ThriftFlumeEven
private static final org.apache.thrift.protocol.TField HOST_FIELD_DESC = new org.apache.thrift.protocol.TField("host", org.apache.thrift.protocol.TType.STRING, (short)5);
private static final org.apache.thrift.protocol.TField FIELDS_FIELD_DESC = new org.apache.thrift.protocol.TField("fields", org.apache.thrift.protocol.TType.MAP, (short)6);
- public long timestamp;
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+ static {
+ schemes.put(StandardScheme.class, new ThriftFlumeEventStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new ThriftFlumeEventTupleSchemeFactory());
+ }
+
+ public long timestamp; // required
/**
*
* @see Priority
*/
- public Priority priority;
- public ByteBuffer body;
- public long nanos;
- public String host;
- public Map<String,ByteBuffer> fields;
+ public Priority priority; // required
+ public ByteBuffer body; // required
+ public long nanos; // required
+ public String host; // required
+ public Map<String,ByteBuffer> fields; // required
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -142,8 +156,7 @@ public class ThriftFlumeEvent implements org.apache.thrift.TBase<ThriftFlumeEven
// isset id assignments
private static final int __TIMESTAMP_ISSET_ID = 0;
private static final int __NANOS_ISSET_ID = 1;
- private BitSet __isset_bit_vector = new BitSet(2);
-
+ private byte __isset_bitfield = 0;
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -191,8 +204,7 @@ public class ThriftFlumeEvent implements org.apache.thrift.TBase<ThriftFlumeEven
* Performs a deep copy on <i>other</i>.
*/
public ThriftFlumeEvent(ThriftFlumeEvent other) {
- __isset_bit_vector.clear();
- __isset_bit_vector.or(other.__isset_bit_vector);
+ __isset_bitfield = other.__isset_bitfield;
this.timestamp = other.timestamp;
if (other.isSetPriority()) {
this.priority = other.priority;
@@ -250,16 +262,16 @@ public class ThriftFlumeEvent implements org.apache.thrift.TBase<ThriftFlumeEven
}
public void unsetTimestamp() {
- __isset_bit_vector.clear(__TIMESTAMP_ISSET_ID);
+ __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __TIMESTAMP_ISSET_ID);
}
/** Returns true if field timestamp is set (has been assigned a value) and false otherwise */
public boolean isSetTimestamp() {
- return __isset_bit_vector.get(__TIMESTAMP_ISSET_ID);
+ return EncodingUtils.testBit(__isset_bitfield, __TIMESTAMP_ISSET_ID);
}
public void setTimestampIsSet(boolean value) {
- __isset_bit_vector.set(__TIMESTAMP_ISSET_ID, value);
+ __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __TIMESTAMP_ISSET_ID, value);
}
/**
@@ -339,16 +351,16 @@ public class ThriftFlumeEvent implements org.apache.thrift.TBase<ThriftFlumeEven
}
public void unsetNanos() {
- __isset_bit_vector.clear(__NANOS_ISSET_ID);
+ __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __NANOS_ISSET_ID);
}
/** Returns true if field nanos is set (has been assigned a value) and false otherwise */
public boolean isSetNanos() {
- return __isset_bit_vector.get(__NANOS_ISSET_ID);
+ return EncodingUtils.testBit(__isset_bitfield, __NANOS_ISSET_ID);
}
public void setNanosIsSet(boolean value) {
- __isset_bit_vector.set(__NANOS_ISSET_ID, value);
+ __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __NANOS_ISSET_ID, value);
}
public String getHost() {
@@ -466,7 +478,7 @@ public class ThriftFlumeEvent implements org.apache.thrift.TBase<ThriftFlumeEven
public Object getFieldValue(_Fields field) {
switch (field) {
case TIMESTAMP:
- return new Long(getTimestamp());
+ return Long.valueOf(getTimestamp());
case PRIORITY:
return getPriority();
@@ -475,7 +487,7 @@ public class ThriftFlumeEvent implements org.apache.thrift.TBase<ThriftFlumeEven
return getBody();
case NANOS:
- return new Long(getNanos());
+ return Long.valueOf(getNanos());
case HOST:
return getHost();
@@ -693,122 +705,11 @@ public class ThriftFlumeEvent implements org.apache.thrift.TBase<ThriftFlumeEven
}
public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
- org.apache.thrift.protocol.TField field;
- iprot.readStructBegin();
- while (true)
- {
- field = iprot.readFieldBegin();
- if (field.type == org.apache.thrift.protocol.TType.STOP) {
- break;
- }
- switch (field.id) {
- case 1: // TIMESTAMP
- if (field.type == org.apache.thrift.protocol.TType.I64) {
- this.timestamp = iprot.readI64();
- setTimestampIsSet(true);
- } else {
- org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
- }
- break;
- case 2: // PRIORITY
- if (field.type == org.apache.thrift.protocol.TType.I32) {
- this.priority = Priority.findByValue(iprot.readI32());
- } else {
- org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
- }
- break;
- case 3: // BODY
- if (field.type == org.apache.thrift.protocol.TType.STRING) {
- this.body = iprot.readBinary();
- } else {
- org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
- }
- break;
- case 4: // NANOS
- if (field.type == org.apache.thrift.protocol.TType.I64) {
- this.nanos = iprot.readI64();
- setNanosIsSet(true);
- } else {
- org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
- }
- break;
- case 5: // HOST
- if (field.type == org.apache.thrift.protocol.TType.STRING) {
- this.host = iprot.readString();
- } else {
- org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
- }
- break;
- case 6: // FIELDS
- if (field.type == org.apache.thrift.protocol.TType.MAP) {
- {
- org.apache.thrift.protocol.TMap _map0 = iprot.readMapBegin();
- this.fields = new HashMap<String,ByteBuffer>(2*_map0.size);
- for (int _i1 = 0; _i1 < _map0.size; ++_i1)
- {
- String _key2;
- ByteBuffer _val3;
- _key2 = iprot.readString();
- _val3 = iprot.readBinary();
- this.fields.put(_key2, _val3);
- }
- iprot.readMapEnd();
- }
- } else {
- org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
- }
- break;
- default:
- org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
- }
- iprot.readFieldEnd();
- }
- iprot.readStructEnd();
-
- // check for required fields of primitive type, which can't be checked in the validate method
- validate();
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
}
public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
- validate();
-
- oprot.writeStructBegin(STRUCT_DESC);
- oprot.writeFieldBegin(TIMESTAMP_FIELD_DESC);
- oprot.writeI64(this.timestamp);
- oprot.writeFieldEnd();
- if (this.priority != null) {
- oprot.writeFieldBegin(PRIORITY_FIELD_DESC);
- oprot.writeI32(this.priority.getValue());
- oprot.writeFieldEnd();
- }
- if (this.body != null) {
- oprot.writeFieldBegin(BODY_FIELD_DESC);
- oprot.writeBinary(this.body);
- oprot.writeFieldEnd();
- }
- oprot.writeFieldBegin(NANOS_FIELD_DESC);
- oprot.writeI64(this.nanos);
- oprot.writeFieldEnd();
- if (this.host != null) {
- oprot.writeFieldBegin(HOST_FIELD_DESC);
- oprot.writeString(this.host);
- oprot.writeFieldEnd();
- }
- if (this.fields != null) {
- oprot.writeFieldBegin(FIELDS_FIELD_DESC);
- {
- oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, this.fields.size()));
- for (Map.Entry<String, ByteBuffer> _iter4 : this.fields.entrySet())
- {
- oprot.writeString(_iter4.getKey());
- oprot.writeBinary(_iter4.getValue());
- }
- oprot.writeMapEnd();
- }
- oprot.writeFieldEnd();
- }
- oprot.writeFieldStop();
- oprot.writeStructEnd();
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
}
@Override
@@ -861,6 +762,7 @@ public class ThriftFlumeEvent implements org.apache.thrift.TBase<ThriftFlumeEven
public void validate() throws org.apache.thrift.TException {
// check for required fields
+ // check for sub-struct validity
}
private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
@@ -874,12 +776,245 @@ public class ThriftFlumeEvent implements org.apache.thrift.TBase<ThriftFlumeEven
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
// it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
- __isset_bit_vector = new BitSet(1);
+ __isset_bitfield = 0;
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
}
}
+ private static class ThriftFlumeEventStandardSchemeFactory implements SchemeFactory {
+ public ThriftFlumeEventStandardScheme getScheme() {
+ return new ThriftFlumeEventStandardScheme();
+ }
+ }
+
+ private static class ThriftFlumeEventStandardScheme extends StandardScheme<ThriftFlumeEvent> {
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, ThriftFlumeEvent struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ case 1: // TIMESTAMP
+ if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+ struct.timestamp = iprot.readI64();
+ struct.setTimestampIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 2: // PRIORITY
+ if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
+ struct.priority = Priority.findByValue(iprot.readI32());
+ struct.setPriorityIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 3: // BODY
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.body = iprot.readBinary();
+ struct.setBodyIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 4: // NANOS
+ if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+ struct.nanos = iprot.readI64();
+ struct.setNanosIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 5: // HOST
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.host = iprot.readString();
+ struct.setHostIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 6: // FIELDS
+ if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
+ {
+ org.apache.thrift.protocol.TMap _map0 = iprot.readMapBegin();
+ struct.fields = new HashMap<String,ByteBuffer>(2*_map0.size);
+ for (int _i1 = 0; _i1 < _map0.size; ++_i1)
+ {
+ String _key2; // required
+ ByteBuffer _val3; // required
+ _key2 = iprot.readString();
+ _val3 = iprot.readBinary();
+ struct.fields.put(_key2, _val3);
+ }
+ iprot.readMapEnd();
+ }
+ struct.setFieldsIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ // check for required fields of primitive type, which can't be checked in the validate method
+ struct.validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, ThriftFlumeEvent struct) throws org.apache.thrift.TException {
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ oprot.writeFieldBegin(TIMESTAMP_FIELD_DESC);
+ oprot.writeI64(struct.timestamp);
+ oprot.writeFieldEnd();
+ if (struct.priority != null) {
+ oprot.writeFieldBegin(PRIORITY_FIELD_DESC);
+ oprot.writeI32(struct.priority.getValue());
+ oprot.writeFieldEnd();
+ }
+ if (struct.body != null) {
+ oprot.writeFieldBegin(BODY_FIELD_DESC);
+ oprot.writeBinary(struct.body);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldBegin(NANOS_FIELD_DESC);
+ oprot.writeI64(struct.nanos);
+ oprot.writeFieldEnd();
+ if (struct.host != null) {
+ oprot.writeFieldBegin(HOST_FIELD_DESC);
+ oprot.writeString(struct.host);
+ oprot.writeFieldEnd();
+ }
+ if (struct.fields != null) {
+ oprot.writeFieldBegin(FIELDS_FIELD_DESC);
+ {
+ oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, struct.fields.size()));
+ for (Map.Entry<String, ByteBuffer> _iter4 : struct.fields.entrySet())
+ {
+ oprot.writeString(_iter4.getKey());
+ oprot.writeBinary(_iter4.getValue());
+ }
+ oprot.writeMapEnd();
+ }
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class ThriftFlumeEventTupleSchemeFactory implements SchemeFactory {
+ public ThriftFlumeEventTupleScheme getScheme() {
+ return new ThriftFlumeEventTupleScheme();
+ }
+ }
+
+ private static class ThriftFlumeEventTupleScheme extends TupleScheme<ThriftFlumeEvent> {
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, ThriftFlumeEvent struct) throws org.apache.thrift.TException {
+ TTupleProtocol oprot = (TTupleProtocol) prot;
+ BitSet optionals = new BitSet();
+ if (struct.isSetTimestamp()) {
+ optionals.set(0);
+ }
+ if (struct.isSetPriority()) {
+ optionals.set(1);
+ }
+ if (struct.isSetBody()) {
+ optionals.set(2);
+ }
+ if (struct.isSetNanos()) {
+ optionals.set(3);
+ }
+ if (struct.isSetHost()) {
+ optionals.set(4);
+ }
+ if (struct.isSetFields()) {
+ optionals.set(5);
+ }
+ oprot.writeBitSet(optionals, 6);
+ if (struct.isSetTimestamp()) {
+ oprot.writeI64(struct.timestamp);
+ }
+ if (struct.isSetPriority()) {
+ oprot.writeI32(struct.priority.getValue());
+ }
+ if (struct.isSetBody()) {
+ oprot.writeBinary(struct.body);
+ }
+ if (struct.isSetNanos()) {
+ oprot.writeI64(struct.nanos);
+ }
+ if (struct.isSetHost()) {
+ oprot.writeString(struct.host);
+ }
+ if (struct.isSetFields()) {
+ {
+ oprot.writeI32(struct.fields.size());
+ for (Map.Entry<String, ByteBuffer> _iter5 : struct.fields.entrySet())
+ {
+ oprot.writeString(_iter5.getKey());
+ oprot.writeBinary(_iter5.getValue());
+ }
+ }
+ }
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, ThriftFlumeEvent struct) throws org.apache.thrift.TException {
+ TTupleProtocol iprot = (TTupleProtocol) prot;
+ BitSet incoming = iprot.readBitSet(6);
+ if (incoming.get(0)) {
+ struct.timestamp = iprot.readI64();
+ struct.setTimestampIsSet(true);
+ }
+ if (incoming.get(1)) {
+ struct.priority = Priority.findByValue(iprot.readI32());
+ struct.setPriorityIsSet(true);
+ }
+ if (incoming.get(2)) {
+ struct.body = iprot.readBinary();
+ struct.setBodyIsSet(true);
+ }
+ if (incoming.get(3)) {
+ struct.nanos = iprot.readI64();
+ struct.setNanosIsSet(true);
+ }
+ if (incoming.get(4)) {
+ struct.host = iprot.readString();
+ struct.setHostIsSet(true);
+ }
+ if (incoming.get(5)) {
+ {
+ org.apache.thrift.protocol.TMap _map6 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, iprot.readI32());
+ struct.fields = new HashMap<String,ByteBuffer>(2*_map6.size);
+ for (int _i7 = 0; _i7 < _map6.size; ++_i7)
+ {
+ String _key8; // required
+ ByteBuffer _val9; // required
+ _key8 = iprot.readString();
+ _val9 = iprot.readBinary();
+ struct.fields.put(_key8, _val9);
+ }
+ }
+ struct.setFieldsIsSet(true);
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flume/blob/60da3d86/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/ThriftFlumeEventServer.java
----------------------------------------------------------------------
diff --git a/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/ThriftFlumeEventServer.java b/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/ThriftFlumeEventServer.java
index 0f2ad2d..208838a 100644
--- a/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/ThriftFlumeEventServer.java
+++ b/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/ThriftFlumeEventServer.java
@@ -16,15 +16,24 @@
* specific language governing permissions and limitations
* under the License.
*/
-
/**
- * Autogenerated by Thrift
+ * Autogenerated by Thrift Compiler (0.9.0)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
*/
package com.cloudera.flume.handlers.thrift;
import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
@@ -40,7 +49,6 @@ import java.util.Arrays;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@Deprecated
public class ThriftFlumeEventServer {
public interface Iface {
@@ -59,7 +67,7 @@ public class ThriftFlumeEventServer {
}
- public static class Client implements org.apache.thrift.TServiceClient, Iface {
+ public static class Client extends org.apache.thrift.TServiceClient implements Iface {
public static class Factory implements org.apache.thrift.TServiceClientFactory<Client> {
public Factory() {}
public Client getClient(org.apache.thrift.protocol.TProtocol prot) {
@@ -72,28 +80,11 @@ public class ThriftFlumeEventServer {
public Client(org.apache.thrift.protocol.TProtocol prot)
{
- this(prot, prot);
- }
-
- public Client(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot)
- {
- iprot_ = iprot;
- oprot_ = oprot;
- }
-
- protected org.apache.thrift.protocol.TProtocol iprot_;
- protected org.apache.thrift.protocol.TProtocol oprot_;
-
- protected int seqid_;
-
- public org.apache.thrift.protocol.TProtocol getInputProtocol()
- {
- return this.iprot_;
+ super(prot, prot);
}
- public org.apache.thrift.protocol.TProtocol getOutputProtocol()
- {
- return this.oprot_;
+ public Client(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
+ super(iprot, oprot);
}
public void append(ThriftFlumeEvent evt) throws org.apache.thrift.TException
@@ -103,12 +94,9 @@ public class ThriftFlumeEventServer {
public void send_append(ThriftFlumeEvent evt) throws org.apache.thrift.TException
{
- oprot_.writeMessageBegin(new org.apache.thrift.protocol.TMessage("append", org.apache.thrift.protocol.TMessageType.CALL, ++seqid_));
append_args args = new append_args();
args.setEvt(evt);
- args.write(oprot_);
- oprot_.writeMessageEnd();
- oprot_.getTransport().flush();
+ sendBase("append", args);
}
public void close() throws org.apache.thrift.TException
@@ -119,27 +107,14 @@ public class ThriftFlumeEventServer {
public void send_close() throws org.apache.thrift.TException
{
- oprot_.writeMessageBegin(new org.apache.thrift.protocol.TMessage("close", org.apache.thrift.protocol.TMessageType.CALL, ++seqid_));
close_args args = new close_args();
- args.write(oprot_);
- oprot_.writeMessageEnd();
- oprot_.getTransport().flush();
+ sendBase("close", args);
}
public void recv_close() throws org.apache.thrift.TException
{
- org.apache.thrift.protocol.TMessage msg = iprot_.readMessageBegin();
- if (msg.type == org.apache.thrift.protocol.TMessageType.EXCEPTION) {
- org.apache.thrift.TApplicationException x = org.apache.thrift.TApplicationException.read(iprot_);
- iprot_.readMessageEnd();
- throw x;
- }
- if (msg.seqid != seqid_) {
- throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.BAD_SEQUENCE_ID, "close failed: out of sequence response");
- }
close_result result = new close_result();
- result.read(iprot_);
- iprot_.readMessageEnd();
+ receiveBase(result, "close");
return;
}
@@ -163,9 +138,9 @@ public class ThriftFlumeEventServer {
public void append(ThriftFlumeEvent evt, org.apache.thrift.async.AsyncMethodCallback<append_call> resultHandler) throws org.apache.thrift.TException {
checkReady();
- append_call method_call = new append_call(evt, resultHandler, this, protocolFactory, transport);
- this.currentMethod = method_call;
- manager.call(method_call);
+ append_call method_call = new append_call(evt, resultHandler, this, ___protocolFactory, ___transport);
+ this.___currentMethod = method_call;
+ ___manager.call(method_call);
}
public static class append_call extends org.apache.thrift.async.TAsyncMethodCall {
@@ -194,9 +169,9 @@ public class ThriftFlumeEventServer {
public void close(org.apache.thrift.async.AsyncMethodCallback<close_call> resultHandler) throws org.apache.thrift.TException {
checkReady();
- close_call method_call = new close_call(resultHandler, this, protocolFactory, transport);
- this.currentMethod = method_call;
- manager.call(method_call);
+ close_call method_call = new close_call(resultHandler, this, ___protocolFactory, ___transport);
+ this.___currentMethod = method_call;
+ ___manager.call(method_call);
}
public static class close_call extends org.apache.thrift.async.TAsyncMethodCall {
@@ -223,85 +198,59 @@ public class ThriftFlumeEventServer {
}
- public static class Processor implements org.apache.thrift.TProcessor {
+ public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(Processor.class.getName());
- public Processor(Iface iface)
- {
- iface_ = iface;
- processMap_.put("append", new append());
- processMap_.put("close", new close());
+ public Processor(I iface) {
+ super(iface, getProcessMap(new HashMap<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>()));
}
- protected static interface ProcessFunction {
- public void process(int seqid, org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException;
+ protected Processor(I iface, Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
+ super(iface, getProcessMap(processMap));
}
- private Iface iface_;
- protected final HashMap<String,ProcessFunction> processMap_ = new HashMap<String,ProcessFunction>();
+ private static <I extends Iface> Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> getProcessMap(Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
+ processMap.put("append", new append());
+ processMap.put("close", new close());
+ return processMap;
+ }
- public boolean process(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException
- {
- org.apache.thrift.protocol.TMessage msg = iprot.readMessageBegin();
- ProcessFunction fn = processMap_.get(msg.name);
- if (fn == null) {
- org.apache.thrift.protocol.TProtocolUtil.skip(iprot, org.apache.thrift.protocol.TType.STRUCT);
- iprot.readMessageEnd();
- org.apache.thrift.TApplicationException x = new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'");
- oprot.writeMessageBegin(new org.apache.thrift.protocol.TMessage(msg.name, org.apache.thrift.protocol.TMessageType.EXCEPTION, msg.seqid));
- x.write(oprot);
- oprot.writeMessageEnd();
- oprot.getTransport().flush();
+ public static class append<I extends Iface> extends org.apache.thrift.ProcessFunction<I, append_args> {
+ public append() {
+ super("append");
+ }
+
+ public append_args getEmptyArgsInstance() {
+ return new append_args();
+ }
+
+ protected boolean isOneway() {
return true;
}
- fn.process(msg.seqid, iprot, oprot);
- return true;
- }
- private class append implements ProcessFunction {
- public void process(int seqid, org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException
- {
- append_args args = new append_args();
- try {
- args.read(iprot);
- } catch (org.apache.thrift.protocol.TProtocolException e) {
- iprot.readMessageEnd();
- org.apache.thrift.TApplicationException x = new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.PROTOCOL_ERROR, e.getMessage());
- oprot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("append", org.apache.thrift.protocol.TMessageType.EXCEPTION, seqid));
- x.write(oprot);
- oprot.writeMessageEnd();
- oprot.getTransport().flush();
- return;
- }
- iprot.readMessageEnd();
- iface_.append(args.evt);
- return;
+ public org.apache.thrift.TBase getResult(I iface, append_args args) throws org.apache.thrift.TException {
+ iface.append(args.evt);
+ return null;
}
}
- private class close implements ProcessFunction {
- public void process(int seqid, org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException
- {
- close_args args = new close_args();
- try {
- args.read(iprot);
- } catch (org.apache.thrift.protocol.TProtocolException e) {
- iprot.readMessageEnd();
- org.apache.thrift.TApplicationException x = new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.PROTOCOL_ERROR, e.getMessage());
- oprot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("close", org.apache.thrift.protocol.TMessageType.EXCEPTION, seqid));
- x.write(oprot);
- oprot.writeMessageEnd();
- oprot.getTransport().flush();
- return;
- }
- iprot.readMessageEnd();
- close_result result = new close_result();
- iface_.close();
- oprot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("close", org.apache.thrift.protocol.TMessageType.REPLY, seqid));
- result.write(oprot);
- oprot.writeMessageEnd();
- oprot.getTransport().flush();
+ public static class close<I extends Iface> extends org.apache.thrift.ProcessFunction<I, close_args> {
+ public close() {
+ super("close");
}
+ public close_args getEmptyArgsInstance() {
+ return new close_args();
+ }
+
+ protected boolean isOneway() {
+ return false;
+ }
+
+ public close_result getResult(I iface, close_args args) throws org.apache.thrift.TException {
+ close_result result = new close_result();
+ iface.close();
+ return result;
+ }
}
}
@@ -311,7 +260,13 @@ public class ThriftFlumeEventServer {
private static final org.apache.thrift.protocol.TField EVT_FIELD_DESC = new org.apache.thrift.protocol.TField("evt", org.apache.thrift.protocol.TType.STRUCT, (short)1);
- public ThriftFlumeEvent evt;
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+ static {
+ schemes.put(StandardScheme.class, new append_argsStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new append_argsTupleSchemeFactory());
+ }
+
+ public ThriftFlumeEvent evt; // required
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -372,7 +327,6 @@ public class ThriftFlumeEventServer {
}
// isset id assignments
-
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -532,45 +486,11 @@ public class ThriftFlumeEventServer {
}
public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
- org.apache.thrift.protocol.TField field;
- iprot.readStructBegin();
- while (true)
- {
- field = iprot.readFieldBegin();
- if (field.type == org.apache.thrift.protocol.TType.STOP) {
- break;
- }
- switch (field.id) {
- case 1: // EVT
- if (field.type == org.apache.thrift.protocol.TType.STRUCT) {
- this.evt = new ThriftFlumeEvent();
- this.evt.read(iprot);
- } else {
- org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
- }
- break;
- default:
- org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
- }
- iprot.readFieldEnd();
- }
- iprot.readStructEnd();
-
- // check for required fields of primitive type, which can't be checked in the validate method
- validate();
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
}
public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
- validate();
-
- oprot.writeStructBegin(STRUCT_DESC);
- if (this.evt != null) {
- oprot.writeFieldBegin(EVT_FIELD_DESC);
- this.evt.write(oprot);
- oprot.writeFieldEnd();
- }
- oprot.writeFieldStop();
- oprot.writeStructEnd();
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
}
@Override
@@ -591,6 +511,10 @@ public class ThriftFlumeEventServer {
public void validate() throws org.apache.thrift.TException {
// check for required fields
+ // check for sub-struct validity
+ if (evt != null) {
+ evt.validate();
+ }
}
private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
@@ -609,12 +533,104 @@ public class ThriftFlumeEventServer {
}
}
+ private static class append_argsStandardSchemeFactory implements SchemeFactory {
+ public append_argsStandardScheme getScheme() {
+ return new append_argsStandardScheme();
+ }
+ }
+
+ private static class append_argsStandardScheme extends StandardScheme<append_args> {
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, append_args struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ case 1: // EVT
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
+ struct.evt = new ThriftFlumeEvent();
+ struct.evt.read(iprot);
+ struct.setEvtIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ // check for required fields of primitive type, which can't be checked in the validate method
+ struct.validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, append_args struct) throws org.apache.thrift.TException {
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (struct.evt != null) {
+ oprot.writeFieldBegin(EVT_FIELD_DESC);
+ struct.evt.write(oprot);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class append_argsTupleSchemeFactory implements SchemeFactory {
+ public append_argsTupleScheme getScheme() {
+ return new append_argsTupleScheme();
+ }
+ }
+
+ private static class append_argsTupleScheme extends TupleScheme<append_args> {
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, append_args struct) throws org.apache.thrift.TException {
+ TTupleProtocol oprot = (TTupleProtocol) prot;
+ BitSet optionals = new BitSet();
+ if (struct.isSetEvt()) {
+ optionals.set(0);
+ }
+ oprot.writeBitSet(optionals, 1);
+ if (struct.isSetEvt()) {
+ struct.evt.write(oprot);
+ }
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, append_args struct) throws org.apache.thrift.TException {
+ TTupleProtocol iprot = (TTupleProtocol) prot;
+ BitSet incoming = iprot.readBitSet(1);
+ if (incoming.get(0)) {
+ struct.evt = new ThriftFlumeEvent();
+ struct.evt.read(iprot);
+ struct.setEvtIsSet(true);
+ }
+ }
+ }
+
}
public static class close_args implements org.apache.thrift.TBase<close_args, close_args._Fields>, java.io.Serializable, Cloneable {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("close_args");
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+ static {
+ schemes.put(StandardScheme.class, new close_argsStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new close_argsTupleSchemeFactory());
+ }
+
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -756,32 +772,11 @@ public class ThriftFlumeEventServer {
}
public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
- org.apache.thrift.protocol.TField field;
- iprot.readStructBegin();
- while (true)
- {
- field = iprot.readFieldBegin();
- if (field.type == org.apache.thrift.protocol.TType.STOP) {
- break;
- }
- switch (field.id) {
- default:
- org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
- }
- iprot.readFieldEnd();
- }
- iprot.readStructEnd();
-
- // check for required fields of primitive type, which can't be checked in the validate method
- validate();
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
}
public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
- validate();
-
- oprot.writeStructBegin(STRUCT_DESC);
- oprot.writeFieldStop();
- oprot.writeStructEnd();
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
}
@Override
@@ -795,6 +790,7 @@ public class ThriftFlumeEventServer {
public void validate() throws org.apache.thrift.TException {
// check for required fields
+ // check for sub-struct validity
}
private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
@@ -813,12 +809,76 @@ public class ThriftFlumeEventServer {
}
}
+ private static class close_argsStandardSchemeFactory implements SchemeFactory {
+ public close_argsStandardScheme getScheme() {
+ return new close_argsStandardScheme();
+ }
+ }
+
+ private static class close_argsStandardScheme extends StandardScheme<close_args> {
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, close_args struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ // check for required fields of primitive type, which can't be checked in the validate method
+ struct.validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, close_args struct) throws org.apache.thrift.TException {
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class close_argsTupleSchemeFactory implements SchemeFactory {
+ public close_argsTupleScheme getScheme() {
+ return new close_argsTupleScheme();
+ }
+ }
+
+ private static class close_argsTupleScheme extends TupleScheme<close_args> {
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, close_args struct) throws org.apache.thrift.TException {
+ TTupleProtocol oprot = (TTupleProtocol) prot;
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, close_args struct) throws org.apache.thrift.TException {
+ TTupleProtocol iprot = (TTupleProtocol) prot;
+ }
+ }
+
}
public static class close_result implements org.apache.thrift.TBase<close_result, close_result._Fields>, java.io.Serializable, Cloneable {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("close_result");
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+ static {
+ schemes.put(StandardScheme.class, new close_resultStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new close_resultTupleSchemeFactory());
+ }
+
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -960,32 +1020,12 @@ public class ThriftFlumeEventServer {
}
public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
- org.apache.thrift.protocol.TField field;
- iprot.readStructBegin();
- while (true)
- {
- field = iprot.readFieldBegin();
- if (field.type == org.apache.thrift.protocol.TType.STOP) {
- break;
- }
- switch (field.id) {
- default:
- org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
- }
- iprot.readFieldEnd();
- }
- iprot.readStructEnd();
-
- // check for required fields of primitive type, which can't be checked in the validate method
- validate();
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
}
public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
- oprot.writeStructBegin(STRUCT_DESC);
-
- oprot.writeFieldStop();
- oprot.writeStructEnd();
- }
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+ }
@Override
public String toString() {
@@ -998,6 +1038,7 @@ public class ThriftFlumeEventServer {
public void validate() throws org.apache.thrift.TException {
// check for required fields
+ // check for sub-struct validity
}
private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
@@ -1016,6 +1057,64 @@ public class ThriftFlumeEventServer {
}
}
+ private static class close_resultStandardSchemeFactory implements SchemeFactory {
+ public close_resultStandardScheme getScheme() {
+ return new close_resultStandardScheme();
+ }
+ }
+
+ private static class close_resultStandardScheme extends StandardScheme<close_result> {
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, close_result struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ // check for required fields of primitive type, which can't be checked in the validate method
+ struct.validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, close_result struct) throws org.apache.thrift.TException {
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class close_resultTupleSchemeFactory implements SchemeFactory {
+ public close_resultTupleScheme getScheme() {
+ return new close_resultTupleScheme();
+ }
+ }
+
+ private static class close_resultTupleScheme extends TupleScheme<close_result> {
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, close_result struct) throws org.apache.thrift.TException {
+ TTupleProtocol oprot = (TTupleProtocol) prot;
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, close_result struct) throws org.apache.thrift.TException {
+ TTupleProtocol iprot = (TTupleProtocol) prot;
+ }
+ }
+
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/60da3d86/flume-ng-legacy-sources/flume-thrift-source/src/main/thrift/aslv2
----------------------------------------------------------------------
diff --git a/flume-ng-legacy-sources/flume-thrift-source/src/main/thrift/aslv2 b/flume-ng-legacy-sources/flume-thrift-source/src/main/thrift/aslv2
new file mode 100644
index 0000000..7243427
--- /dev/null
+++ b/flume-ng-legacy-sources/flume-thrift-source/src/main/thrift/aslv2
@@ -0,0 +1,18 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
http://git-wip-us.apache.org/repos/asf/flume/blob/60da3d86/flume-ng-sdk/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/pom.xml b/flume-ng-sdk/pom.xml
index ab066d5..3709339 100644
--- a/flume-ng-sdk/pom.xml
+++ b/flume-ng-sdk/pom.xml
@@ -28,6 +28,72 @@ limitations under the License.
<name>Flume NG SDK</name>
<description>Flume Software Development Kit: Stable public API for integration with Flume 1.x</description>
+ <profiles>
+ <profile>
+ <id>compileThrift</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>compileThrift</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <configuration>
+ <target>
+ <echo file="target/compile-thrift.sh">
+ # Regenerates the Thrift Java sources: runs the thrift compiler on every
+ # .thrift file under THRIFT_DIR, prepends the ASL v2 header stored in
+ # LICENSE to each generated file and copies the result into DEST_DIR.
+ LICENSE=src/main/thrift/aslv2
+ THRIFT_DIR=src/main/thrift
+ JAVA_DIR=target/generated-sources/thrift
+ # Scratch directory for the compiler output; it is removed at the end.
+ mkdir -p $JAVA_DIR 2> /dev/null
+ JSTATUS=$?
+ if [ $JSTATUS -ne 0 ] ; then
+ echo "Could not create $JAVA_DIR. Will not generate thrift files."
+ exit $JSTATUS
+ fi
+ for THRIFT_FILE in `ls $THRIFT_DIR/*.thrift 2> /dev/null`
+ do
+ thrift --gen java:hashcode -o $JAVA_DIR $THRIFT_FILE
+ TSTATUS=$?
+ # Stop on a compiler failure (or a missing thrift binary). Without this
+ # check the script falls through to the final rm, exits 0 and the
+ # failure is never reported to the caller of the script.
+ if [ $TSTATUS -ne 0 ] ; then
+ echo "thrift failed for $THRIFT_FILE. Will not generate thrift files."
+ exit $TSTATUS
+ fi
+ done
+ SRC_DIR=$JAVA_DIR/gen-java/org/apache/flume/thrift/
+ DEST_DIR=src/main/java/org/apache/flume/thrift
+ if [ ! -d $DEST_DIR ] ; then
+ # -p so that missing intermediate directories do not make this fail.
+ mkdir -p $DEST_DIR 2> /dev/null
+ STATUS=$?
+ if [ $STATUS -ne 0 ] ; then
+ echo "Could not create $DEST_DIR. Will not generate thrift files."
+ exit $STATUS
+ fi
+ fi
+ for JAVA_FILE in `ls $SRC_DIR/*.java 2> /dev/null`
+ do
+ echo $JAVA_FILE
+ # Build "license header + generated source" in a temp file, then swap it
+ # in place before copying the file to the source tree.
+ cat $LICENSE > $JAVA_FILE.tmp
+ cat $JAVA_FILE >> $JAVA_FILE.tmp
+ mv $JAVA_FILE.tmp $JAVA_FILE
+ cp $JAVA_FILE $DEST_DIR
+ done
+ rm -rf $JAVA_DIR
+ </echo>
+ <exec executable="sh" dir="${basedir}" failonerror="true">
+ <arg line="target/compile-thrift.sh"/>
+ </exec>
+ </target>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
<build>
<plugins>
@@ -64,6 +130,18 @@ limitations under the License.
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
</plugins>
</build>
@@ -105,5 +183,10 @@ limitations under the License.
<artifactId>netty</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ </dependency>
+
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/flume/blob/60da3d86/flume-ng-sdk/src/main/java/org/apache/flume/api/HostInfo.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/HostInfo.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/HostInfo.java
index 7388a45..8a81208 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/HostInfo.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/HostInfo.java
@@ -63,7 +63,7 @@ public class HostInfo {
return referenceName + "{" + hostName + ":" + portNumber + "}";
}
- static List<HostInfo> getHostInfoList(Properties properties) {
+ public static List<HostInfo> getHostInfoList(Properties properties) {
List<HostInfo> hosts = new ArrayList<HostInfo>();
String hostNames = properties.getProperty(
RpcClientConfigurationConstants.CONFIG_HOSTS);
http://git-wip-us.apache.org/repos/asf/flume/blob/60da3d86/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
index ab4c3de..1e642d8 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
@@ -119,6 +119,13 @@ public final class RpcClientConfigurationConstants {
public static final String CONFIG_BACKOFF = "backoff";
public static final String DEFAULT_BACKOFF = "false";
+ /**
+ * Maximum number of connections each Thrift Rpc client can open to a given
+ * host.
+ */
+ public static final String CONFIG_CONNECTION_POOL_SIZE = "maxConnections";
+ public static final int DEFAULT_CONNECTION_POOL_SIZE = 5;
+
private RpcClientConfigurationConstants() {
// disable explicit object creation
}
http://git-wip-us.apache.org/repos/asf/flume/blob/60da3d86/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java
index 3c93921..00d9cf0 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java
@@ -170,11 +170,72 @@ public class RpcClientFactory {
return client;
}
+ /**
+ * Create an {@linkplain RpcClient} that talks Thrift to a single next hop.
+ * A ThriftSource must be listening on the given host and port. The client
+ * is configured through a {@linkplain Properties} object holding exactly
+ * one host entry (alias "h1") and the requested batch size.
+ * @param hostname - The hostname of the next hop.
+ * @param port - The port on which the ThriftSource is listening
+ * @param batchSize - batch size of each transaction.
+ * @return an {@linkplain RpcClient} which uses thrift configured with the
+ * given parameters.
+ * @throws NullPointerException if any of the arguments is null.
+ */
+ public static RpcClient getThriftInstance(String hostname, Integer port,
+ Integer batchSize) {
+ // Arguments are validated in declaration order so that the first
+ // missing one is the one named in the exception message.
+ if (hostname == null) {
+ throw new NullPointerException("hostname must not be null");
+ }
+ if (port == null) {
+ throw new NullPointerException("port must not be null");
+ }
+ if (batchSize == null) {
+ throw new NullPointerException("batchSize must not be null");
+ }
+
+ final String hostAlias = "h1";
+ final String hostAddress = hostname + ":" + port;
+
+ Properties clientConfig = new Properties();
+ clientConfig.setProperty(
+ RpcClientConfigurationConstants.CONFIG_HOSTS, hostAlias);
+ clientConfig.setProperty(
+ RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + hostAlias,
+ hostAddress);
+ clientConfig.setProperty(
+ RpcClientConfigurationConstants.CONFIG_BATCH_SIZE,
+ String.valueOf(batchSize));
+
+ ThriftRpcClient thriftClient = new ThriftRpcClient();
+ thriftClient.configure(clientConfig);
+ return thriftClient;
+ }
+
+ /**
+ * Create an {@linkplain RpcClient} that talks Thrift to a single next hop,
+ * using the default batch size (see {@linkplain
+ * RpcClientConfigurationConstants}). A ThriftSource must be listening on
+ * the given host and port.
+ * @param hostname - The hostname of the next hop.
+ * @param port - The port on which the ThriftSource is listening
+ * @return - An {@linkplain RpcClient} which uses thrift configured with the
+ * given parameters.
+ */
+ public static RpcClient getThriftInstance(String hostname, Integer port) {
+ // Delegate to the three-argument overload, which validates the arguments.
+ RpcClient thriftClient = getThriftInstance(hostname, port,
+ RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE);
+ return thriftClient;
+ }
+
+ /**
+ * Return an {@linkplain RpcClient} that uses Thrift for communicating with
+ * the next hop. Before delegating to {@code getInstance}, this method sets
+ * the {@linkplain RpcClientConfigurationConstants#CONFIG_CLIENT_TYPE}
+ * property of {@code props} to the client class name of
+ * {@linkplain ClientType#THRIFT}; the caller's object is modified and any
+ * client type it already carried is overwritten.
+ * @param props - the client configuration; handed to {@code getInstance}
+ * once the client type has been set.
+ * @return - An {@linkplain RpcClient} which uses thrift configured with the
+ * given parameters.
+ */
+ public static RpcClient getThriftInstance(Properties props) {
+ props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE,
+ ClientType.THRIFT.clientClassName);
+ return getInstance(props);
+ }
+
public static enum ClientType {
OTHER(null),
DEFAULT(NettyAvroRpcClient.class.getCanonicalName()),
DEFAULT_FAILOVER(FailoverRpcClient.class.getCanonicalName()),
- DEFAULT_LOADBALANCE(LoadBalancingRpcClient.class.getCanonicalName());
+ DEFAULT_LOADBALANCE(LoadBalancingRpcClient.class.getCanonicalName()),
+ THRIFT(ThriftRpcClient.class.getCanonicalName());
private final String clientClassName;
http://git-wip-us.apache.org/repos/asf/flume/blob/60da3d86/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java
new file mode 100644
index 0000000..cf45ab9
--- /dev/null
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.api;
+
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.thrift.Status;
+import org.apache.flume.thrift.ThriftFlumeEvent;
+import org.apache.flume.thrift.ThriftSourceProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.transport.TFastFramedTransport;
+import org.apache.thrift.transport.TSocket;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Queue;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+ /**
+  * RPC client that delivers Flume events to a single host and port over
+  * Thrift (framed transport, compact protocol).
+  * <p>
+  * Connections are kept in a bounded pool; every append call checks out its
+  * own connection because a Thrift client must not be shared between
+  * threads. The blocking Thrift call itself runs on a helper thread so that
+  * the caller can give up after the configured request timeout.
+  * <p>
+  * Life cycle: the client starts in {@code INIT}, becomes {@code READY} after
+  * a successful {@link #configure(Properties)} and {@code DEAD} after
+  * {@link #close()} or a failed configuration. A dead client cannot be
+  * revived; create a new one instead.
+  */
+ public class ThriftRpcClient extends AbstractRpcClient {
+   private static final Logger LOGGER =
+       LoggerFactory.getLogger(ThriftRpcClient.class);
+
+   private int batchSize;
+   private long requestTimeout;
+   // Guards connState and the configuration fields written by configure().
+   private final Lock stateLock;
+   private State connState;
+   private String hostname;
+   private int port;
+   private ConnectionPoolManager connectionManager;
+   // Runs the blocking Thrift calls so that callers can bound them in time.
+   private final ExecutorService callTimeoutPool;
+   private final AtomicLong threadCounter;
+   private int connectionPoolSize;
+   private final Random random = new Random();
+
+   /**
+    * Creates an unconfigured client. It cannot send events until
+    * {@link #configure(Properties)} has completed successfully.
+    */
+   public ThriftRpcClient() {
+     stateLock = new ReentrantLock(true);
+     connState = State.INIT;
+
+     threadCounter = new AtomicLong(0);
+     // OK to use cached threadpool, because this is simply meant to timeout
+     // the calls - and is IO bound.
+     callTimeoutPool = Executors.newCachedThreadPool(new ThreadFactory() {
+       @Override
+       public Thread newThread(Runnable r) {
+         Thread t = new Thread(r);
+         t.setName("Flume Thrift RPC thread - " + String.valueOf(
+             threadCounter.incrementAndGet()));
+         return t;
+       }
+     });
+   }
+
+   @Override
+   public int getBatchSize() {
+     return batchSize;
+   }
+
+   /**
+    * Sends a single event and waits at most the configured request timeout
+    * for the server to acknowledge it.
+    *
+    * @param event the event to deliver
+    * @throws EventDeliveryException if the client is not active, the call
+    * times out, the server answers with a status other than OK, or the
+    * connection fails
+    */
+   @Override
+   public void append(Event event) throws EventDeliveryException {
+     // Checked before the try block so the caller sees this message directly
+     // instead of wrapped in a second EventDeliveryException.
+     if (!isActive()) {
+       throw new EventDeliveryException("Client was closed due to error. " +
+           "Please create a new client");
+     }
+     ClientWrapper client = null;
+     try {
+       client = connectionManager.checkout();
+       final ThriftFlumeEvent thriftEvent = new ThriftFlumeEvent(
+           event.getHeaders(), ByteBuffer.wrap(event.getBody()));
+       doAppend(client, thriftEvent).get(requestTimeout, TimeUnit.MILLISECONDS);
+     } catch (Throwable e) {
+       throw handleSendFailure(client, e);
+     }
+     connectionManager.checkIn(client);
+   }
+
+   /**
+    * Sends the events in chunks of at most {@link #getBatchSize()} events,
+    * all over the same pooled connection. Each chunk is bounded by the
+    * configured request timeout. Chunks that were acknowledged before a
+    * failure are not rolled back by this method.
+    *
+    * @param events the events to deliver
+    * @throws EventDeliveryException if the client is not active, a call
+    * times out, the server answers with a status other than OK, or the
+    * connection fails
+    */
+   @Override
+   public void appendBatch(List<Event> events) throws EventDeliveryException {
+     if (!isActive()) {
+       throw new EventDeliveryException("Client was closed " +
+           "due to error or is not yet configured.");
+     }
+     ClientWrapper client = null;
+     try {
+       client = connectionManager.checkout();
+       final List<ThriftFlumeEvent> thriftFlumeEvents =
+           new ArrayList<ThriftFlumeEvent>();
+       Iterator<Event> eventsIter = events.iterator();
+       while (eventsIter.hasNext()) {
+         // Safe to reuse the list: the previous chunk has been fully sent
+         // (get() returned) before we get here again.
+         thriftFlumeEvents.clear();
+         for (int i = 0; i < batchSize && eventsIter.hasNext(); i++) {
+           Event event = eventsIter.next();
+           thriftFlumeEvents.add(new ThriftFlumeEvent(event.getHeaders(),
+               ByteBuffer.wrap(event.getBody())));
+         }
+         if (!thriftFlumeEvents.isEmpty()) {
+           doAppendBatch(client, thriftFlumeEvents).get(requestTimeout,
+               TimeUnit.MILLISECONDS);
+         }
+       }
+     } catch (Throwable e) {
+       throw handleSendFailure(client, e);
+     }
+     connectionManager.checkIn(client);
+   }
+
+   /**
+    * Releases the connection used by a failed send and translates the
+    * failure into the exception the caller should see.
+    * <p>
+    * If the server answered with a non-OK status the connection is still
+    * usable and goes back to the pool. For every other failure the state of
+    * the connection is unknown (the call may even still be in flight), so it
+    * is destroyed rather than reused.
+    *
+    * @param client the connection that was checked out, or null if the
+    * checkout itself failed
+    * @param failure whatever was thrown while sending
+    * @return the exception to throw to the caller
+    * @throws Error if {@code failure} is an Error (rethrown as is)
+    * @throws RuntimeException if {@code failure} is a RuntimeException
+    * (rethrown as is)
+    */
+   private EventDeliveryException handleSendFailure(ClientWrapper client,
+       Throwable failure) {
+     if (failure instanceof ExecutionException) {
+       Throwable cause = failure.getCause();
+       if (cause instanceof EventDeliveryException) {
+         if (client != null) {
+           connectionManager.checkIn(client);
+         }
+         return (EventDeliveryException) cause;
+       } else if (cause instanceof TimeoutException) {
+         if (client != null) {
+           connectionManager.checkIn(client);
+         }
+         return new EventDeliveryException("Append call timeout", cause);
+       }
+     }
+     if (failure instanceof InterruptedException) {
+       // Keep the interrupt visible to the caller's thread.
+       Thread.currentThread().interrupt();
+     }
+     if (client != null) {
+       connectionManager.destroy(client);
+     }
+     if (failure instanceof Error) {
+       throw (Error) failure;
+     } else if (failure instanceof RuntimeException) {
+       throw (RuntimeException) failure;
+     } else if (failure instanceof EventDeliveryException) {
+       return (EventDeliveryException) failure;
+     } else if (failure instanceof TimeoutException) {
+       // Future.get(timeout) throws TimeoutException directly, it is not
+       // wrapped in an ExecutionException.
+       return new EventDeliveryException("Append call timeout", failure);
+     }
+     return new EventDeliveryException("Failed to send event. ", failure);
+   }
+
+   /**
+    * Submits a single-event append to the helper pool.
+    *
+    * @return a future that fails with EventDeliveryException when the server
+    * answers with a status other than OK
+    */
+   private Future<Void> doAppend(final ClientWrapper client,
+       final ThriftFlumeEvent e) throws Exception {
+
+     return callTimeoutPool.submit(new Callable<Void>() {
+       @Override
+       public Void call() throws Exception {
+         Status status = client.client.append(e);
+         if (status != Status.OK) {
+           throw new EventDeliveryException("Failed to deliver events. Server " +
+               "returned status : " + status.name());
+         }
+         return null;
+       }
+     });
+   }
+
+   /**
+    * Submits a batch append to the helper pool.
+    *
+    * @return a future that fails with EventDeliveryException when the server
+    * answers with a status other than OK
+    */
+   private Future<Void> doAppendBatch(final ClientWrapper client,
+       final List<ThriftFlumeEvent> e) throws Exception {
+
+     return callTimeoutPool.submit(new Callable<Void>() {
+       @Override
+       public Void call() throws Exception {
+         Status status = client.client.appendBatch(e);
+         if (status != Status.OK) {
+           throw new EventDeliveryException("Failed to deliver events. Server " +
+               "returned status : " + status.name());
+         }
+         return null;
+       }
+     });
+   }
+
+   @Override
+   public boolean isActive() {
+     stateLock.lock();
+     try {
+       return (connState == State.READY);
+     } finally {
+       stateLock.unlock();
+     }
+   }
+
+   /**
+    * Marks the client dead, closes every pooled connection (including the
+    * ones currently checked out) and stops the helper thread pool, waiting
+    * up to five seconds for running calls to finish. Safe to call on a
+    * client that was never configured.
+    *
+    * @throws FlumeException if shutting down fails or is interrupted
+    */
+   @Override
+   public void close() throws FlumeException {
+     // Acquire outside the try block: the finally clause must only unlock a
+     // lock that is actually held.
+     stateLock.lock();
+     try {
+       connState = State.DEAD;
+       // Null when configure() never ran or failed early.
+       if (connectionManager != null) {
+         connectionManager.closeAll();
+       }
+       callTimeoutPool.shutdown();
+       if (!callTimeoutPool.awaitTermination(5, TimeUnit.SECONDS)) {
+         callTimeoutPool.shutdownNow();
+       }
+     } catch (Throwable ex) {
+       if (ex instanceof InterruptedException) {
+         Thread.currentThread().interrupt();
+       }
+       if (ex instanceof Error) {
+         throw (Error) ex;
+       } else if (ex instanceof RuntimeException) {
+         throw (RuntimeException) ex;
+       }
+       throw new FlumeException("Failed to close RPC client. ", ex);
+     } finally {
+       stateLock.unlock();
+     }
+   }
+
+   /**
+    * Reads host, port, batch size, request timeout and connection pool size
+    * from the properties and makes the client ready for use. Batch size and
+    * pool size below 1, and request timeouts below one second, are replaced
+    * by their defaults with a warning. No connection is opened here;
+    * connections are created on demand by the pool.
+    *
+    * @param properties the client configuration
+    * @throws FlumeException if the client is already configured or the
+    * configuration cannot be applied; in the latter case the client is
+    * marked dead
+    */
+   @Override
+   protected void configure(Properties properties) throws FlumeException {
+     stateLock.lock();
+     try {
+       // Checked under the lock, and outside the inner try block so that a
+       // rejected re-configuration does not kill a working client.
+       if (connState == State.READY) {
+         throw new FlumeException("Attempting to re-configured an already " +
+             "configured client!");
+       }
+       try {
+         HostInfo host = HostInfo.getHostInfoList(properties).get(0);
+         hostname = host.getHostName();
+         port = host.getPortNumber();
+         batchSize = Integer.parseInt(properties.getProperty(
+             RpcClientConfigurationConstants.CONFIG_BATCH_SIZE,
+             RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE.toString()));
+         if (batchSize < 1) {
+           // A non-positive batch size would make appendBatch loop forever
+           // without ever consuming an event.
+           LOGGER.warn("Batch size specified is less than 1. " +
+               "Using default value instead.");
+           batchSize = RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE;
+         }
+         requestTimeout = Long.parseLong(properties.getProperty(
+             RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT,
+             String.valueOf(
+                 RpcClientConfigurationConstants
+                     .DEFAULT_REQUEST_TIMEOUT_MILLIS)));
+         if (requestTimeout < 1000) {
+           LOGGER.warn("Request timeout specified less than 1s. " +
+               "Using default value instead.");
+           requestTimeout =
+               RpcClientConfigurationConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS;
+         }
+         connectionPoolSize = Integer.parseInt(properties.getProperty(
+             RpcClientConfigurationConstants.CONFIG_CONNECTION_POOL_SIZE,
+             String.valueOf(RpcClientConfigurationConstants
+                 .DEFAULT_CONNECTION_POOL_SIZE)));
+         if (connectionPoolSize < 1) {
+           LOGGER.warn("Connection Pool Size specified is less than 1. " +
+               "Using default value instead.");
+           connectionPoolSize = RpcClientConfigurationConstants
+               .DEFAULT_CONNECTION_POOL_SIZE;
+         }
+         connectionManager = new ConnectionPoolManager(connectionPoolSize);
+         connState = State.READY;
+       } catch (Throwable ex) {
+         //Failed to configure, kill the client.
+         connState = State.DEAD;
+         if (ex instanceof Error) {
+           throw (Error) ex;
+         } else if (ex instanceof RuntimeException) {
+           throw (RuntimeException) ex;
+         }
+         throw new FlumeException("Error while configuring RpcClient. ", ex);
+       }
+     } finally {
+       stateLock.unlock();
+     }
+   }
+
+   private static enum State {
+     INIT, READY, DEAD
+   }
+
+   /**
+    * Wrapper around a client and transport, so we can clean up when this
+    * client gets closed. Constructing a wrapper opens the connection.
+    */
+   private class ClientWrapper {
+     public final ThriftSourceProtocol.Client client;
+     public final TFastFramedTransport transport;
+     private final int hashCode;
+
+     public ClientWrapper() throws Exception {
+       transport = new TFastFramedTransport(new TSocket(hostname, port));
+       transport.open();
+       client = new ThriftSourceProtocol.Client(new TCompactProtocol
+           (transport));
+       // Not a great hash code, but equality is identity (see equals), so
+       // any value that stays fixed for the life of the object is valid.
+       hashCode = random.nextInt();
+     }
+
+     // Since there is only one wrapper with any given client,
+     // direct comparison is good enough.
+     @Override
+     public boolean equals(Object o) {
+       return this == o;
+     }
+
+     @Override
+     public int hashCode() {
+       return hashCode;
+     }
+   }
+
+   /**
+    * Bounded pool of open connections. All bookkeeping is guarded by
+    * {@code poolLock}; threads that find the pool exhausted wait on
+    * {@code availableClientsCondition} until a connection is returned, a
+    * connection is destroyed (which frees capacity) or the pool is closed.
+    */
+   private class ConnectionPoolManager {
+     private final Queue<ClientWrapper> availableClients;
+     private final Set<ClientWrapper> checkedOutClients;
+     private final int maxPoolSize;
+     private int currentPoolSize;
+     // Set by closeAll(); once true no connection is handed out or re-pooled.
+     private boolean closed;
+     private final Lock poolLock;
+     private final Condition availableClientsCondition;
+
+     public ConnectionPoolManager(int poolSize) {
+       this.maxPoolSize = poolSize;
+       availableClients = new LinkedList<ClientWrapper>();
+       checkedOutClients = new HashSet<ClientWrapper>();
+       poolLock = new ReentrantLock();
+       availableClientsCondition = poolLock.newCondition();
+       currentPoolSize = 0;
+       closed = false;
+     }
+
+     /**
+      * Hands out an idle connection, opens a new one if the pool is below
+      * its maximum size, or otherwise blocks until one of the two becomes
+      * possible.
+      *
+      * @throws EventDeliveryException if the pool has been closed
+      * @throws Exception if a new connection cannot be opened or the wait
+      * is interrupted
+      */
+     public ClientWrapper checkout() throws Exception {
+       poolLock.lock();
+       try {
+         while (true) {
+           if (closed) {
+             throw new EventDeliveryException("Client was closed due to " +
+                 "error. Please create a new client");
+           }
+           ClientWrapper ret = availableClients.poll();
+           if (ret == null && currentPoolSize < maxPoolSize) {
+             ret = new ClientWrapper();
+             currentPoolSize++;
+           }
+           if (ret != null) {
+             checkedOutClients.add(ret);
+             return ret;
+           }
+           // Re-evaluate everything after waking up: capacity may have been
+           // freed by destroy() rather than by a checkIn().
+           availableClientsCondition.await();
+         }
+       } finally {
+         poolLock.unlock();
+       }
+     }
+
+     /** Returns a healthy connection to the pool and wakes one waiter. */
+     public void checkIn(ClientWrapper client) {
+       poolLock.lock();
+       try {
+         if (closed) {
+           // closeAll() already closed this transport; do not re-pool it.
+           return;
+         }
+         availableClients.add(client);
+         checkedOutClients.remove(client);
+         availableClientsCondition.signal();
+       } finally {
+         poolLock.unlock();
+       }
+     }
+
+     /**
+      * Removes a checked out connection from the pool for good and closes
+      * its transport. A waiter is woken because capacity was freed.
+      */
+     public void destroy(ClientWrapper client) {
+       poolLock.lock();
+       try {
+         // remove() is false if closeAll() already dropped the client; do
+         // not account for it twice in that case.
+         if (checkedOutClients.remove(client)) {
+           currentPoolSize--;
+           availableClientsCondition.signal();
+         }
+       } finally {
+         poolLock.unlock();
+       }
+       client.transport.close();
+     }
+
+     /**
+      * Closes every connection, idle or checked out, marks the pool closed
+      * and wakes all waiting threads so they fail instead of hanging.
+      */
+     public void closeAll() {
+       poolLock.lock();
+       try {
+         closed = true;
+         for (ClientWrapper c : availableClients) {
+           c.transport.close();
+         }
+         /*
+          * Be cruel and close even the checked out clients. The threads writing
+          * using these will now get an exception.
+          */
+         for (ClientWrapper c : checkedOutClients) {
+           c.transport.close();
+         }
+         availableClients.clear();
+         checkedOutClients.clear();
+         currentPoolSize = 0;
+         availableClientsCondition.signalAll();
+       } finally {
+         poolLock.unlock();
+       }
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/flume/blob/60da3d86/flume-ng-sdk/src/main/java/org/apache/flume/thrift/Status.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/thrift/Status.java b/flume-ng-sdk/src/main/java/org/apache/flume/thrift/Status.java
new file mode 100644
index 0000000..c05f4e3
--- /dev/null
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/thrift/Status.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package org.apache.flume.thrift;
+
+
+import java.util.Map;
+import java.util.HashMap;
+import org.apache.thrift.TEnum;
+
+ /**
+  * Status values of the Flume Thrift protocol. Each constant carries the
+  * integer value assigned to it in the Thrift IDL.
+  */
+ public enum Status implements org.apache.thrift.TEnum {
+   OK(0),
+   FAILED(1),
+   ERROR(2),
+   UNKNOWN(3);
+
+   private final int value;
+
+   private Status(int value) {
+     this.value = value;
+   }
+
+   /**
+    * Returns the integer value of this constant, as defined in the Thrift
+    * IDL.
+    */
+   public int getValue() {
+     return value;
+   }
+
+   /**
+    * Looks up the constant that carries the given integer value, as defined
+    * in the Thrift IDL.
+    * @return the matching constant, or null if no constant has that value.
+    */
+   public static Status findByValue(int value) {
+     for (Status candidate : values()) {
+       if (candidate.value == value) {
+         return candidate;
+       }
+     }
+     return null;
+   }
+ }