You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/10/07 01:00:20 UTC
svn commit: r1179893 - in /incubator/flume/branches/flume-728/flume-ng-core:
pom.xml src/main/avro/ src/main/avro/flume.avdl
src/main/java/org/apache/flume/source/AvroSource.java
src/test/java/org/apache/flume/source/TestAvroSource.java
Author: esammer
Date: Thu Oct 6 23:00:20 2011
New Revision: 1179893
URL: http://svn.apache.org/viewvc?rev=1179893&view=rev
Log:
FLUME-771: Implement NG Avro source
- Basic implementation of an avro source. Not entirely feature complete,
but it works.
Added:
incubator/flume/branches/flume-728/flume-ng-core/src/main/avro/
incubator/flume/branches/flume-728/flume-ng-core/src/main/avro/flume.avdl
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
Modified:
incubator/flume/branches/flume-728/flume-ng-core/pom.xml
Modified: incubator/flume/branches/flume-728/flume-ng-core/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/pom.xml?rev=1179893&r1=1179892&r2=1179893&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/pom.xml (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/pom.xml Thu Oct 6 23:00:20 2011
@@ -38,6 +38,38 @@
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-maven-plugin</artifactId>
+ <version>1.5.1</version>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>idl-protocol</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>com.thoughtworks.paranamer</groupId>
+ <artifactId>paranamer-maven-plugin</artifactId>
+ <version>2.3</version>
+ <executions>
+ <execution>
+ <id>run</id>
+ <configuration>
+ <sourceDirectory>${project.build.directory}/generated-sources/avro</sourceDirectory>
+ <outputDirectory>${project.build.directory}/classes</outputDirectory>
+ </configuration>
+ <goals>
+ <goal>generate</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
</plugins>
</build>
@@ -87,6 +119,16 @@
<artifactId>hadoop-core</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-ipc</artifactId>
+ </dependency>
+
</dependencies>
</project>
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/avro/flume.avdl
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/avro/flume.avdl?rev=1179893&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/avro/flume.avdl (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/avro/flume.avdl Thu Oct 6 23:00:20 2011
@@ -0,0 +1,39 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// RPC protocol spoken between Avro clients and the Flume NG Avro source.
+// Plain (non-doc) comments are used so the generated protocol is unchanged.
+@namespace("org.apache.flume.source.avro")
+
+protocol AvroSourceProtocol {
+
+  // Outcome of an append call as reported back to the client.
+  enum Status {
+    OK, FAILED, UNKNOWN
+  }
+
+  // A single event: string-valued headers plus a payload of raw bytes.
+  record AvroFlumeEvent {
+    map<string> headers;
+    bytes body;
+  }
+
+  // Delivers one event to the source and returns the resulting status.
+  Status append( AvroFlumeEvent event );
+
+}
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java?rev=1179893&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java Thu Oct 6 23:00:20 2011
@@ -0,0 +1,173 @@
+package org.apache.flume.source;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.avro.AvroRemoteException;
+import org.apache.avro.ipc.NettyServer;
+import org.apache.avro.ipc.Responder;
+import org.apache.avro.ipc.Server;
+import org.apache.avro.ipc.specific.SpecificResponder;
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.source.avro.AvroFlumeEvent;
+import org.apache.flume.source.avro.AvroSourceProtocol;
+import org.apache.flume.source.avro.Status;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A source that runs an Avro Netty RPC server implementing
+ * {@link AvroSourceProtocol} and puts every event it receives on its
+ * channel.
+ * <p>
+ * Configuration keys read by {@link #configure(Context)}: <tt>port</tt> (the
+ * port to listen on) and <tt>bind</tt> (the host name or address to bind to).
+ */
+public class AvroSource extends AbstractSource implements EventDrivenSource,
+    Configurable, AvroSourceProtocol {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(AvroSource.class);
+
+  private int port;
+  private String bindAddress;
+
+  private Server server;
+
+  /**
+   * Reads the <tt>port</tt> and <tt>bind</tt> settings from the context.
+   *
+   * @throws IllegalArgumentException if no port is configured
+   */
+  @Override
+  public void configure(Context context) {
+    Integer configuredPort = context.get("port", Integer.class);
+
+    // Fail with a clear message instead of an NPE from unboxing null.
+    if (configuredPort == null) {
+      throw new IllegalArgumentException(
+          "Avro source requires the port property to be set");
+    }
+
+    port = configuredPort;
+    bindAddress = context.get("bind", String.class);
+  }
+
+  /**
+   * Creates the Netty server on the configured address, starts it and then
+   * delegates to the superclass start().
+   */
+  @Override
+  public void start() {
+    logger.info("Avro source starting:{}", this);
+
+    Responder responder = new SpecificResponder(AvroSourceProtocol.class, this);
+    server = new NettyServer(responder,
+        new InetSocketAddress(bindAddress, port));
+
+    server.start();
+
+    super.start();
+
+    logger.debug("Avro source started");
+  }
+
+  /**
+   * Closes the server, if one was created, waits for it to terminate and then
+   * delegates to the superclass stop().
+   */
+  @Override
+  public void stop() {
+    logger.info("Avro source stopping:{}", this);
+
+    // start() may not have run, or may have failed before the server was
+    // assigned, in which case there is nothing to shut down.
+    if (server != null) {
+      server.close();
+
+      try {
+        server.join();
+      } catch (InterruptedException e) {
+        logger
+            .info("Interrupted while waiting for Avro server to stop. Exiting.");
+        // Preserve the interrupt for whoever is driving the shutdown.
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    super.stop();
+
+    logger.debug("Avro source stopped");
+  }
+
+  @Override
+  public String toString() {
+    return "AvroSource: { bindAddress:" + bindAddress + " port:" + port + " }";
+  }
+
+  /**
+   * Converts the received Avro event to a Flume event and puts it on the
+   * channel inside a transaction.
+   *
+   * @return {@link Status#OK} once the transaction has committed, or
+   *         {@link Status#FAILED} if a {@link ChannelException} was thrown
+   *         and the transaction was rolled back
+   */
+  @Override
+  public Status append(AvroFlumeEvent avroEvent) throws AvroRemoteException {
+    logger.debug("Received avro event:{}", avroEvent);
+
+    Channel channel = getChannel();
+    Transaction transaction = channel.getTransaction();
+
+    try {
+      transaction.begin();
+
+      Map<String, String> headers = new HashMap<String, String>();
+
+      for (Entry<CharSequence, CharSequence> entry : avroEvent.headers
+          .entrySet()) {
+        headers.put(entry.getKey().toString(), entry.getValue().toString());
+      }
+
+      byte[] body = toByteArray(avroEvent.body);
+      Event event = EventBuilder.withBody(body, headers);
+      channel.put(event);
+
+      transaction.commit();
+    } catch (ChannelException e) {
+      // Record why the event was refused before reporting the failure.
+      logger.warn("Unable to put avro event on channel. Rolling back.", e);
+      transaction.rollback();
+      return Status.FAILED;
+    } finally {
+      transaction.close();
+    }
+
+    return Status.OK;
+  }
+
+  /**
+   * Copies the readable bytes (position to limit) of a buffer into a new
+   * array. Unlike {@link ByteBuffer#array()} this never includes backing
+   * array content outside that window and also works for buffers without an
+   * accessible backing array. The given buffer's position is left untouched.
+   */
+  private static byte[] toByteArray(ByteBuffer buffer) {
+    ByteBuffer view = buffer.duplicate();
+    byte[] bytes = new byte[view.remaining()];
+
+    view.get(bytes);
+    return bytes;
+  }
+}
Added: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java?rev=1179893&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java Thu Oct 6 23:00:20 2011
@@ -0,0 +1,144 @@
+package org.apache.flume.source;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+
+import org.apache.avro.ipc.NettyTransceiver;
+import org.apache.avro.ipc.specific.SpecificRequestor;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.lifecycle.LifecycleController;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.flume.source.avro.AvroFlumeEvent;
+import org.apache.flume.source.avro.AvroSourceProtocol;
+import org.apache.flume.source.avro.Status;
+import org.jboss.netty.channel.ChannelException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests the lifecycle of {@link AvroSource} and a full request round trip
+ * from an Avro client, through the source, into a memory channel.
+ */
+public class TestAvroSource {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(TestAvroSource.class);
+
+  /** First port tried when looking for one the source can bind to. */
+  private static final int FIRST_PORT = 41414;
+
+  /** Number of consecutive ports tried before the test gives up. */
+  private static final int MAX_BIND_ATTEMPTS = 100;
+
+  private int selectedPort;
+  private AvroSource source;
+  private Channel channel;
+
+  @Before
+  public void setUp() {
+    source = new AvroSource();
+    channel = new MemoryChannel();
+
+    Configurables.configure(channel, new Context());
+
+    source.setChannel(channel);
+  }
+
+  @Test
+  public void testLifecycle() throws InterruptedException {
+    startSource();
+    stopSource();
+  }
+
+  @Test
+  public void testRequest() throws InterruptedException, IOException {
+    startSource();
+
+    NettyTransceiver transceiver = new NettyTransceiver(new InetSocketAddress(
+        selectedPort));
+
+    try {
+      AvroSourceProtocol client = SpecificRequestor.getClient(
+          AvroSourceProtocol.class, transceiver);
+
+      AvroFlumeEvent avroEvent = new AvroFlumeEvent();
+
+      avroEvent.headers = new HashMap<CharSequence, CharSequence>();
+      // Encode and decode with an explicit charset so the comparison does not
+      // depend on the platform default.
+      avroEvent.body = ByteBuffer.wrap("Hello avro".getBytes("UTF-8"));
+
+      Status status = client.append(avroEvent);
+
+      Assert.assertEquals(Status.OK, status);
+
+      Event event = channel.take();
+      Assert.assertNotNull(event);
+      Assert.assertEquals("Channel contained our event", "Hello avro",
+          new String(event.getBody(), "UTF-8"));
+
+      logger.debug("Round trip event:{}", event);
+    } finally {
+      // Release the client connection even when an assertion fails.
+      transceiver.close();
+    }
+
+    stopSource();
+  }
+
+  /**
+   * Starts the source on the first port, counting up from
+   * {@link #FIRST_PORT}, that it manages to bind, records that port in
+   * {@link #selectedPort} and asserts that the source reached the START state.
+   */
+  private void startSource() throws InterruptedException {
+    boolean bound = false;
+
+    for (int i = 0; i < MAX_BIND_ATTEMPTS && !bound; i++) {
+      try {
+        Context context = new Context();
+
+        context.put("port", selectedPort = FIRST_PORT + i);
+        context.put("bind", "0.0.0.0");
+
+        Configurables.configure(source, context);
+
+        source.start();
+        bound = true;
+      } catch (ChannelException e) {
+        /*
+         * NB: This assumes we're using the Netty server under the hood and the
+         * failure is to bind. Yucky.
+         */
+      }
+    }
+
+    Assert.assertTrue("Source bound to a port", bound);
+    Assert
+        .assertTrue("Reached start or error", LifecycleController.waitForOneOf(
+            source, LifecycleState.START_OR_ERROR));
+    Assert.assertEquals("Server is started", LifecycleState.START,
+        source.getLifecycleState());
+  }
+
+  /**
+   * Stops the source and asserts that it reached the STOP state.
+   */
+  private void stopSource() throws InterruptedException {
+    source.stop();
+    Assert.assertTrue("Reached stop or error",
+        LifecycleController.waitForOneOf(source, LifecycleState.STOP_OR_ERROR));
+    Assert.assertEquals("Server is stopped", LifecycleState.STOP,
+        source.getLifecycleState());
+  }
+
+}