You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/10/07 01:00:20 UTC

svn commit: r1179893 - in /incubator/flume/branches/flume-728/flume-ng-core: pom.xml src/main/avro/ src/main/avro/flume.avdl src/main/java/org/apache/flume/source/AvroSource.java src/test/java/org/apache/flume/source/TestAvroSource.java

Author: esammer
Date: Thu Oct  6 23:00:20 2011
New Revision: 1179893

URL: http://svn.apache.org/viewvc?rev=1179893&view=rev
Log:
FLUME-771: Implement NG Avro source

- Basic implementation of an avro source. Not entirely feature complete,
  but it works.

Added:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/avro/
    incubator/flume/branches/flume-728/flume-ng-core/src/main/avro/flume.avdl
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
Modified:
    incubator/flume/branches/flume-728/flume-ng-core/pom.xml

Modified: incubator/flume/branches/flume-728/flume-ng-core/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/pom.xml?rev=1179893&r1=1179892&r2=1179893&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/pom.xml (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/pom.xml Thu Oct  6 23:00:20 2011
@@ -38,6 +38,38 @@
         </executions>
       </plugin>
 
+      <plugin>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-maven-plugin</artifactId>
+        <version>1.5.1</version>
+        <executions>
+          <execution>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>idl-protocol</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>com.thoughtworks.paranamer</groupId>
+        <artifactId>paranamer-maven-plugin</artifactId>
+        <version>2.3</version>
+        <executions>
+          <execution>
+            <id>run</id>
+            <configuration>
+              <sourceDirectory>${project.build.directory}/generated-sources/avro</sourceDirectory>
+              <outputDirectory>${project.build.directory}/classes</outputDirectory>
+            </configuration>
+            <goals>
+              <goal>generate</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
     </plugins>
   </build>
 
@@ -87,6 +119,16 @@
       <artifactId>hadoop-core</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro-ipc</artifactId>
+    </dependency>
+
   </dependencies>
 
 </project>

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/avro/flume.avdl
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/avro/flume.avdl?rev=1179893&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/avro/flume.avdl (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/avro/flume.avdl Thu Oct  6 23:00:20 2011
@@ -0,0 +1,34 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+@namespace("org.apache.flume.source.avro")
+
+protocol AvroSourceProtocol {
+
+  enum Status {
+    OK, FAILED, UNKNOWN
+  }
+
+  record AvroFlumeEvent {
+    map<string> headers;
+    bytes body;
+  }
+
+  Status append( AvroFlumeEvent event );
+
+}
\ No newline at end of file

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java?rev=1179893&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java Thu Oct  6 23:00:20 2011
@@ -0,0 +1,113 @@
+package org.apache.flume.source;
+
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.avro.AvroRemoteException;
+import org.apache.avro.ipc.NettyServer;
+import org.apache.avro.ipc.Responder;
+import org.apache.avro.ipc.Server;
+import org.apache.avro.ipc.specific.SpecificResponder;
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.source.avro.AvroFlumeEvent;
+import org.apache.flume.source.avro.AvroSourceProtocol;
+import org.apache.flume.source.avro.Status;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AvroSource extends AbstractSource implements EventDrivenSource,
+    Configurable, AvroSourceProtocol {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(AvroSource.class);
+
+  private int port;
+  private String bindAddress;
+
+  private Server server;
+
+  @Override
+  public void configure(Context context) {
+    port = context.get("port", Integer.class);
+    bindAddress = context.get("bind", String.class);
+  }
+
+  @Override
+  public void start() {
+    logger.info("Avro source starting:{}", this);
+
+    Responder responder = new SpecificResponder(AvroSourceProtocol.class, this);
+    server = new NettyServer(responder,
+        new InetSocketAddress(bindAddress, port));
+
+    server.start();
+
+    super.start();
+
+    logger.debug("Avro source started");
+  }
+
+  @Override
+  public void stop() {
+    logger.info("Avro source stopping:{}", this);
+
+    server.close();
+
+    try {
+      server.join();
+    } catch (InterruptedException e) {
+      logger
+          .info("Interrupted while waiting for Avro server to stop. Exiting.");
+    }
+
+    super.stop();
+
+    logger.debug("Avro source stopped");
+  }
+
+  @Override
+  public String toString() {
+    return "AvroSource: { bindAddress:" + bindAddress + " port:" + port + " }";
+  }
+
+  @Override
+  public Status append(AvroFlumeEvent avroEvent) throws AvroRemoteException {
+    logger.debug("Received avro event:{}", avroEvent);
+
+    Channel channel = getChannel();
+    Transaction transaction = channel.getTransaction();
+
+    try {
+      transaction.begin();
+
+      Map<String, String> headers = new HashMap<String, String>();
+
+      for (Entry<CharSequence, CharSequence> entry : avroEvent.headers
+          .entrySet()) {
+
+        headers.put(entry.getKey().toString(), entry.getValue().toString());
+      }
+
+      Event event = EventBuilder.withBody(avroEvent.body.array(), headers);
+      channel.put(event);
+
+      transaction.commit();
+    } catch (ChannelException e) {
+      transaction.rollback();
+      return Status.FAILED;
+    } finally {
+      transaction.close();
+    }
+
+    return Status.OK;
+  }
+}

Added: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java?rev=1179893&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java Thu Oct  6 23:00:20 2011
@@ -0,0 +1,138 @@
+package org.apache.flume.source;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+
+import org.apache.avro.ipc.NettyTransceiver;
+import org.apache.avro.ipc.specific.SpecificRequestor;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.lifecycle.LifecycleController;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.flume.source.avro.AvroFlumeEvent;
+import org.apache.flume.source.avro.AvroSourceProtocol;
+import org.apache.flume.source.avro.Status;
+import org.jboss.netty.channel.ChannelException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestAvroSource {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(TestAvroSource.class);
+
+  private int selectedPort;
+  private AvroSource source;
+  private Channel channel;
+
+  @Before
+  public void setUp() {
+    source = new AvroSource();
+    channel = new MemoryChannel();
+
+    Configurables.configure(channel, new Context());
+
+    source.setChannel(channel);
+  }
+
+  @Test
+  public void testLifecycle() throws InterruptedException {
+    boolean bound = false;
+
+    for (int i = 0; i < 100 && !bound; i++) {
+      try {
+        Context context = new Context();
+
+        context.put("port", selectedPort = 41414 + i);
+        context.put("bind", "0.0.0.0");
+
+        Configurables.configure(source, context);
+
+        source.start();
+        bound = true;
+      } catch (ChannelException e) {
+        /*
+         * NB: This assume we're using the Netty server under the hood and the
+         * failure is to bind. Yucky.
+         */
+      }
+    }
+
+    Assert
+        .assertTrue("Reached start or error", LifecycleController.waitForOneOf(
+            source, LifecycleState.START_OR_ERROR));
+    Assert.assertEquals("Server is started", LifecycleState.START,
+        source.getLifecycleState());
+
+    source.stop();
+    Assert.assertTrue("Reached stop or error",
+        LifecycleController.waitForOneOf(source, LifecycleState.STOP_OR_ERROR));
+    Assert.assertEquals("Server is stopped", LifecycleState.STOP,
+        source.getLifecycleState());
+  }
+
+  @Test
+  public void testRequest() throws InterruptedException, IOException {
+    boolean bound = false;
+
+    for (int i = 0; i < 100 && !bound; i++) {
+      try {
+        Context context = new Context();
+
+        context.put("port", selectedPort = 41414 + i);
+        context.put("bind", "0.0.0.0");
+
+        Configurables.configure(source, context);
+
+        source.start();
+        bound = true;
+      } catch (ChannelException e) {
+        /*
+         * NB: This assume we're using the Netty server under the hood and the
+         * failure is to bind. Yucky.
+         */
+      }
+    }
+
+    Assert
+        .assertTrue("Reached start or error", LifecycleController.waitForOneOf(
+            source, LifecycleState.START_OR_ERROR));
+    Assert.assertEquals("Server is started", LifecycleState.START,
+        source.getLifecycleState());
+
+    AvroSourceProtocol client = SpecificRequestor.getClient(
+        AvroSourceProtocol.class, new NettyTransceiver(new InetSocketAddress(
+            selectedPort)));
+
+    AvroFlumeEvent avroEvent = new AvroFlumeEvent();
+
+    avroEvent.headers = new HashMap<CharSequence, CharSequence>();
+    avroEvent.body = ByteBuffer.wrap("Hello avro".getBytes());
+
+    Status status = client.append(avroEvent);
+
+    Assert.assertEquals(Status.OK, status);
+
+    Event event = channel.take();
+    Assert.assertNotNull(event);
+    Assert.assertEquals("Channel contained our event", "Hello avro",
+        new String(event.getBody()));
+
+    logger.debug("Round trip event:{}", event);
+
+    source.stop();
+    Assert.assertTrue("Reached stop or error",
+        LifecycleController.waitForOneOf(source, LifecycleState.STOP_OR_ERROR));
+    Assert.assertEquals("Server is stopped", LifecycleState.STOP,
+        source.getLifecycleState());
+  }
+
+}