You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/10/11 09:03:54 UTC

svn commit: r1181637 - in /incubator/flume/branches/flume-728: flume-ng-core/src/main/java/org/apache/flume/sink/ flume-ng-core/src/test/java/org/apache/flume/sink/ flume-ng-node/src/main/java/org/apache/flume/node/

Author: esammer
Date: Tue Oct 11 07:03:53 2011
New Revision: 1181637

URL: http://svn.apache.org/viewvc?rev=1181637&view=rev
Log:
FLUME-778: Implement NG Avro sink

Added:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
Modified:
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java?rev=1181637&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java Tue Oct 11 07:03:53 2011
@@ -0,0 +1,170 @@
+package org.apache.flume.sink;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.avro.AvroRemoteException;
+import org.apache.avro.ipc.NettyTransceiver;
+import org.apache.avro.ipc.Transceiver;
+import org.apache.avro.ipc.specific.SpecificRequestor;
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
+import org.apache.flume.CounterGroup;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.PollableSink;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.source.avro.AvroFlumeEvent;
+import org.apache.flume.source.avro.AvroSourceProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class AvroSink extends AbstractSink implements PollableSink,
+    Configurable {
+
+  private static final Logger logger = LoggerFactory.getLogger(AvroSink.class);
+  private static final Integer defaultBatchSize = 100;
+
+  private String hostname;
+  private Integer port;
+  private Integer batchSize;
+
+  private AvroSourceProtocol client;
+  private Transceiver transceiver;
+  private CounterGroup counterGroup;
+
+  public AvroSink() {
+    counterGroup = new CounterGroup();
+  }
+
+  @Override
+  public void configure(Context context) {
+    hostname = context.get("hostname", String.class);
+    port = context.get("port", Integer.class);
+    batchSize = context.get("batch-size", Integer.class);
+
+    if (batchSize == null) {
+      batchSize = defaultBatchSize;
+    }
+
+    Preconditions.checkState(hostname != null, "No hostname specified");
+    Preconditions.checkState(port != null, "No port specified");
+  }
+
+  @Override
+  public void start() {
+    logger.info("Avro sink starting");
+
+    try {
+      transceiver = new NettyTransceiver(new InetSocketAddress(hostname, port));
+      client = SpecificRequestor.getClient(AvroSourceProtocol.class,
+          transceiver);
+    } catch (Exception e) {
+      logger.error("Unable to create avro client using hostname:" + hostname
+          + " port:" + port + ". Exception follows.", e);
+
+      /* Try to prevent leaking resources. */
+      if (transceiver != null) {
+        try {
+          transceiver.close();
+        } catch (IOException e1) {
+          logger
+              .error(
+                  "Attempt to clean up avro tranceiver after client error failed. Exception follows.",
+                  e1);
+        }
+      }
+
+      /* FIXME: Mark ourselves as failed. */
+      return;
+    }
+
+    super.start();
+
+    logger.debug("Avro sink started");
+  }
+
+  @Override
+  public void stop() {
+    logger.info("Avro sink stopping");
+
+    try {
+      transceiver.close();
+    } catch (IOException e) {
+      logger.error(
+          "Unable to shut down avro tranceiver - Possible resource leak!", e);
+    }
+
+    super.stop();
+
+    logger.debug("Avro sink stopped. Metrics:{}", counterGroup);
+  }
+
+  @Override
+  public Status process() throws EventDeliveryException {
+    Status status = Status.READY;
+    Channel channel = getChannel();
+    Transaction transaction = channel.getTransaction();
+
+    try {
+      transaction.begin();
+
+      List<AvroFlumeEvent> batch = new LinkedList<AvroFlumeEvent>();
+
+      for (int i = 0; i < batchSize; i++) {
+        Event event = channel.take();
+
+        if (event == null) {
+          counterGroup.incrementAndGet("batch.underflow");
+          break;
+        }
+
+        AvroFlumeEvent avroEvent = new AvroFlumeEvent();
+
+        avroEvent.body = ByteBuffer.wrap(event.getBody());
+        avroEvent.headers = new HashMap<CharSequence, CharSequence>();
+
+        for (Entry<String, String> entry : event.getHeaders().entrySet()) {
+          avroEvent.headers.put(entry.getKey(), entry.getValue());
+        }
+
+        batch.add(avroEvent);
+      }
+
+      if (batch.isEmpty()) {
+        counterGroup.incrementAndGet("batch.empty");
+        status = Status.BACKOFF;
+      } else {
+        if (!client.appendBatch(batch).equals(
+            org.apache.flume.source.avro.Status.OK)) {
+          throw new AvroRemoteException("RPC communication returned FAILED");
+        }
+      }
+
+      transaction.commit();
+      counterGroup.incrementAndGet("batch.success");
+    } catch (ChannelException e) {
+      transaction.rollback();
+      logger.error("Unable to get event from channel. Exception follows.", e);
+      status = Status.BACKOFF;
+    } catch (AvroRemoteException e) {
+      transaction.rollback();
+      logger.error("Unable to send event batch. Exception follows.", e);
+      status = Status.BACKOFF;
+    } finally {
+      transaction.close();
+    }
+
+    return status;
+  }
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java?rev=1181637&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java Tue Oct 11 07:03:53 2011
@@ -0,0 +1,131 @@
+package org.apache.flume.sink;
+
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.avro.AvroRemoteException;
+import org.apache.avro.ipc.NettyServer;
+import org.apache.avro.ipc.Server;
+import org.apache.avro.ipc.specific.SpecificResponder;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.PollableSink;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.lifecycle.LifecycleController;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.flume.source.avro.AvroFlumeEvent;
+import org.apache.flume.source.avro.AvroSourceProtocol;
+import org.apache.flume.source.avro.Status;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for {@link AvroSink} that run the sink against an in-process Netty
+ * Avro server backed by {@link MockAvroServer}.
+ */
+public class TestAvroSink {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(TestAvroSink.class);
+  private static final String hostname = "localhost";
+  private static final Integer port = 41414;
+
+  private AvroSink sink;
+  private Channel channel;
+
+  /**
+   * Creates a fresh sink wired to a memory channel and configured to talk to
+   * the same host and port that {@link #createServer()} binds, with a batch
+   * size of 2.
+   */
+  @Before
+  public void setUp() {
+    sink = new AvroSink();
+    channel = new MemoryChannel();
+
+    Context context = new Context();
+
+    /* Use the shared constants so sink and server cannot drift apart. */
+    context.put("hostname", hostname);
+    context.put("port", port);
+    context.put("batch-size", 2);
+
+    sink.setChannel(channel);
+
+    Configurables.configure(sink, context);
+    Configurables.configure(channel, context);
+  }
+
+  /**
+   * Starts and stops the sink against a running server and checks that each
+   * transition reaches a terminal lifecycle state within five seconds.
+   */
+  @Test
+  public void testLifecycle() throws InterruptedException {
+    Server server = createServer();
+
+    server.start();
+
+    try {
+      sink.start();
+      Assert.assertTrue(LifecycleController.waitForOneOf(sink,
+          LifecycleState.START_OR_ERROR, 5000));
+
+      sink.stop();
+      Assert.assertTrue(LifecycleController.waitForOneOf(sink,
+          LifecycleState.STOP_OR_ERROR, 5000));
+    } finally {
+      /* Always release the port, even when an assertion above fails. */
+      server.close();
+    }
+  }
+
+  /**
+   * Puts ten events on the channel and expects five READY results (batch size
+   * is 2) followed by BACKOFF once the channel is empty.
+   */
+  @Test
+  public void testProcess() throws InterruptedException, EventDeliveryException {
+    Event event = EventBuilder.withBody("test event 1".getBytes(),
+        new HashMap<String, String>());
+    Server server = createServer();
+
+    server.start();
+
+    try {
+      sink.start();
+      Assert.assertTrue(LifecycleController.waitForOneOf(sink,
+          LifecycleState.START_OR_ERROR, 5000));
+
+      for (int i = 0; i < 10; i++) {
+        channel.put(event);
+      }
+
+      for (int i = 0; i < 5; i++) {
+        PollableSink.Status status = sink.process();
+        Assert.assertEquals(PollableSink.Status.READY, status);
+      }
+
+      Assert.assertEquals(PollableSink.Status.BACKOFF, sink.process());
+
+      sink.stop();
+      Assert.assertTrue(LifecycleController.waitForOneOf(sink,
+          LifecycleState.STOP_OR_ERROR, 5000));
+    } finally {
+      /* Always release the port, even when an assertion above fails. */
+      server.close();
+    }
+  }
+
+  /**
+   * Builds (but does not start) a Netty Avro server on the test host and port
+   * that answers every request through a {@link MockAvroServer}.
+   */
+  private Server createServer() {
+    Server server = new NettyServer(new SpecificResponder(
+        AvroSourceProtocol.class, new MockAvroServer()), new InetSocketAddress(
+        hostname, port));
+
+    return server;
+  }
+
+  /**
+   * Protocol implementation that logs whatever it receives and always
+   * answers {@link Status#OK}.
+   */
+  private static class MockAvroServer implements AvroSourceProtocol {
+
+    @Override
+    public Status append(AvroFlumeEvent event) throws AvroRemoteException {
+      logger.debug("Received event:{}", event);
+      return Status.OK;
+    }
+
+    @Override
+    public Status appendBatch(List<AvroFlumeEvent> events)
+        throws AvroRemoteException {
+
+      logger.debug("Received event batch:{}", events);
+
+      return Status.OK;
+    }
+
+  }
+
+}

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java?rev=1181637&r1=1181636&r2=1181637&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java Tue Oct 11 07:03:53 2011
@@ -18,6 +18,7 @@ import org.apache.flume.lifecycle.Lifecy
 import org.apache.flume.lifecycle.LifecycleException;
 import org.apache.flume.lifecycle.LifecycleState;
 import org.apache.flume.node.nodemanager.DefaultLogicalNodeManager;
+import org.apache.flume.sink.AvroSink;
 import org.apache.flume.sink.DefaultSinkFactory;
 import org.apache.flume.sink.LoggerSink;
 import org.apache.flume.sink.NullSink;
@@ -80,6 +81,7 @@ public class Application {
     sinkFactory.register("logger", LoggerSink.class);
     sinkFactory.register("file-roll", RollingFileSink.class);
     sinkFactory.register("hdfs", HDFSEventSink.class);
+    sinkFactory.register("avro", AvroSink.class);
   }
 
   public void parseOptions() throws ParseException {