You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by pr...@apache.org on 2012/02/06 20:48:01 UTC

svn commit: r1241129 [1/2] - in /incubator/flume/branches/flume-728/flume-ng-legacy-sources: ./ flume-avro-source/ flume-avro-source/src/ flume-avro-source/src/main/ flume-avro-source/src/main/avro/ flume-avro-source/src/main/java/ flume-avro-source/sr...

Author: prasadm
Date: Mon Feb  6 19:47:59 2012
New Revision: 1241129

URL: http://svn.apache.org/viewvc?rev=1241129&view=rev
Log:
FLUME-942: Support event compatibility with Flume 0.9x

Added:
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/pom.xml
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/avro/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/avro/flumeCompatibility.avdl
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/java/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/AvroLegacySource.java
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/test/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/test/java/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/apache/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/apache/flume/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/apache/flume/source/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/apache/flume/source/avroLegacy/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/apache/flume/source/avroLegacy/TestLegacyAvroSource.java
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/pom.xml
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/README
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/EventStatus.java
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/Priority.java
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/ThriftFlumeEvent.java
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/ThriftFlumeEventServer.java
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/flume/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/flume/source/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/flume/source/thriftLegacy/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/flume/source/thriftLegacy/ThriftLegacySource.java
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/thrift/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/thrift/flumeCompatibility.thrift
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/test/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/test/java/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/test/java/org/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/test/java/org/apache/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/test/java/org/apache/flume/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/test/java/org/apache/flume/source/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/test/java/org/apache/flume/source/thriftLegacy/
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/test/java/org/apache/flume/source/thriftLegacy/TestThriftLegacySource.java
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/pom.xml

Added: incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/pom.xml?rev=1241129&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/pom.xml (added)
+++ incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/pom.xml Mon Feb  6 19:47:59 2012
@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>flume-ng-legacy-sources</artifactId>
+    <groupId>org.apache.flume</groupId>
+    <version>1.1.0-incubating-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.flume.flume-ng-legacy-sources</groupId>
+  <artifactId>flume-avro-source</artifactId>
+  <name>Flume legacy Avro source</name>
+
+  <properties>
+    <thrift.executable>${env.THRIFT_HOME}/bin/thrift</thrift.executable>
+  </properties>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>idl-protocol</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+   </dependency>
+
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro-ipc</artifactId>
+   </dependency>
+
+  </dependencies>
+
+</project>

Added: incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/avro/flumeCompatibility.avdl
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/avro/flumeCompatibility.avdl?rev=1241129&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/avro/flumeCompatibility.avdl (added)
+++ incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/avro/flumeCompatibility.avdl Mon Feb  6 19:47:59 2012
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/* Avro protocol for Flume 0.9.x compatibility */
+@namespace("com.cloudera.flume.handlers.avro")
+protocol FlumeOGEventAvroServer{
+  enum Priority { FATAL, ERROR, WARN,
+    INFO, DEBUG, TRACE}
+
+  record AvroFlumeOGEvent {
+    long timestamp;
+    Priority priority;
+    bytes body;
+    long nanos;
+    string host;
+    map<bytes> fields;
+  }
+
+  void append( AvroFlumeOGEvent evt ) ;
+}
\ No newline at end of file

