You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/10/11 09:03:54 UTC
svn commit: r1181637 - in /incubator/flume/branches/flume-728:
flume-ng-core/src/main/java/org/apache/flume/sink/
flume-ng-core/src/test/java/org/apache/flume/sink/
flume-ng-node/src/main/java/org/apache/flume/node/
Author: esammer
Date: Tue Oct 11 07:03:53 2011
New Revision: 1181637
URL: http://svn.apache.org/viewvc?rev=1181637&view=rev
Log:
FLUME-778: Implement NG Avro sink
Added:
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
Modified:
incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java?rev=1181637&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java Tue Oct 11 07:03:53 2011
@@ -0,0 +1,170 @@
+package org.apache.flume.sink;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.avro.AvroRemoteException;
+import org.apache.avro.ipc.NettyTransceiver;
+import org.apache.avro.ipc.Transceiver;
+import org.apache.avro.ipc.specific.SpecificRequestor;
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
+import org.apache.flume.CounterGroup;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.PollableSink;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.source.avro.AvroFlumeEvent;
+import org.apache.flume.source.avro.AvroSourceProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class AvroSink extends AbstractSink implements PollableSink,
+ Configurable {
+
+ private static final Logger logger = LoggerFactory.getLogger(AvroSink.class);
+ private static final Integer defaultBatchSize = 100;
+
+ private String hostname;
+ private Integer port;
+ private Integer batchSize;
+
+ private AvroSourceProtocol client;
+ private Transceiver transceiver;
+ private CounterGroup counterGroup;
+
+ public AvroSink() {
+ counterGroup = new CounterGroup();
+ }
+
+ @Override
+ public void configure(Context context) {
+ hostname = context.get("hostname", String.class);
+ port = context.get("port", Integer.class);
+ batchSize = context.get("batch-size", Integer.class);
+
+ if (batchSize == null) {
+ batchSize = defaultBatchSize;
+ }
+
+ Preconditions.checkState(hostname != null, "No hostname specified");
+ Preconditions.checkState(port != null, "No port specified");
+ }
+
+ @Override
+ public void start() {
+ logger.info("Avro sink starting");
+
+ try {
+ transceiver = new NettyTransceiver(new InetSocketAddress(hostname, port));
+ client = SpecificRequestor.getClient(AvroSourceProtocol.class,
+ transceiver);
+ } catch (Exception e) {
+ logger.error("Unable to create avro client using hostname:" + hostname
+ + " port:" + port + ". Exception follows.", e);
+
+ /* Try to prevent leaking resources. */
+ if (transceiver != null) {
+ try {
+ transceiver.close();
+ } catch (IOException e1) {
+ logger
+ .error(
+ "Attempt to clean up avro tranceiver after client error failed. Exception follows.",
+ e1);
+ }
+ }
+
+ /* FIXME: Mark ourselves as failed. */
+ return;
+ }
+
+ super.start();
+
+ logger.debug("Avro sink started");
+ }
+
+ @Override
+ public void stop() {
+ logger.info("Avro sink stopping");
+
+ try {
+ transceiver.close();
+ } catch (IOException e) {
+ logger.error(
+ "Unable to shut down avro tranceiver - Possible resource leak!", e);
+ }
+
+ super.stop();
+
+ logger.debug("Avro sink stopped. Metrics:{}", counterGroup);
+ }
+
+ @Override
+ public Status process() throws EventDeliveryException {
+ Status status = Status.READY;
+ Channel channel = getChannel();
+ Transaction transaction = channel.getTransaction();
+
+ try {
+ transaction.begin();
+
+ List<AvroFlumeEvent> batch = new LinkedList<AvroFlumeEvent>();
+
+ for (int i = 0; i < batchSize; i++) {
+ Event event = channel.take();
+
+ if (event == null) {
+ counterGroup.incrementAndGet("batch.underflow");
+ break;
+ }
+
+ AvroFlumeEvent avroEvent = new AvroFlumeEvent();
+
+ avroEvent.body = ByteBuffer.wrap(event.getBody());
+ avroEvent.headers = new HashMap<CharSequence, CharSequence>();
+
+ for (Entry<String, String> entry : event.getHeaders().entrySet()) {
+ avroEvent.headers.put(entry.getKey(), entry.getValue());
+ }
+
+ batch.add(avroEvent);
+ }
+
+ if (batch.isEmpty()) {
+ counterGroup.incrementAndGet("batch.empty");
+ status = Status.BACKOFF;
+ } else {
+ if (!client.appendBatch(batch).equals(
+ org.apache.flume.source.avro.Status.OK)) {
+ throw new AvroRemoteException("RPC communication returned FAILED");
+ }
+ }
+
+ transaction.commit();
+ counterGroup.incrementAndGet("batch.success");
+ } catch (ChannelException e) {
+ transaction.rollback();
+ logger.error("Unable to get event from channel. Exception follows.", e);
+ status = Status.BACKOFF;
+ } catch (AvroRemoteException e) {
+ transaction.rollback();
+ logger.error("Unable to send event batch. Exception follows.", e);
+ status = Status.BACKOFF;
+ } finally {
+ transaction.close();
+ }
+
+ return status;
+ }
+
+}
Added: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java?rev=1181637&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java Tue Oct 11 07:03:53 2011
@@ -0,0 +1,131 @@
+package org.apache.flume.sink;
+
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.avro.AvroRemoteException;
+import org.apache.avro.ipc.NettyServer;
+import org.apache.avro.ipc.Server;
+import org.apache.avro.ipc.specific.SpecificResponder;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.PollableSink;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.lifecycle.LifecycleController;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.flume.source.avro.AvroFlumeEvent;
+import org.apache.flume.source.avro.AvroSourceProtocol;
+import org.apache.flume.source.avro.Status;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestAvroSink {
+
+ private static final Logger logger = LoggerFactory
+ .getLogger(TestAvroSink.class);
+ private static final String hostname = "localhost";
+ private static final Integer port = 41414;
+
+ private AvroSink sink;
+ private Channel channel;
+
+ @Before
+ public void setUp() {
+ sink = new AvroSink();
+ channel = new MemoryChannel();
+
+ Context context = new Context();
+
+ context.put("hostname", "localhost");
+ context.put("port", 41414);
+ context.put("batch-size", 2);
+
+ sink.setChannel(channel);
+
+ Configurables.configure(sink, context);
+ Configurables.configure(channel, context);
+ }
+
+ @Test
+ public void testLifecycle() throws InterruptedException {
+ Server server = createServer();
+
+ server.start();
+
+ sink.start();
+ Assert.assertTrue(LifecycleController.waitForOneOf(sink,
+ LifecycleState.START_OR_ERROR, 5000));
+
+ sink.stop();
+ Assert.assertTrue(LifecycleController.waitForOneOf(sink,
+ LifecycleState.STOP_OR_ERROR, 5000));
+
+ server.close();
+ }
+
+ @Test
+ public void testProcess() throws InterruptedException, EventDeliveryException {
+ Event event = EventBuilder.withBody("test event 1".getBytes(),
+ new HashMap<String, String>());
+ Server server = createServer();
+
+ server.start();
+
+ sink.start();
+ Assert.assertTrue(LifecycleController.waitForOneOf(sink,
+ LifecycleState.START_OR_ERROR, 5000));
+
+ for (int i = 0; i < 10; i++) {
+ channel.put(event);
+ }
+
+ for (int i = 0; i < 5; i++) {
+ PollableSink.Status status = sink.process();
+ Assert.assertEquals(PollableSink.Status.READY, status);
+ }
+
+ Assert.assertEquals(PollableSink.Status.BACKOFF, sink.process());
+
+ sink.stop();
+ Assert.assertTrue(LifecycleController.waitForOneOf(sink,
+ LifecycleState.STOP_OR_ERROR, 5000));
+
+ server.close();
+ }
+
+ private Server createServer() {
+ Server server = new NettyServer(new SpecificResponder(
+ AvroSourceProtocol.class, new MockAvroServer()), new InetSocketAddress(
+ hostname, port));
+
+ return server;
+ }
+
+ private static class MockAvroServer implements AvroSourceProtocol {
+
+ @Override
+ public Status append(AvroFlumeEvent event) throws AvroRemoteException {
+ logger.debug("Received event:{}", event);
+ return Status.OK;
+ }
+
+ @Override
+ public Status appendBatch(List<AvroFlumeEvent> events)
+ throws AvroRemoteException {
+
+ logger.debug("Received event batch:{}", events);
+
+ return Status.OK;
+ }
+
+ }
+
+}
Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java?rev=1181637&r1=1181636&r2=1181637&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java Tue Oct 11 07:03:53 2011
@@ -18,6 +18,7 @@ import org.apache.flume.lifecycle.Lifecy
import org.apache.flume.lifecycle.LifecycleException;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.flume.node.nodemanager.DefaultLogicalNodeManager;
+import org.apache.flume.sink.AvroSink;
import org.apache.flume.sink.DefaultSinkFactory;
import org.apache.flume.sink.LoggerSink;
import org.apache.flume.sink.NullSink;
@@ -80,6 +81,7 @@ public class Application {
sinkFactory.register("logger", LoggerSink.class);
sinkFactory.register("file-roll", RollingFileSink.class);
sinkFactory.register("hdfs", HDFSEventSink.class);
+ sinkFactory.register("avro", AvroSink.class);
}
public void parseOptions() throws ParseException {