You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by pr...@apache.org on 2012/02/06 20:48:01 UTC
svn commit: r1241129 [1/2] - in
/incubator/flume/branches/flume-728/flume-ng-legacy-sources: ./
flume-avro-source/ flume-avro-source/src/ flume-avro-source/src/main/
flume-avro-source/src/main/avro/ flume-avro-source/src/main/java/
flume-avro-source/sr...
Author: prasadm
Date: Mon Feb 6 19:47:59 2012
New Revision: 1241129
URL: http://svn.apache.org/viewvc?rev=1241129&view=rev
Log:
FLUME-942: Support event compatibility with Flume 0.9x
Added:
incubator/flume/branches/flume-728/flume-ng-legacy-sources/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/pom.xml
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/avro/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/avro/flumeCompatibility.avdl
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/java/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/AvroLegacySource.java
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/test/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/test/java/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/apache/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/apache/flume/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/apache/flume/source/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/apache/flume/source/avroLegacy/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/apache/flume/source/avroLegacy/TestLegacyAvroSource.java
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/pom.xml
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/README
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/EventStatus.java
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/Priority.java
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/ThriftFlumeEvent.java
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/ThriftFlumeEventServer.java
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/flume/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/flume/source/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/flume/source/thriftLegacy/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/flume/source/thriftLegacy/ThriftLegacySource.java
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/thrift/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/thrift/flumeCompatibility.thrift
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/test/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/test/java/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/test/java/org/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/test/java/org/apache/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/test/java/org/apache/flume/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/test/java/org/apache/flume/source/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/test/java/org/apache/flume/source/thriftLegacy/
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/test/java/org/apache/flume/source/thriftLegacy/TestThriftLegacySource.java
incubator/flume/branches/flume-728/flume-ng-legacy-sources/pom.xml
Added: incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/pom.xml?rev=1241129&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/pom.xml (added)
+++ incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/pom.xml Mon Feb 6 19:47:59 2012
@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>flume-ng-legacy-sources</artifactId>
+ <groupId>org.apache.flume</groupId>
+ <version>1.1.0-incubating-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.apache.flume.flume-ng-legacy-sources</groupId>
+ <artifactId>flume-avro-source</artifactId>
+ <name>Flume legacy Avro source</name>
+
+ <properties>
+ <thrift.executable>${env.THRIFT_HOME}/bin/thrift</thrift.executable>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>idl-protocol</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-ipc</artifactId>
+ </dependency>
+
+ </dependencies>
+
+</project>
Added: incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/avro/flumeCompatibility.avdl
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/avro/flumeCompatibility.avdl?rev=1241129&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/avro/flumeCompatibility.avdl (added)
+++ incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/avro/flumeCompatibility.avdl Mon Feb 6 19:47:59 2012
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/* Avro protocol for Flume 0.9.x compatibility */
+@namespace("com.cloudera.flume.handlers.avro")
+protocol FlumeOGEventAvroServer{
+ enum Priority { FATAL, ERROR, WARN,
+ INFO, DEBUG, TRACE}
+ /** Event record exchanged by this Flume 0.9.x compatibility protocol. */
+ record AvroFlumeOGEvent {
+ long timestamp;
+ Priority priority;
+ bytes body;
+ long nanos;
+ string host;
+ map<bytes> fields;
+ }
+ /** Sends a single event; the call returns no value. */
+ void append( AvroFlumeOGEvent evt ) ;
+}
\ No newline at end of file
Added: incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/AvroLegacySource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/AvroLegacySource.java?rev=1241129&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/AvroLegacySource.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/AvroLegacySource.java Mon Feb 6 19:47:59 2012
@@ -0,0 +1,137 @@
+package org.apache.flume.source.avroLegacy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.avro.ipc.HttpServer;
+import org.apache.avro.ipc.specific.SpecificResponder;
+import org.apache.flume.Context;
+import org.apache.flume.CounterGroup;
+import org.apache.flume.Event;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.Source;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.source.AbstractSource;
+
+import com.cloudera.flume.handlers.avro.AvroFlumeOGEvent;
+import com.cloudera.flume.handlers.avro.FlumeOGEventAvroServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flume.ChannelException;
+
+/**
+ * <p>
+ * A {@link Source} implementation that receives Avro events from Avro sink of Flume OG
+ * </p>
+ * <p>
+ * <b>Configuration options</b>
+ * </p>
+ * <table>
+ * <tr>
+ * <th>Parameter</th>
+ * <th>Description</th>
+ * <th>Unit / Type</th>
+ * <th>Default</th>
+ * </tr>
+ * <tr>
+ * <td><tt>host</tt></td>
+ * <td>The hostname or IP to which the source will bind.</td>
+ * <td>Hostname or IP / String</td>
+ * <td>none (required)</td>
+ * </tr>
+ * <tr>
+ * <td><tt>port</tt></td>
+ * <td>The port to which the source will bind and listen for events.</td>
+ * <td>TCP port / int</td>
+ * <td>none (required)</td>
+ * </tr>
+ * </table>
+ * <p>
+ * <b>Metrics</b>
+ * </p>
+ * <p>
+ * TODO
+ * </p>
+ */
+
+public class AvroLegacySource extends AbstractSource implements EventDrivenSource,
+ Configurable, FlumeOGEventAvroServer {
+
+ static final Logger LOG = LoggerFactory.getLogger(AvroLegacySource.class);
+
+ // Flume OG event fields
+ public static final String HOST = "host";
+ public static final String TIMESTAMP = "timestamp";
+ public static final String PRIORITY = "pri";
+ public static final String NANOS = "nanos";
+ public static final String OG_EVENT = "FlumeOG";
+
+ private CounterGroup counterGroup;
+ protected FlumeOGEventAvroServer avroClient;
+ private String host;
+ private int port;
+ private SpecificResponder res;
+ private HttpServer http;
+
+ public AvroLegacySource() {
+ counterGroup = new CounterGroup();
+ }
+
+ @Override
+ public void start() {
+ // setup http server to receive OG events
+ res = new SpecificResponder(FlumeOGEventAvroServer.class, this);
+ try {
+ http = new HttpServer(res, host, port);
+ } catch (IOException eI) {
+ LOG.warn("Failed to start server", eI);
+ return;
+ }
+ http.start();
+ super.start();
+ }
+
+ @Override
+ public void stop() {
+ http.close();
+ super.stop();
+ }
+
+ @Override
+ public Void append( AvroFlumeOGEvent evt ) throws org.apache.avro.AvroRemoteException {
+ counterGroup.incrementAndGet("rpc.received");
+ Map<String, String> headers = new HashMap<String, String>();
+
+ // extract Flume OG event headers
+ headers.put(HOST, evt.getHost().toString());
+ headers.put(TIMESTAMP, evt.getTimestamp().toString());
+ headers.put(PRIORITY, evt.getPriority().toString());
+ headers.put(NANOS, evt.getNanos().toString());
+ for (Entry<CharSequence, ByteBuffer> entry: evt.getFields().entrySet()) {
+ headers.put(entry.getKey().toString(), entry.getValue().toString());
+ }
+ headers.put(OG_EVENT, "yes");
+
+ Event event = EventBuilder.withBody(evt.getBody().array(), headers);
+ try {
+ getChannelProcessor().processEvent(event);
+ counterGroup.incrementAndGet("rpc.events");
+ } catch (ChannelException ex) {
+ return null;
+ }
+
+ counterGroup.incrementAndGet("rpc.successful");
+ return null;
+ }
+
+ @Override
+ public void configure(Context context) {
+ port = Integer.parseInt(context.get("port", String.class));
+ host = context.get("host", String.class);
+ }
+
+}
Added: incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/apache/flume/source/avroLegacy/TestLegacyAvroSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/apache/flume/source/avroLegacy/TestLegacyAvroSource.java?rev=1241129&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/apache/flume/source/avroLegacy/TestLegacyAvroSource.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/apache/flume/source/avroLegacy/TestLegacyAvroSource.java Mon Feb 6 19:47:59 2012
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.source.avroLegacy;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.avro.ipc.HttpTransceiver;
+import org.apache.avro.ipc.Transceiver;
+import org.apache.avro.ipc.specific.SpecificRequestor;
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.ReplicatingChannelSelector;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.lifecycle.LifecycleController;
+import org.apache.flume.lifecycle.LifecycleState;
+import com.cloudera.flume.handlers.avro.AvroFlumeOGEvent;
+import com.cloudera.flume.handlers.avro.FlumeOGEventAvroServer;
+import com.cloudera.flume.handlers.avro.Priority;
+import org.jboss.netty.channel.ChannelException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TestLegacyAvroSource {
+ // Exercises AvroLegacySource: lifecycle transitions and delivery of one OG event into a channel.
+ private static final Logger logger = LoggerFactory
+ .getLogger(TestLegacyAvroSource.class);
+
+ private int selectedPort;
+ private AvroLegacySource source;
+ private Channel channel;
+ // Wires a fresh source to a single MemoryChannel through a ReplicatingChannelSelector.
+ @Before
+ public void setUp() {
+ source = new AvroLegacySource();
+ channel = new MemoryChannel();
+
+ Configurables.configure(channel, new Context());
+
+ List<Channel> channels = new ArrayList<Channel>();
+ channels.add(channel);
+
+ ChannelSelector rcs = new ReplicatingChannelSelector();
+ rcs.setChannels(channels);
+
+ source.setChannelProcessor(new ChannelProcessor(rcs));
+ }
+ // Starts the source on a port at or above 41414, then checks it reaches START and, after stop(), STOP.
+ @Test
+ public void testLifecycle() throws InterruptedException {
+ boolean bound = false;
+
+ for (int i = 0; i < 100 && !bound; i++) {
+ try {
+ Context context = new Context();
+
+ context.put("port", String.valueOf(selectedPort = 41414 + i));
+ context.put("host", "0.0.0.0");
+
+ Configurables.configure(source, context);
+
+ source.start();
+ bound = true;
+ } catch (ChannelException e) {
+ // Assume port in use, try another one. NOTE(review): this is Netty's ChannelException, but AvroLegacySource starts an Avro HttpServer - confirm a bind failure really surfaces as this type
+ }
+ }
+
+ Assert
+ .assertTrue("Reached start or error", LifecycleController.waitForOneOf(
+ source, LifecycleState.START_OR_ERROR));
+ Assert.assertEquals("Server is started", LifecycleState.START,
+ source.getLifecycleState());
+
+ source.stop();
+ Assert.assertTrue("Reached stop or error",
+ LifecycleController.waitForOneOf(source, LifecycleState.STOP_OR_ERROR));
+ Assert.assertEquals("Server is stopped", LifecycleState.STOP,
+ source.getLifecycleState());
+ }
+ // Sends one OG event through an Avro HTTP client and verifies that its body arrives in the channel.
+ @Test
+ public void testRequest() throws InterruptedException, IOException {
+ boolean bound = false;
+ int i;
+
+ for (i = 0; i < 100 && !bound; i++) {
+ try {
+ Context context = new Context();
+
+ context.put("port", String.valueOf(selectedPort = 41414 + i));
+ context.put("host", "0.0.0.0");
+
+ Configurables.configure(source, context);
+
+ source.start();
+ bound = true;
+ } catch (ChannelException e) {
+ // Assume port in use, try another one. NOTE(review): Netty's ChannelException may never be thrown by this source - confirm
+ }
+ }
+
+ Assert
+ .assertTrue("Reached start or error", LifecycleController.waitForOneOf(
+ source, LifecycleState.START_OR_ERROR));
+ Assert.assertEquals("Server is started", LifecycleState.START,
+ source.getLifecycleState());
+
+ // set up a requestor to send a Flume OG event over HTTP
+ URL url = new URL("http", "0.0.0.0", selectedPort, "/");
+ Transceiver http = new HttpTransceiver(url);
+ FlumeOGEventAvroServer client = SpecificRequestor.getClient(
+ FlumeOGEventAvroServer.class, http);
+
+ AvroFlumeOGEvent avroEvent = AvroFlumeOGEvent.newBuilder().setHost("foo").
+ setPriority(Priority.INFO).setNanos(0).setTimestamp(1).
+ setFields(new HashMap<CharSequence, ByteBuffer> ()).
+ setBody(ByteBuffer.wrap("foo".getBytes())).build();
+
+ client.append(avroEvent);
+
+ // check that the event has arrived in the channel through the OG Avro source
+ Transaction transaction = channel.getTransaction();
+ transaction.begin();
+
+ Event event = channel.take();
+ Assert.assertNotNull(event);
+ Assert.assertEquals("Channel contained our event", "foo",
+ new String(event.getBody()));
+ transaction.commit();
+ transaction.close();
+
+ source.stop();
+
+ Assert.assertTrue("Reached stop or error",
+ LifecycleController.waitForOneOf(source, LifecycleState.STOP_OR_ERROR));
+ Assert.assertEquals("Server is stopped", LifecycleState.STOP,
+ source.getLifecycleState());
+ }
+
+}
Added: incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/pom.xml?rev=1241129&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/pom.xml (added)
+++ incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/pom.xml Mon Feb 6 19:47:59 2012
@@ -0,0 +1,133 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>flume-ng-legacy-sources</artifactId>
+ <groupId>org.apache.flume</groupId>
+ <version>1.1.0-incubating-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.apache.flume.flume-ng-legacy-sources</groupId>
+ <artifactId>flume-thrift-source</artifactId>
+ <name>Flume legacy Thrift Source</name>
+
+ <profiles>
+ <profile>
+ <id>compileThrift</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <properties>
+ <thrift.executable>${env.THRIFT_HOME}/bin/thrift</thrift.executable>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.thrift.tools</groupId>
+ <artifactId>maven-thrift-plugin</artifactId>
+ <version>0.1.10</version>
+ <configuration>
+ <thriftExecutable>${thrift.executable}</thriftExecutable>
+ </configuration>
+ <executions>
+ <execution>
+ <id>thrift-sources</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.3.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ <excludes>
+ <exclude>**/com/**</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>nonThrift</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <build>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.3.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ <excludes>
+ <exclude>**/generated-sources/**</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+ </profile>
+ </profiles>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ </dependency>
+
+ </dependencies>
+
+</project>
Added: incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/README
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/README?rev=1241129&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/README (added)
+++ incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/README Mon Feb 6 19:47:59 2012
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+The com directory contains Java sources generated by Thrift. They exist
+to support backward compatibility with Flume 0.9.x. The 1.0 version
+of Flume doesn't use Thrift, so in order to avoid a dependency on the Thrift
+compiler, the generated sources are checked into the source tree.
Added: incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/EventStatus.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/EventStatus.java?rev=1241129&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/EventStatus.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/EventStatus.java Mon Feb 6 19:47:59 2012
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Autogenerated by Thrift
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package com.cloudera.flume.handlers.thrift;
+
+
+import java.util.Map;
+import java.util.HashMap;
+import org.apache.thrift.TEnum;
+
+public enum EventStatus implements org.apache.thrift.TEnum {
+ ACK(0),
+ COMMITED(1),
+ ERR(2);
+ // Integer value associated with this constant; returned by getValue().
+ private final int value;
+
+ private EventStatus(int value) {
+ this.value = value;
+ }
+
+ /**
+ * Get the integer value of this enum value, as defined in the Thrift IDL.
+ */
+ public int getValue() {
+ return value;
+ }
+
+ /**
+ * Find the enum constant by its integer value, as defined in the Thrift IDL.
+ * @return null if the value is not found.
+ */
+ public static EventStatus findByValue(int value) {
+ switch (value) {
+ case 0:
+ return ACK;
+ case 1:
+ return COMMITED;
+ case 2:
+ return ERR;
+ default:
+ return null;
+ }
+ }
+}
Added: incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/Priority.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/Priority.java?rev=1241129&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/Priority.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/Priority.java Mon Feb 6 19:47:59 2012
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Autogenerated by Thrift
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package com.cloudera.flume.handlers.thrift;
+
+
+import java.util.Map;
+import java.util.HashMap;
+import org.apache.thrift.TEnum;
+
+public enum Priority implements org.apache.thrift.TEnum {
+  FATAL(0),
+  ERROR(1),
+  WARN(2),
+  INFO(3),
+  DEBUG(4),
+  TRACE(5);
+
+  /** Integer code of this constant, as defined in the Thrift IDL. */
+  private final int value;
+
+  private Priority(int value) {
+    this.value = value;
+  }
+
+  /**
+   * Returns the integer value of this enum constant, as defined in the Thrift IDL.
+   */
+  public int getValue() {
+    return this.value;
+  }
+
+  /**
+   * Finds the enum constant for an integer value, as defined in the Thrift IDL.
+   * @return the matching constant, or null if the value is not found.
+   */
+  public static Priority findByValue(int value) {
+    for (Priority level : values()) {
+      if (level.value == value) {
+        return level;
+      }
+    }
+    return null;
+  }
+}
Added: incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/ThriftFlumeEvent.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/ThriftFlumeEvent.java?rev=1241129&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/ThriftFlumeEvent.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/ThriftFlumeEvent.java Mon Feb 6 19:47:59 2012
@@ -0,0 +1,884 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Autogenerated by Thrift
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package com.cloudera.flume.handlers.thrift;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ThriftFlumeEvent implements org.apache.thrift.TBase<ThriftFlumeEvent, ThriftFlumeEvent._Fields>, java.io.Serializable, Cloneable {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ThriftFlumeEvent");
+
+ private static final org.apache.thrift.protocol.TField TIMESTAMP_FIELD_DESC = new org.apache.thrift.protocol.TField("timestamp", org.apache.thrift.protocol.TType.I64, (short)1);
+ private static final org.apache.thrift.protocol.TField PRIORITY_FIELD_DESC = new org.apache.thrift.protocol.TField("priority", org.apache.thrift.protocol.TType.I32, (short)2);
+ private static final org.apache.thrift.protocol.TField BODY_FIELD_DESC = new org.apache.thrift.protocol.TField("body", org.apache.thrift.protocol.TType.STRING, (short)3);
+ private static final org.apache.thrift.protocol.TField NANOS_FIELD_DESC = new org.apache.thrift.protocol.TField("nanos", org.apache.thrift.protocol.TType.I64, (short)4);
+ private static final org.apache.thrift.protocol.TField HOST_FIELD_DESC = new org.apache.thrift.protocol.TField("host", org.apache.thrift.protocol.TType.STRING, (short)5);
+ private static final org.apache.thrift.protocol.TField FIELDS_FIELD_DESC = new org.apache.thrift.protocol.TField("fields", org.apache.thrift.protocol.TType.MAP, (short)6);
+
+ public long timestamp;
+ /**
+ *
+ * @see Priority
+ */
+ public Priority priority;
+ public ByteBuffer body;
+ public long nanos;
+ public String host;
+ public Map<String,ByteBuffer> fields;
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+   TIMESTAMP((short)1, "timestamp"),
+   /**
+    *
+    * @see Priority
+    */
+   PRIORITY((short)2, "priority"),
+   BODY((short)3, "body"),
+   NANOS((short)4, "nanos"),
+   HOST((short)5, "host"),
+   FIELDS((short)6, "fields");
+
+   // Name -> constant lookup table, filled once when the enum is initialized.
+   private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+   static {
+     for (_Fields candidate : values()) {
+       byName.put(candidate.getFieldName(), candidate);
+     }
+   }
+
+   /**
+    * Find the _Fields constant that matches fieldId, or null if it is not found.
+    */
+   public static _Fields findByThriftId(int fieldId) {
+     for (_Fields candidate : values()) {
+       if (candidate._thriftId == fieldId) {
+         return candidate;
+       }
+     }
+     return null;
+   }
+
+   /**
+    * Find the _Fields constant that matches fieldId, throwing an
+    * IllegalArgumentException if it is not found.
+    */
+   public static _Fields findByThriftIdOrThrow(int fieldId) {
+     _Fields match = findByThriftId(fieldId);
+     if (match != null) {
+       return match;
+     }
+     throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+   }
+
+   /**
+    * Find the _Fields constant that matches name, or null if it is not found.
+    */
+   public static _Fields findByName(String name) {
+     return byName.get(name);
+   }
+
+   private final short _thriftId;
+   private final String _fieldName;
+
+   _Fields(short thriftId, String fieldName) {
+     this._thriftId = thriftId;
+     this._fieldName = fieldName;
+   }
+
+   /** Returns the numeric Thrift id this constant was declared with. */
+   public short getThriftFieldId() {
+     return _thriftId;
+   }
+
+   /** Returns the field name this constant was declared with. */
+   public String getFieldName() {
+     return _fieldName;
+   }
+ }
+
+ // isset id assignments
+ private static final int __TIMESTAMP_ISSET_ID = 0;
+ private static final int __NANOS_ISSET_ID = 1;
+ private BitSet __isset_bit_vector = new BitSet(2);
+
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.TIMESTAMP, new org.apache.thrift.meta_data.FieldMetaData("timestamp", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64 , "Timestamp")));
+ tmpMap.put(_Fields.PRIORITY, new org.apache.thrift.meta_data.FieldMetaData("priority", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, Priority.class)));
+ tmpMap.put(_Fields.BODY, new org.apache.thrift.meta_data.FieldMetaData("body", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING , true)));
+ tmpMap.put(_Fields.NANOS, new org.apache.thrift.meta_data.FieldMetaData("nanos", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+ tmpMap.put(_Fields.HOST, new org.apache.thrift.meta_data.FieldMetaData("host", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.FIELDS, new org.apache.thrift.meta_data.FieldMetaData("fields", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING),
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING , true))));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ThriftFlumeEvent.class, metaDataMap);
+ }
+
+ public ThriftFlumeEvent() {
+ }
+
+ public ThriftFlumeEvent(
+ long timestamp,
+ Priority priority,
+ ByteBuffer body,
+ long nanos,
+ String host,
+ Map<String,ByteBuffer> fields)
+ {
+ this();
+ this.timestamp = timestamp;
+ setTimestampIsSet(true);
+ this.priority = priority;
+ this.body = body;
+ this.nanos = nanos;
+ setNanosIsSet(true);
+ this.host = host;
+ this.fields = fields;
+ }
+
+ /**
+  * Copy constructor: performs a deep copy of <i>other</i>.
+  * The isset bits and the primitive fields are copied directly, the body and
+  * every value of the fields map are duplicated through
+  * TBaseHelper.copyBinary, and priority, host and the map keys are shared
+  * by reference.
+  */
+ public ThriftFlumeEvent(ThriftFlumeEvent other) {
+   __isset_bit_vector.clear();
+   __isset_bit_vector.or(other.__isset_bit_vector);
+   this.timestamp = other.timestamp;
+   if (other.isSetPriority()) {
+     this.priority = other.priority;
+   }
+   if (other.isSetBody()) {
+     this.body = org.apache.thrift.TBaseHelper.copyBinary(other.body);
+   }
+   this.nanos = other.nanos;
+   if (other.isSetHost()) {
+     this.host = other.host;
+   }
+   if (other.isSetFields()) {
+     Map<String,ByteBuffer> copiedFields = new HashMap<String,ByteBuffer>();
+     for (Map.Entry<String, ByteBuffer> entry : other.fields.entrySet()) {
+       ByteBuffer copiedValue = org.apache.thrift.TBaseHelper.copyBinary(entry.getValue());
+       copiedFields.put(entry.getKey(), copiedValue);
+     }
+     this.fields = copiedFields;
+   }
+ }
+
+ public ThriftFlumeEvent deepCopy() {
+ return new ThriftFlumeEvent(this);
+ }
+
+ @Override
+ public void clear() {
+ setTimestampIsSet(false);
+ this.timestamp = 0;
+ this.priority = null;
+ this.body = null;
+ setNanosIsSet(false);
+ this.nanos = 0;
+ this.host = null;
+ this.fields = null;
+ }
+
+ public long getTimestamp() {
+ return this.timestamp;
+ }
+
+ public ThriftFlumeEvent setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ setTimestampIsSet(true);
+ return this;
+ }
+
+ public void unsetTimestamp() {
+ __isset_bit_vector.clear(__TIMESTAMP_ISSET_ID);
+ }
+
+ /** Returns true if field timestamp is set (has been assigned a value) and false otherwise */
+ public boolean isSetTimestamp() {
+ return __isset_bit_vector.get(__TIMESTAMP_ISSET_ID);
+ }
+
+ public void setTimestampIsSet(boolean value) {
+ __isset_bit_vector.set(__TIMESTAMP_ISSET_ID, value);
+ }
+
+ /**
+ *
+ * @see Priority
+ */
+ public Priority getPriority() {
+ return this.priority;
+ }
+
+ /**
+ *
+ * @see Priority
+ */
+ public ThriftFlumeEvent setPriority(Priority priority) {
+ this.priority = priority;
+ return this;
+ }
+
+ public void unsetPriority() {
+ this.priority = null;
+ }
+
+ /** Returns true if field priority is set (has been assigned a value) and false otherwise */
+ public boolean isSetPriority() {
+ return this.priority != null;
+ }
+
+ public void setPriorityIsSet(boolean value) {
+ if (!value) {
+ this.priority = null;
+ }
+ }
+
+ public byte[] getBody() {
+ setBody(org.apache.thrift.TBaseHelper.rightSize(body));
+ return body == null ? null : body.array();
+ }
+
+ public ByteBuffer bufferForBody() {
+ return body;
+ }
+
+ public ThriftFlumeEvent setBody(byte[] body) {
+ setBody(body == null ? (ByteBuffer)null : ByteBuffer.wrap(body));
+ return this;
+ }
+
+ public ThriftFlumeEvent setBody(ByteBuffer body) {
+ this.body = body;
+ return this;
+ }
+
+ public void unsetBody() {
+ this.body = null;
+ }
+
+ /** Returns true if field body is set (has been assigned a value) and false otherwise */
+ public boolean isSetBody() {
+ return this.body != null;
+ }
+
+ public void setBodyIsSet(boolean value) {
+ if (!value) {
+ this.body = null;
+ }
+ }
+
+ public long getNanos() {
+ return this.nanos;
+ }
+
+ public ThriftFlumeEvent setNanos(long nanos) {
+ this.nanos = nanos;
+ setNanosIsSet(true);
+ return this;
+ }
+
+ public void unsetNanos() {
+ __isset_bit_vector.clear(__NANOS_ISSET_ID);
+ }
+
+ /** Returns true if field nanos is set (has been assigned a value) and false otherwise */
+ public boolean isSetNanos() {
+ return __isset_bit_vector.get(__NANOS_ISSET_ID);
+ }
+
+ public void setNanosIsSet(boolean value) {
+ __isset_bit_vector.set(__NANOS_ISSET_ID, value);
+ }
+
+ public String getHost() {
+ return this.host;
+ }
+
+ public ThriftFlumeEvent setHost(String host) {
+ this.host = host;
+ return this;
+ }
+
+ public void unsetHost() {
+ this.host = null;
+ }
+
+ /** Returns true if field host is set (has been assigned a value) and false otherwise */
+ public boolean isSetHost() {
+ return this.host != null;
+ }
+
+ public void setHostIsSet(boolean value) {
+ if (!value) {
+ this.host = null;
+ }
+ }
+
+ public int getFieldsSize() {
+ return (this.fields == null) ? 0 : this.fields.size();
+ }
+
+ public void putToFields(String key, ByteBuffer val) {
+ if (this.fields == null) {
+ this.fields = new HashMap<String,ByteBuffer>();
+ }
+ this.fields.put(key, val);
+ }
+
+ public Map<String,ByteBuffer> getFields() {
+ return this.fields;
+ }
+
+ public ThriftFlumeEvent setFields(Map<String,ByteBuffer> fields) {
+ this.fields = fields;
+ return this;
+ }
+
+ public void unsetFields() {
+ this.fields = null;
+ }
+
+ /** Returns true if field fields is set (has been assigned a value) and false otherwise */
+ public boolean isSetFields() {
+ return this.fields != null;
+ }
+
+ public void setFieldsIsSet(boolean value) {
+ if (!value) {
+ this.fields = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case TIMESTAMP:
+ if (value == null) {
+ unsetTimestamp();
+ } else {
+ setTimestamp((Long)value);
+ }
+ break;
+
+ case PRIORITY:
+ if (value == null) {
+ unsetPriority();
+ } else {
+ setPriority((Priority)value);
+ }
+ break;
+
+ case BODY:
+ if (value == null) {
+ unsetBody();
+ } else {
+ setBody((ByteBuffer)value);
+ }
+ break;
+
+ case NANOS:
+ if (value == null) {
+ unsetNanos();
+ } else {
+ setNanos((Long)value);
+ }
+ break;
+
+ case HOST:
+ if (value == null) {
+ unsetHost();
+ } else {
+ setHost((String)value);
+ }
+ break;
+
+ case FIELDS:
+ if (value == null) {
+ unsetFields();
+ } else {
+ setFields((Map<String,ByteBuffer>)value);
+ }
+ break;
+
+ }
+ }
+
+ /**
+  * Returns the current value of the given field as an Object.
+  * TIMESTAMP and NANOS are boxed as Long; BODY is returned through
+  * getBody(), i.e. as a byte array (or null when the body is unset);
+  * the remaining fields are returned as stored.
+  *
+  * @param field the field to read; a null argument makes the switch throw
+  *        a NullPointerException
+  * @return the field value, possibly null for the object-typed fields
+  * @throws IllegalStateException if the constant is not handled by the switch
+  */
+ public Object getFieldValue(_Fields field) {
+   switch (field) {
+   case TIMESTAMP:
+     // Long.valueOf replaces the deprecated boxing constructor.
+     return Long.valueOf(getTimestamp());
+
+   case PRIORITY:
+     return getPriority();
+
+   case BODY:
+     return getBody();
+
+   case NANOS:
+     return Long.valueOf(getNanos());
+
+   case HOST:
+     return getHost();
+
+   case FIELDS:
+     return getFields();
+
+   }
+   throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case TIMESTAMP:
+ return isSetTimestamp();
+ case PRIORITY:
+ return isSetPriority();
+ case BODY:
+ return isSetBody();
+ case NANOS:
+ return isSetNanos();
+ case HOST:
+ return isSetHost();
+ case FIELDS:
+ return isSetFields();
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof ThriftFlumeEvent)
+ return this.equals((ThriftFlumeEvent)that);
+ return false;
+ }
+
+ /**
+  * Field-by-field equality against another ThriftFlumeEvent.
+  * The primitive fields (timestamp, nanos) are compared by value without
+  * consulting their isset bits; each object field must be set on both
+  * sides or on neither, and when set the values must be equal.
+  *
+  * @param that the event to compare with, may be null
+  * @return true if every field matches, false otherwise or if that is null
+  */
+ public boolean equals(ThriftFlumeEvent that) {
+   if (that == null) {
+     return false;
+   }
+
+   if (this.timestamp != that.timestamp) {
+     return false;
+   }
+
+   boolean thisHasPriority = this.isSetPriority();
+   boolean thatHasPriority = that.isSetPriority();
+   if (thisHasPriority != thatHasPriority) {
+     return false;
+   }
+   if (thisHasPriority && !this.priority.equals(that.priority)) {
+     return false;
+   }
+
+   boolean thisHasBody = this.isSetBody();
+   boolean thatHasBody = that.isSetBody();
+   if (thisHasBody != thatHasBody) {
+     return false;
+   }
+   if (thisHasBody && !this.body.equals(that.body)) {
+     return false;
+   }
+
+   if (this.nanos != that.nanos) {
+     return false;
+   }
+
+   boolean thisHasHost = this.isSetHost();
+   boolean thatHasHost = that.isSetHost();
+   if (thisHasHost != thatHasHost) {
+     return false;
+   }
+   if (thisHasHost && !this.host.equals(that.host)) {
+     return false;
+   }
+
+   boolean thisHasFields = this.isSetFields();
+   boolean thatHasFields = that.isSetFields();
+   if (thisHasFields != thatHasFields) {
+     return false;
+   }
+   if (thisHasFields && !this.fields.equals(that.fields)) {
+     return false;
+   }
+
+   return true;
+ }
+
+ /**
+  * Hash code built with HashCodeBuilder over the fields in declaration order.
+  * For every field a presence flag is appended first, followed by the value
+  * when present; the primitive fields (timestamp, nanos) always count as present.
+  */
+ @Override
+ public int hashCode() {
+   HashCodeBuilder hash = new HashCodeBuilder();
+
+   hash.append(true);
+   hash.append(timestamp);
+
+   boolean hasPriority = isSetPriority();
+   hash.append(hasPriority);
+   if (hasPriority) {
+     hash.append(priority.getValue());
+   }
+
+   boolean hasBody = isSetBody();
+   hash.append(hasBody);
+   if (hasBody) {
+     hash.append(body);
+   }
+
+   hash.append(true);
+   hash.append(nanos);
+
+   boolean hasHost = isSetHost();
+   hash.append(hasHost);
+   if (hasHost) {
+     hash.append(host);
+   }
+
+   boolean hasFields = isSetFields();
+   hash.append(hasFields);
+   if (hasFields) {
+     hash.append(fields);
+   }
+
+   return hash.toHashCode();
+ }
+
+ /**
+  * Orders events field by field in declaration order (timestamp, priority,
+  * body, nanos, host, fields). For each field the "is set" flags are
+  * compared first; the values are compared through TBaseHelper.compareTo
+  * only when the field is set. Objects of different classes are ordered by
+  * class name.
+  *
+  * @param other the event to compare against
+  * @return the first non-zero comparison result, or 0 if all fields compare equal
+  */
+ public int compareTo(ThriftFlumeEvent other) {
+   if (!getClass().equals(other.getClass())) {
+     return getClass().getName().compareTo(other.getClass().getName());
+   }
+
+   int cmp = Boolean.valueOf(isSetTimestamp()).compareTo(other.isSetTimestamp());
+   if (cmp == 0 && isSetTimestamp()) {
+     cmp = org.apache.thrift.TBaseHelper.compareTo(this.timestamp, other.timestamp);
+   }
+   if (cmp != 0) {
+     return cmp;
+   }
+
+   cmp = Boolean.valueOf(isSetPriority()).compareTo(other.isSetPriority());
+   if (cmp == 0 && isSetPriority()) {
+     cmp = org.apache.thrift.TBaseHelper.compareTo(this.priority, other.priority);
+   }
+   if (cmp != 0) {
+     return cmp;
+   }
+
+   cmp = Boolean.valueOf(isSetBody()).compareTo(other.isSetBody());
+   if (cmp == 0 && isSetBody()) {
+     cmp = org.apache.thrift.TBaseHelper.compareTo(this.body, other.body);
+   }
+   if (cmp != 0) {
+     return cmp;
+   }
+
+   cmp = Boolean.valueOf(isSetNanos()).compareTo(other.isSetNanos());
+   if (cmp == 0 && isSetNanos()) {
+     cmp = org.apache.thrift.TBaseHelper.compareTo(this.nanos, other.nanos);
+   }
+   if (cmp != 0) {
+     return cmp;
+   }
+
+   cmp = Boolean.valueOf(isSetHost()).compareTo(other.isSetHost());
+   if (cmp == 0 && isSetHost()) {
+     cmp = org.apache.thrift.TBaseHelper.compareTo(this.host, other.host);
+   }
+   if (cmp != 0) {
+     return cmp;
+   }
+
+   cmp = Boolean.valueOf(isSetFields()).compareTo(other.isSetFields());
+   if (cmp == 0 && isSetFields()) {
+     cmp = org.apache.thrift.TBaseHelper.compareTo(this.fields, other.fields);
+   }
+   if (cmp != 0) {
+     return cmp;
+   }
+
+   return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ /**
+ * Populates this event from the given protocol.
+ * Fields are read one at a time until a STOP marker is seen. A field whose id
+ * is known and whose wire type matches is decoded into the corresponding
+ * member; any other field (unknown id or unexpected type) is skipped.
+ * Members for fields that do not appear in the input are left untouched.
+ *
+ * @param iprot the protocol to read the struct from
+ * @throws org.apache.thrift.TException declared for failures of the protocol calls or of validate()
+ */
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField field;
+ iprot.readStructBegin();
+ while (true)
+ {
+ field = iprot.readFieldBegin();
+ if (field.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (field.id) {
+ case 1: // TIMESTAMP
+ if (field.type == org.apache.thrift.protocol.TType.I64) {
+ this.timestamp = iprot.readI64();
+ // primitive field: presence is tracked in the isset bit vector
+ setTimestampIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ case 2: // PRIORITY
+ if (field.type == org.apache.thrift.protocol.TType.I32) {
+ // findByValue returns null for an unknown value, which leaves priority unset
+ this.priority = Priority.findByValue(iprot.readI32());
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ case 3: // BODY
+ if (field.type == org.apache.thrift.protocol.TType.STRING) {
+ this.body = iprot.readBinary();
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ case 4: // NANOS
+ if (field.type == org.apache.thrift.protocol.TType.I64) {
+ this.nanos = iprot.readI64();
+ // primitive field: presence is tracked in the isset bit vector
+ setNanosIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ case 5: // HOST
+ if (field.type == org.apache.thrift.protocol.TType.STRING) {
+ this.host = iprot.readString();
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ case 6: // FIELDS
+ if (field.type == org.apache.thrift.protocol.TType.MAP) {
+ {
+ org.apache.thrift.protocol.TMap _map0 = iprot.readMapBegin();
+ // initial capacity is twice the entry count announced in the map header
+ this.fields = new HashMap<String,ByteBuffer>(2*_map0.size);
+ for (int _i1 = 0; _i1 < _map0.size; ++_i1)
+ {
+ String _key2;
+ ByteBuffer _val3;
+ _key2 = iprot.readString();
+ _val3 = iprot.readBinary();
+ this.fields.put(_key2, _val3);
+ }
+ iprot.readMapEnd();
+ }
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ // no primitive required-field checks are performed here for this struct; only validate() is run
+ validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ oprot.writeFieldBegin(TIMESTAMP_FIELD_DESC);
+ oprot.writeI64(this.timestamp);
+ oprot.writeFieldEnd();
+ if (this.priority != null) {
+ oprot.writeFieldBegin(PRIORITY_FIELD_DESC);
+ oprot.writeI32(this.priority.getValue());
+ oprot.writeFieldEnd();
+ }
+ if (this.body != null) {
+ oprot.writeFieldBegin(BODY_FIELD_DESC);
+ oprot.writeBinary(this.body);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldBegin(NANOS_FIELD_DESC);
+ oprot.writeI64(this.nanos);
+ oprot.writeFieldEnd();
+ if (this.host != null) {
+ oprot.writeFieldBegin(HOST_FIELD_DESC);
+ oprot.writeString(this.host);
+ oprot.writeFieldEnd();
+ }
+ if (this.fields != null) {
+ oprot.writeFieldBegin(FIELDS_FIELD_DESC);
+ {
+ oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, this.fields.size()));
+ for (Map.Entry<String, ByteBuffer> _iter4 : this.fields.entrySet())
+ {
+ oprot.writeString(_iter4.getKey());
+ oprot.writeBinary(_iter4.getValue());
+ }
+ oprot.writeMapEnd();
+ }
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ /**
+  * Renders the event as "ThriftFlumeEvent(name:value, ...)" listing all six
+  * fields in declaration order; unset object fields are printed as "null"
+  * and the body is rendered through TBaseHelper.toString.
+  */
+ @Override
+ public String toString() {
+   StringBuilder out = new StringBuilder("ThriftFlumeEvent(");
+
+   out.append("timestamp:");
+   out.append(this.timestamp);
+
+   out.append(", ");
+   out.append("priority:");
+   if (this.priority != null) {
+     out.append(this.priority);
+   } else {
+     out.append("null");
+   }
+
+   out.append(", ");
+   out.append("body:");
+   if (this.body != null) {
+     org.apache.thrift.TBaseHelper.toString(this.body, out);
+   } else {
+     out.append("null");
+   }
+
+   out.append(", ");
+   out.append("nanos:");
+   out.append(this.nanos);
+
+   out.append(", ");
+   out.append("host:");
+   if (this.host != null) {
+     out.append(this.host);
+   } else {
+     out.append("null");
+   }
+
+   out.append(", ");
+   out.append("fields:");
+   if (this.fields != null) {
+     out.append(this.fields);
+   } else {
+     out.append("null");
+   }
+
+   out.append(")");
+   return out.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ /**
+  * Java-serialization hook: restores this object by decoding it from the
+  * object input stream through a TCompactProtocol.
+  *
+  * @param in the stream to read the serialized struct from
+  * @throws java.io.IOException if decoding fails; a TException raised by
+  *         read() is wrapped as the cause
+  */
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+   try {
+     // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+     // Sized for both isset ids (timestamp and nanos), matching the field initializer.
+     __isset_bit_vector = new BitSet(2);
+     read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+   } catch (org.apache.thrift.TException te) {
+     throw new java.io.IOException(te);
+   }
+ }
+
+}
+