Added: incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/AvroLegacySource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/AvroLegacySource.java?rev=1241129&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/AvroLegacySource.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/AvroLegacySource.java Mon Feb  6 19:47:59 2012
@@ -0,0 +1,137 @@
+package org.apache.flume.source.avroLegacy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.avro.ipc.HttpServer;
+import org.apache.avro.ipc.specific.SpecificResponder;
+import org.apache.flume.Context;
+import org.apache.flume.CounterGroup;
+import org.apache.flume.Event;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.Source;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.source.AbstractSource;
+
+import com.cloudera.flume.handlers.avro.AvroFlumeOGEvent;
+import com.cloudera.flume.handlers.avro.FlumeOGEventAvroServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flume.ChannelException;
+
+/**
+ * <p>
+ * A {@link Source} implementation that receives Avro events from Avro sink of Flume OG
+ * </p>
+ * <p>
+ * <b>Configuration options</b>
+ * </p>
+ * <table>
+ * <tr>
+ * <th>Parameter</th>
+ * <th>Description</th>
+ * <th>Unit / Type</th>
+ * <th>Default</th>
+ * </tr>
+ * <tr>
+ * <td><tt>host</tt></td>
+ * <td>The hostname or IP to which the source will bind.</td>
+ * <td>Hostname or IP / String</td>
+ * <td>none (required)</td>
+ * </tr>
+ * <tr>
+ * <td><tt>port</tt></td>
+ * <td>The port to which the source will bind and listen for events.</td>
+ * <td>TCP port / int</td>
+ * <td>none (required)</td>
+ * </tr>
+ * </table>
+ * <p>
+ * <b>Metrics</b>
+ * </p>
+ * <p>
+ * TODO
+ * </p>
+ */
+
+public class AvroLegacySource extends AbstractSource implements EventDrivenSource,
+    Configurable, FlumeOGEventAvroServer {
+
+  static final Logger LOG = LoggerFactory.getLogger(AvroLegacySource.class);
+
+  //  Flume OG event fields
+  public static final String HOST = "host";
+  public static final String TIMESTAMP = "timestamp";
+  public static final String PRIORITY = "pri";
+  public static final String NANOS = "nanos";
+  public static final String OG_EVENT = "FlumeOG";
+
+  private CounterGroup counterGroup;
+  protected FlumeOGEventAvroServer avroClient;
+  private String host;
+  private int port;
+  private SpecificResponder res;
+  private HttpServer http;
+
+  public AvroLegacySource() {
+    counterGroup = new CounterGroup();
+  }
+
+  @Override
+  public void start() {
+    // setup http server to receive OG events
+    res = new SpecificResponder(FlumeOGEventAvroServer.class, this);
+    try {
+      http = new HttpServer(res, host, port);
+    } catch (IOException eI) {
+      LOG.warn("Failed to start server", eI);
+      return;
+    }
+    http.start();
+    super.start();
+  }
+
+  @Override
+  public void stop() {
+    http.close();
+    super.stop();
+  }
+
+  @Override
+  public Void append( AvroFlumeOGEvent evt ) throws org.apache.avro.AvroRemoteException {
+    counterGroup.incrementAndGet("rpc.received");
+    Map<String, String> headers = new HashMap<String, String>();
+
+    // extract Flume OG event headers
+    headers.put(HOST, evt.getHost().toString());
+    headers.put(TIMESTAMP, evt.getTimestamp().toString());
+    headers.put(PRIORITY, evt.getPriority().toString());
+    headers.put(NANOS, evt.getNanos().toString());
+    for (Entry<CharSequence, ByteBuffer> entry: evt.getFields().entrySet()) {
+      headers.put(entry.getKey().toString(), entry.getValue().toString());
+    }
+    headers.put(OG_EVENT, "yes");
+
+    Event event = EventBuilder.withBody(evt.getBody().array(), headers);
+    try {
+      getChannelProcessor().processEvent(event);
+      counterGroup.incrementAndGet("rpc.events");
+    } catch (ChannelException ex) {
+      return null;
+    }
+
+    counterGroup.incrementAndGet("rpc.successful");
+    return null;
+  }
+
+  @Override
+  public void configure(Context context) {
+    port = Integer.parseInt(context.get("port", String.class));
+    host = context.get("host", String.class);
+  }
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/apache/flume/source/avroLegacy/TestLegacyAvroSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/apache/flume/source/avroLegacy/TestLegacyAvroSource.java?rev=1241129&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/apache/flume/source/avroLegacy/TestLegacyAvroSource.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/apache/flume/source/avroLegacy/TestLegacyAvroSource.java Mon Feb  6 19:47:59 2012
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.source.avroLegacy;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.avro.ipc.HttpTransceiver;
+import org.apache.avro.ipc.Transceiver;
+import org.apache.avro.ipc.specific.SpecificRequestor;
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.ReplicatingChannelSelector;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.lifecycle.LifecycleController;
+import org.apache.flume.lifecycle.LifecycleState;
+import com.cloudera.flume.handlers.avro.AvroFlumeOGEvent;
+import com.cloudera.flume.handlers.avro.FlumeOGEventAvroServer;
+import com.cloudera.flume.handlers.avro.Priority;
+import org.jboss.netty.channel.ChannelException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TestLegacyAvroSource {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(TestLegacyAvroSource.class);
+
+  private int selectedPort;
+  private AvroLegacySource source;
+  private Channel channel;
+
+  @Before
+  public void setUp() {
+    source = new AvroLegacySource();
+    channel = new MemoryChannel();
+
+    Configurables.configure(channel, new Context());
+
+    List<Channel> channels = new ArrayList<Channel>();
+    channels.add(channel);
+
+    ChannelSelector rcs = new ReplicatingChannelSelector();
+    rcs.setChannels(channels);
+
+    source.setChannelProcessor(new ChannelProcessor(rcs));
+  }
+
+  @Test
+  public void testLifecycle() throws InterruptedException {
+    boolean bound = false;
+
+    for (int i = 0; i < 100 && !bound; i++) {
+      try {
+        Context context = new Context();
+
+        context.put("port", String.valueOf(selectedPort = 41414 + i));
+        context.put("host", "0.0.0.0");
+
+        Configurables.configure(source, context);
+
+        source.start();
+        bound = true;
+      } catch (ChannelException e) {
+        // Assume port in use, try another one
+      }
+    }
+
+    Assert
+        .assertTrue("Reached start or error", LifecycleController.waitForOneOf(
+            source, LifecycleState.START_OR_ERROR));
+    Assert.assertEquals("Server is started", LifecycleState.START,
+        source.getLifecycleState());
+
+    source.stop();
+    Assert.assertTrue("Reached stop or error",
+        LifecycleController.waitForOneOf(source, LifecycleState.STOP_OR_ERROR));
+    Assert.assertEquals("Server is stopped", LifecycleState.STOP,
+        source.getLifecycleState());
+  }
+
+  @Test
+  public void testRequest() throws InterruptedException, IOException {
+    boolean bound = false;
+    int i;
+
+    for (i = 0; i < 100 && !bound; i++) {
+      try {
+        Context context = new Context();
+
+        context.put("port", String.valueOf(selectedPort = 41414 + i));
+        context.put("host", "0.0.0.0");
+
+        Configurables.configure(source, context);
+
+        source.start();
+        bound = true;
+      } catch (ChannelException e) {
+        // Assume port in use, try another one
+      }
+    }
+
+    Assert
+        .assertTrue("Reached start or error", LifecycleController.waitForOneOf(
+            source, LifecycleState.START_OR_ERROR));
+    Assert.assertEquals("Server is started", LifecycleState.START,
+        source.getLifecycleState());
+
+    // setup a requester, to send a flume OG event
+    URL url = new URL("http", "0.0.0.0", selectedPort, "/");
+    Transceiver http = new HttpTransceiver(url);
+    FlumeOGEventAvroServer client = SpecificRequestor.getClient(
+        FlumeOGEventAvroServer.class, http);
+
+    AvroFlumeOGEvent avroEvent =  AvroFlumeOGEvent.newBuilder().setHost("foo").
+        setPriority(Priority.INFO).setNanos(0).setTimestamp(1).
+        setFields(new HashMap<CharSequence, ByteBuffer> ()).
+        setBody(ByteBuffer.wrap("foo".getBytes())).build();
+
+    client.append(avroEvent);
+
+    // check if the even has arrived in the channel through OG avro source
+    Transaction transaction = channel.getTransaction();
+    transaction.begin();
+
+    Event event = channel.take();
+    Assert.assertNotNull(event);
+    Assert.assertEquals("Channel contained our event", "foo",
+        new String(event.getBody()));
+    transaction.commit();
+    transaction.close();
+
+    source.stop();
+
+    Assert.assertTrue("Reached stop or error",
+        LifecycleController.waitForOneOf(source, LifecycleState.STOP_OR_ERROR));
+    Assert.assertEquals("Server is stopped", LifecycleState.STOP,
+        source.getLifecycleState());
+  }
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/pom.xml?rev=1241129&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/pom.xml (added)
+++ incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/pom.xml Mon Feb  6 19:47:59 2012
@@ -0,0 +1,133 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>flume-ng-legacy-sources</artifactId>
+    <groupId>org.apache.flume</groupId>
+    <version>1.1.0-incubating-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.flume.flume-ng-legacy-sources</groupId>
+  <artifactId>flume-thrift-source</artifactId>
+  <name>Flume legacy Thrift Source</name>
+
+  <profiles>
+    <profile>
+      <id>compileThrift</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <properties>
+        <thrift.executable>${env.THRIFT_HOME}/bin/thrift</thrift.executable>
+      </properties>
+      <build>
+       <plugins>
+         <plugin>
+           <groupId>org.apache.thrift.tools</groupId>
+           <artifactId>maven-thrift-plugin</artifactId>
+           <version>0.1.10</version>
+           <configuration>
+             <thriftExecutable>${thrift.executable}</thriftExecutable>
+           </configuration>
+           <executions>
+             <execution>
+               <id>thrift-sources</id>
+               <phase>generate-sources</phase>
+               <goals>
+                 <goal>compile</goal>
+               </goals>
+             </execution>
+           </executions>
+         </plugin>
+         <plugin>
+           <groupId>org.apache.maven.plugins</groupId>
+           <artifactId>maven-compiler-plugin</artifactId>
+           <version>2.3.2</version>
+           <configuration>
+             <source>1.6</source>
+             <target>1.6</target>
+             <excludes>
+               <exclude>**/com/**</exclude>
+             </excludes>
+           </configuration>
+         </plugin>
+       </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>nonThrift</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
+      <build>
+        <pluginManagement>
+          <plugins>
+            <plugin>
+              <groupId>org.apache.maven.plugins</groupId>
+              <artifactId>maven-compiler-plugin</artifactId>
+              <version>2.3.2</version>
+              <configuration>
+                <source>1.6</source>
+                <target>1.6</target>
+                <excludes>
+                  <exclude>**/generated-sources/**</exclude>
+                </excludes>
+              </configuration>
+            </plugin>
+          </plugins>
+        </pluginManagement>
+      </build>
+    </profile>
+  </profiles>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libthrift</artifactId>
+   </dependency>
+
+  </dependencies>
+
+</project>

Added: incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/README
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/README?rev=1241129&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/README (added)
+++ incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/README Mon Feb  6 19:47:59 2012
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+The com directory contains generated java source from thrift. This 
+is to support backward compatibility with 0.9.x flume. The 1.0 version 
+of Flume doesn't use thrift so in order to avoid depenancy on thrift
+compiler, the generated sources are checked into the source tree.

Added: incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/EventStatus.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/EventStatus.java?rev=1241129&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/EventStatus.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/EventStatus.java Mon Feb  6 19:47:59 2012
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Autogenerated by Thrift
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package com.cloudera.flume.handlers.thrift;
+
+
+import java.util.Map;
+import java.util.HashMap;
+import org.apache.thrift.TEnum;
+
+public enum EventStatus implements org.apache.thrift.TEnum {
+  ACK(0),
+  COMMITED(1),
+  ERR(2);
+
+  private final int value;
+
+  private EventStatus(int value) {
+    this.value = value;
+  }
+
+  /**
+   * Get the integer value of this enum value, as defined in the Thrift IDL.
+   */
+  public int getValue() {
+    return value;
+  }
+
+  /**
+   * Find a the enum type by its integer value, as defined in the Thrift IDL.
+   * @return null if the value is not found.
+   */
+  public static EventStatus findByValue(int value) { 
+    switch (value) {
+      case 0:
+        return ACK;
+      case 1:
+        return COMMITED;
+      case 2:
+        return ERR;
+      default:
+        return null;
+    }
+  }
+}

Added: incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/Priority.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/Priority.java?rev=1241129&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/Priority.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/Priority.java Mon Feb  6 19:47:59 2012
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Autogenerated by Thrift
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package com.cloudera.flume.handlers.thrift;
+
+
+import java.util.Map;
+import java.util.HashMap;
+import org.apache.thrift.TEnum;
+
+public enum Priority implements org.apache.thrift.TEnum {
+  FATAL(0),
+  ERROR(1),
+  WARN(2),
+  INFO(3),
+  DEBUG(4),
+  TRACE(5);
+
+  private final int value;
+
+  private Priority(int value) {
+    this.value = value;
+  }
+
+  /**
+   * Get the integer value of this enum value, as defined in the Thrift IDL.
+   */
+  public int getValue() {
+    return value;
+  }
+
+  /**
+   * Find a the enum type by its integer value, as defined in the Thrift IDL.
+   * @return null if the value is not found.
+   */
+  public static Priority findByValue(int value) { 
+    switch (value) {
+      case 0:
+        return FATAL;
+      case 1:
+        return ERROR;
+      case 2:
+        return WARN;
+      case 3:
+        return INFO;
+      case 4:
+        return DEBUG;
+      case 5:
+        return TRACE;
+      default:
+        return null;
+    }
+  }
+}

Added: incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/ThriftFlumeEvent.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/ThriftFlumeEvent.java?rev=1241129&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/ThriftFlumeEvent.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/ThriftFlumeEvent.java Mon Feb  6 19:47:59 2012
@@ -0,0 +1,884 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Autogenerated by Thrift
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package com.cloudera.flume.handlers.thrift;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ThriftFlumeEvent implements org.apache.thrift.TBase<ThriftFlumeEvent, ThriftFlumeEvent._Fields>, java.io.Serializable, Cloneable {
+  private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ThriftFlumeEvent");
+
+  private static final org.apache.thrift.protocol.TField TIMESTAMP_FIELD_DESC = new org.apache.thrift.protocol.TField("timestamp", org.apache.thrift.protocol.TType.I64, (short)1);
+  private static final org.apache.thrift.protocol.TField PRIORITY_FIELD_DESC = new org.apache.thrift.protocol.TField("priority", org.apache.thrift.protocol.TType.I32, (short)2);
+  private static final org.apache.thrift.protocol.TField BODY_FIELD_DESC = new org.apache.thrift.protocol.TField("body", org.apache.thrift.protocol.TType.STRING, (short)3);
+  private static final org.apache.thrift.protocol.TField NANOS_FIELD_DESC = new org.apache.thrift.protocol.TField("nanos", org.apache.thrift.protocol.TType.I64, (short)4);
+  private static final org.apache.thrift.protocol.TField HOST_FIELD_DESC = new org.apache.thrift.protocol.TField("host", org.apache.thrift.protocol.TType.STRING, (short)5);
+  private static final org.apache.thrift.protocol.TField FIELDS_FIELD_DESC = new org.apache.thrift.protocol.TField("fields", org.apache.thrift.protocol.TType.MAP, (short)6);
+
+  public long timestamp;
+  /**
+   * 
+   * @see Priority
+   */
+  public Priority priority;
+  public ByteBuffer body;
+  public long nanos;
+  public String host;
+  public Map<String,ByteBuffer> fields;
+
+  /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+  public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+    TIMESTAMP((short)1, "timestamp"),
+    /**
+     * 
+     * @see Priority
+     */
+    PRIORITY((short)2, "priority"),
+    BODY((short)3, "body"),
+    NANOS((short)4, "nanos"),
+    HOST((short)5, "host"),
+    FIELDS((short)6, "fields");
+
+    private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+    static {
+      for (_Fields field : EnumSet.allOf(_Fields.class)) {
+        byName.put(field.getFieldName(), field);
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, or null if its not found.
+     */
+    public static _Fields findByThriftId(int fieldId) {
+      switch(fieldId) {
+        case 1: // TIMESTAMP
+          return TIMESTAMP;
+        case 2: // PRIORITY
+          return PRIORITY;
+        case 3: // BODY
+          return BODY;
+        case 4: // NANOS
+          return NANOS;
+        case 5: // HOST
+          return HOST;
+        case 6: // FIELDS
+          return FIELDS;
+        default:
+          return null;
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, throwing an exception
+     * if it is not found.
+     */
+    public static _Fields findByThriftIdOrThrow(int fieldId) {
+      _Fields fields = findByThriftId(fieldId);
+      if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+      return fields;
+    }
+
+    /**
+     * Find the _Fields constant that matches name, or null if its not found.
+     */
+    public static _Fields findByName(String name) {
+      return byName.get(name);
+    }
+
+    private final short _thriftId;
+    private final String _fieldName;
+
+    _Fields(short thriftId, String fieldName) {
+      _thriftId = thriftId;
+      _fieldName = fieldName;
+    }
+
+    public short getThriftFieldId() {
+      return _thriftId;
+    }
+
+    public String getFieldName() {
+      return _fieldName;
+    }
+  }
+
+  // isset id assignments
+  private static final int __TIMESTAMP_ISSET_ID = 0;
+  private static final int __NANOS_ISSET_ID = 1;
+  private BitSet __isset_bit_vector = new BitSet(2);
+
+  public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+  static {
+    Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+    tmpMap.put(_Fields.TIMESTAMP, new org.apache.thrift.meta_data.FieldMetaData("timestamp", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64        , "Timestamp")));
+    tmpMap.put(_Fields.PRIORITY, new org.apache.thrift.meta_data.FieldMetaData("priority", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, Priority.class)));
+    tmpMap.put(_Fields.BODY, new org.apache.thrift.meta_data.FieldMetaData("body", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING        , true)));
+    tmpMap.put(_Fields.NANOS, new org.apache.thrift.meta_data.FieldMetaData("nanos", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+    tmpMap.put(_Fields.HOST, new org.apache.thrift.meta_data.FieldMetaData("host", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.FIELDS, new org.apache.thrift.meta_data.FieldMetaData("fields", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP, 
+            new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING), 
+            new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING            , true))));
+    metaDataMap = Collections.unmodifiableMap(tmpMap);
+    org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ThriftFlumeEvent.class, metaDataMap);
+  }
+
+  public ThriftFlumeEvent() {
+  }
+
+  public ThriftFlumeEvent(
+    long timestamp,
+    Priority priority,
+    ByteBuffer body,
+    long nanos,
+    String host,
+    Map<String,ByteBuffer> fields)
+  {
+    this();
+    this.timestamp = timestamp;
+    setTimestampIsSet(true);
+    this.priority = priority;
+    this.body = body;
+    this.nanos = nanos;
+    setNanosIsSet(true);
+    this.host = host;
+    this.fields = fields;
+  }
+
+  /**
+   * Performs a deep copy on <i>other</i>.
+   */
+  public ThriftFlumeEvent(ThriftFlumeEvent other) {
+    __isset_bit_vector.clear();
+    __isset_bit_vector.or(other.__isset_bit_vector);
+    this.timestamp = other.timestamp;
+    if (other.isSetPriority()) {
+      this.priority = other.priority;
+    }
+    if (other.isSetBody()) {
+      this.body = org.apache.thrift.TBaseHelper.copyBinary(other.body);
+;
+    }
+    this.nanos = other.nanos;
+    if (other.isSetHost()) {
+      this.host = other.host;
+    }
+    if (other.isSetFields()) {
+      Map<String,ByteBuffer> __this__fields = new HashMap<String,ByteBuffer>();
+      for (Map.Entry<String, ByteBuffer> other_element : other.fields.entrySet()) {
+
+        String other_element_key = other_element.getKey();
+        ByteBuffer other_element_value = other_element.getValue();
+
+        String __this__fields_copy_key = other_element_key;
+
+        ByteBuffer __this__fields_copy_value = org.apache.thrift.TBaseHelper.copyBinary(other_element_value);
+;
+
+        __this__fields.put(__this__fields_copy_key, __this__fields_copy_value);
+      }
+      this.fields = __this__fields;
+    }
+  }
+
+  public ThriftFlumeEvent deepCopy() {
+    return new ThriftFlumeEvent(this);
+  }
+
+  @Override
+  public void clear() {
+    setTimestampIsSet(false);
+    this.timestamp = 0;
+    this.priority = null;
+    this.body = null;
+    setNanosIsSet(false);
+    this.nanos = 0;
+    this.host = null;
+    this.fields = null;
+  }
+
+  public long getTimestamp() {
+    return this.timestamp;
+  }
+
+  public ThriftFlumeEvent setTimestamp(long timestamp) {
+    this.timestamp = timestamp;
+    setTimestampIsSet(true);
+    return this;
+  }
+
+  public void unsetTimestamp() {
+    __isset_bit_vector.clear(__TIMESTAMP_ISSET_ID);
+  }
+
+  /** Returns true if field timestamp is set (has been assigned a value) and false otherwise */
+  public boolean isSetTimestamp() {
+    return __isset_bit_vector.get(__TIMESTAMP_ISSET_ID);
+  }
+
+  public void setTimestampIsSet(boolean value) {
+    __isset_bit_vector.set(__TIMESTAMP_ISSET_ID, value);
+  }
+
+  /**
+   * 
+   * @see Priority
+   */
+  public Priority getPriority() {
+    return this.priority;
+  }
+
+  /**
+   * 
+   * @see Priority
+   */
+  public ThriftFlumeEvent setPriority(Priority priority) {
+    this.priority = priority;
+    return this;
+  }
+
+  public void unsetPriority() {
+    this.priority = null;
+  }
+
+  /** Returns true if field priority is set (has been assigned a value) and false otherwise */
+  public boolean isSetPriority() {
+    return this.priority != null;
+  }
+
+  public void setPriorityIsSet(boolean value) {
+    if (!value) {
+      this.priority = null;
+    }
+  }
+
+  public byte[] getBody() {
+    setBody(org.apache.thrift.TBaseHelper.rightSize(body));
+    return body == null ? null : body.array();
+  }
+
+  public ByteBuffer bufferForBody() {
+    return body;
+  }
+
+  public ThriftFlumeEvent setBody(byte[] body) {
+    setBody(body == null ? (ByteBuffer)null : ByteBuffer.wrap(body));
+    return this;
+  }
+
+  public ThriftFlumeEvent setBody(ByteBuffer body) {
+    this.body = body;
+    return this;
+  }
+
+  public void unsetBody() {
+    this.body = null;
+  }
+
+  /** Returns true if field body is set (has been assigned a value) and false otherwise */
+  public boolean isSetBody() {
+    return this.body != null;
+  }
+
+  public void setBodyIsSet(boolean value) {
+    if (!value) {
+      this.body = null;
+    }
+  }
+
+  public long getNanos() {
+    return this.nanos;
+  }
+
+  public ThriftFlumeEvent setNanos(long nanos) {
+    this.nanos = nanos;
+    setNanosIsSet(true);
+    return this;
+  }
+
+  public void unsetNanos() {
+    __isset_bit_vector.clear(__NANOS_ISSET_ID);
+  }
+
+  /** Returns true if field nanos is set (has been assigned a value) and false otherwise */
+  public boolean isSetNanos() {
+    return __isset_bit_vector.get(__NANOS_ISSET_ID);
+  }
+
+  public void setNanosIsSet(boolean value) {
+    __isset_bit_vector.set(__NANOS_ISSET_ID, value);
+  }
+
+  public String getHost() {
+    return this.host;
+  }
+
+  public ThriftFlumeEvent setHost(String host) {
+    this.host = host;
+    return this;
+  }
+
+  public void unsetHost() {
+    this.host = null;
+  }
+
+  /** Returns true if field host is set (has been assigned a value) and false otherwise */
+  public boolean isSetHost() {
+    return this.host != null;
+  }
+
+  public void setHostIsSet(boolean value) {
+    if (!value) {
+      this.host = null;
+    }
+  }
+
+  public int getFieldsSize() {
+    return (this.fields == null) ? 0 : this.fields.size();
+  }
+
+  public void putToFields(String key, ByteBuffer val) {
+    if (this.fields == null) {
+      this.fields = new HashMap<String,ByteBuffer>();
+    }
+    this.fields.put(key, val);
+  }
+
+  public Map<String,ByteBuffer> getFields() {
+    return this.fields;
+  }
+
+  public ThriftFlumeEvent setFields(Map<String,ByteBuffer> fields) {
+    this.fields = fields;
+    return this;
+  }
+
+  public void unsetFields() {
+    this.fields = null;
+  }
+
+  /** Returns true if field fields is set (has been assigned a value) and false otherwise */
+  public boolean isSetFields() {
+    return this.fields != null;
+  }
+
+  public void setFieldsIsSet(boolean value) {
+    if (!value) {
+      this.fields = null;
+    }
+  }
+
+  public void setFieldValue(_Fields field, Object value) {
+    switch (field) {
+    case TIMESTAMP:
+      if (value == null) {
+        unsetTimestamp();
+      } else {
+        setTimestamp((Long)value);
+      }
+      break;
+
+    case PRIORITY:
+      if (value == null) {
+        unsetPriority();
+      } else {
+        setPriority((Priority)value);
+      }
+      break;
+
+    case BODY:
+      if (value == null) {
+        unsetBody();
+      } else {
+        setBody((ByteBuffer)value);
+      }
+      break;
+
+    case NANOS:
+      if (value == null) {
+        unsetNanos();
+      } else {
+        setNanos((Long)value);
+      }
+      break;
+
+    case HOST:
+      if (value == null) {
+        unsetHost();
+      } else {
+        setHost((String)value);
+      }
+      break;
+
+    case FIELDS:
+      if (value == null) {
+        unsetFields();
+      } else {
+        setFields((Map<String,ByteBuffer>)value);
+      }
+      break;
+
+    }
+  }
+
+  public Object getFieldValue(_Fields field) {
+    switch (field) {
+    case TIMESTAMP:
+      return new Long(getTimestamp());
+
+    case PRIORITY:
+      return getPriority();
+
+    case BODY:
+      return getBody();
+
+    case NANOS:
+      return new Long(getNanos());
+
+    case HOST:
+      return getHost();
+
+    case FIELDS:
+      return getFields();
+
+    }
+    throw new IllegalStateException();
+  }
+
+  /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+  public boolean isSet(_Fields field) {
+    if (field == null) {
+      throw new IllegalArgumentException();
+    }
+
+    switch (field) {
+    case TIMESTAMP:
+      return isSetTimestamp();
+    case PRIORITY:
+      return isSetPriority();
+    case BODY:
+      return isSetBody();
+    case NANOS:
+      return isSetNanos();
+    case HOST:
+      return isSetHost();
+    case FIELDS:
+      return isSetFields();
+    }
+    throw new IllegalStateException();
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (that == null)
+      return false;
+    if (that instanceof ThriftFlumeEvent)
+      return this.equals((ThriftFlumeEvent)that);
+    return false;
+  }
+
+  public boolean equals(ThriftFlumeEvent that) {
+    if (that == null)
+      return false;
+
+    boolean this_present_timestamp = true;
+    boolean that_present_timestamp = true;
+    if (this_present_timestamp || that_present_timestamp) {
+      if (!(this_present_timestamp && that_present_timestamp))
+        return false;
+      if (this.timestamp != that.timestamp)
+        return false;
+    }
+
+    boolean this_present_priority = true && this.isSetPriority();
+    boolean that_present_priority = true && that.isSetPriority();
+    if (this_present_priority || that_present_priority) {
+      if (!(this_present_priority && that_present_priority))
+        return false;
+      if (!this.priority.equals(that.priority))
+        return false;
+    }
+
+    boolean this_present_body = true && this.isSetBody();
+    boolean that_present_body = true && that.isSetBody();
+    if (this_present_body || that_present_body) {
+      if (!(this_present_body && that_present_body))
+        return false;
+      if (!this.body.equals(that.body))
+        return false;
+    }
+
+    boolean this_present_nanos = true;
+    boolean that_present_nanos = true;
+    if (this_present_nanos || that_present_nanos) {
+      if (!(this_present_nanos && that_present_nanos))
+        return false;
+      if (this.nanos != that.nanos)
+        return false;
+    }
+
+    boolean this_present_host = true && this.isSetHost();
+    boolean that_present_host = true && that.isSetHost();
+    if (this_present_host || that_present_host) {
+      if (!(this_present_host && that_present_host))
+        return false;
+      if (!this.host.equals(that.host))
+        return false;
+    }
+
+    boolean this_present_fields = true && this.isSetFields();
+    boolean that_present_fields = true && that.isSetFields();
+    if (this_present_fields || that_present_fields) {
+      if (!(this_present_fields && that_present_fields))
+        return false;
+      if (!this.fields.equals(that.fields))
+        return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    HashCodeBuilder builder = new HashCodeBuilder();
+
+    boolean present_timestamp = true;
+    builder.append(present_timestamp);
+    if (present_timestamp)
+      builder.append(timestamp);
+
+    boolean present_priority = true && (isSetPriority());
+    builder.append(present_priority);
+    if (present_priority)
+      builder.append(priority.getValue());
+
+    boolean present_body = true && (isSetBody());
+    builder.append(present_body);
+    if (present_body)
+      builder.append(body);
+
+    boolean present_nanos = true;
+    builder.append(present_nanos);
+    if (present_nanos)
+      builder.append(nanos);
+
+    boolean present_host = true && (isSetHost());
+    builder.append(present_host);
+    if (present_host)
+      builder.append(host);
+
+    boolean present_fields = true && (isSetFields());
+    builder.append(present_fields);
+    if (present_fields)
+      builder.append(fields);
+
+    return builder.toHashCode();
+  }
+
+  public int compareTo(ThriftFlumeEvent other) {
+    if (!getClass().equals(other.getClass())) {
+      return getClass().getName().compareTo(other.getClass().getName());
+    }
+
+    int lastComparison = 0;
+    ThriftFlumeEvent typedOther = (ThriftFlumeEvent)other;
+
+    lastComparison = Boolean.valueOf(isSetTimestamp()).compareTo(typedOther.isSetTimestamp());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetTimestamp()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.timestamp, typedOther.timestamp);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(isSetPriority()).compareTo(typedOther.isSetPriority());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetPriority()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.priority, typedOther.priority);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(isSetBody()).compareTo(typedOther.isSetBody());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetBody()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.body, typedOther.body);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(isSetNanos()).compareTo(typedOther.isSetNanos());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetNanos()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.nanos, typedOther.nanos);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(isSetHost()).compareTo(typedOther.isSetHost());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetHost()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.host, typedOther.host);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(isSetFields()).compareTo(typedOther.isSetFields());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetFields()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.fields, typedOther.fields);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    return 0;
+  }
+
+  public _Fields fieldForId(int fieldId) {
+    return _Fields.findByThriftId(fieldId);
+  }
+
+  public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+    org.apache.thrift.protocol.TField field;
+    iprot.readStructBegin();
+    while (true)
+    {
+      field = iprot.readFieldBegin();
+      if (field.type == org.apache.thrift.protocol.TType.STOP) { 
+        break;
+      }
+      switch (field.id) {
+        case 1: // TIMESTAMP
+          if (field.type == org.apache.thrift.protocol.TType.I64) {
+            this.timestamp = iprot.readI64();
+            setTimestampIsSet(true);
+          } else { 
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        case 2: // PRIORITY
+          if (field.type == org.apache.thrift.protocol.TType.I32) {
+            this.priority = Priority.findByValue(iprot.readI32());
+          } else { 
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        case 3: // BODY
+          if (field.type == org.apache.thrift.protocol.TType.STRING) {
+            this.body = iprot.readBinary();
+          } else { 
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        case 4: // NANOS
+          if (field.type == org.apache.thrift.protocol.TType.I64) {
+            this.nanos = iprot.readI64();
+            setNanosIsSet(true);
+          } else { 
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        case 5: // HOST
+          if (field.type == org.apache.thrift.protocol.TType.STRING) {
+            this.host = iprot.readString();
+          } else { 
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        case 6: // FIELDS
+          if (field.type == org.apache.thrift.protocol.TType.MAP) {
+            {
+              org.apache.thrift.protocol.TMap _map0 = iprot.readMapBegin();
+              this.fields = new HashMap<String,ByteBuffer>(2*_map0.size);
+              for (int _i1 = 0; _i1 < _map0.size; ++_i1)
+              {
+                String _key2;
+                ByteBuffer _val3;
+                _key2 = iprot.readString();
+                _val3 = iprot.readBinary();
+                this.fields.put(_key2, _val3);
+              }
+              iprot.readMapEnd();
+            }
+          } else { 
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        default:
+          org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+      }
+      iprot.readFieldEnd();
+    }
+    iprot.readStructEnd();
+
+    // check for required fields of primitive type, which can't be checked in the validate method
+    validate();
+  }
+
+  public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+    validate();
+
+    oprot.writeStructBegin(STRUCT_DESC);
+    oprot.writeFieldBegin(TIMESTAMP_FIELD_DESC);
+    oprot.writeI64(this.timestamp);
+    oprot.writeFieldEnd();
+    if (this.priority != null) {
+      oprot.writeFieldBegin(PRIORITY_FIELD_DESC);
+      oprot.writeI32(this.priority.getValue());
+      oprot.writeFieldEnd();
+    }
+    if (this.body != null) {
+      oprot.writeFieldBegin(BODY_FIELD_DESC);
+      oprot.writeBinary(this.body);
+      oprot.writeFieldEnd();
+    }
+    oprot.writeFieldBegin(NANOS_FIELD_DESC);
+    oprot.writeI64(this.nanos);
+    oprot.writeFieldEnd();
+    if (this.host != null) {
+      oprot.writeFieldBegin(HOST_FIELD_DESC);
+      oprot.writeString(this.host);
+      oprot.writeFieldEnd();
+    }
+    if (this.fields != null) {
+      oprot.writeFieldBegin(FIELDS_FIELD_DESC);
+      {
+        oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, this.fields.size()));
+        for (Map.Entry<String, ByteBuffer> _iter4 : this.fields.entrySet())
+        {
+          oprot.writeString(_iter4.getKey());
+          oprot.writeBinary(_iter4.getValue());
+        }
+        oprot.writeMapEnd();
+      }
+      oprot.writeFieldEnd();
+    }
+    oprot.writeFieldStop();
+    oprot.writeStructEnd();
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("ThriftFlumeEvent(");
+    boolean first = true;
+
+    sb.append("timestamp:");
+    sb.append(this.timestamp);
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("priority:");
+    if (this.priority == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.priority);
+    }
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("body:");
+    if (this.body == null) {
+      sb.append("null");
+    } else {
+      org.apache.thrift.TBaseHelper.toString(this.body, sb);
+    }
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("nanos:");
+    sb.append(this.nanos);
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("host:");
+    if (this.host == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.host);
+    }
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("fields:");
+    if (this.fields == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.fields);
+    }
+    first = false;
+    sb.append(")");
+    return sb.toString();
+  }
+
+  public void validate() throws org.apache.thrift.TException {
+    // check for required fields
+  }
+
+  private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+    try {
+      write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+    try {
+      // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+      __isset_bit_vector = new BitSet(1);
+      read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+}
+