You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flume.apache.org by Jonathan Hsieh <jo...@cloudera.com> on 2011/08/18 19:21:10 UTC
Use ByteBuffer? Re: svn commit: r1159007 - in /incubator/flume/branches/flume-728:
flume-ng-core/src/main/java/org/apache/flume/ flume-ng-core/src/main/java/org/apache/flume/event/
flume-ng-core/src/main/java/org/apache/flume/sink/ flume-ng-core/src/
I've been scanning a few of the high-level things -- will drop some
suggestions/thoughts here and there.
Have you considered using ByteBuffers instead of byte[]s? This was
suggested way back when Flume started -- at the time I chose against this because
byte[]s were easier to understand. If chosen in this go-round you could
just use the simple ByteBuffer.allocate() initially. Later on this would
enable less traumatic changes for efficient serializations of data,
network/disk data copies, etc.
Jon.
On Wed, Aug 17, 2011 at 8:05 PM, <es...@apache.org> wrote:
> Author: esammer
> Date: Thu Aug 18 03:05:33 2011
> New Revision: 1159007
>
> URL: http://svn.apache.org/viewvc?rev=1159007&view=rev
> Log:
> - Switched event body back over to byte[].
>
> Modified:
>
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelDriverThread.java
>
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Event.java
>
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSink.java
>
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSource.java
>
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/EventBuilder.java
>
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/SimpleEvent.java
>
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractEventSink.java
>
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
>
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
>
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventSource.java
>
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
>
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
>
> incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java
>
> incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
>
> incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/util/TestEventBuilder.java
>
> incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java
>
> incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
>
> Modified:
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelDriverThread.java
> URL:
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelDriverThread.java?rev=1159007&r1=1159006&r2=1159007&view=diff
>
> ==============================================================================
> ---
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelDriverThread.java
> (original)
> +++
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelDriverThread.java
> Thu Aug 18 03:05:33 2011
> @@ -121,7 +121,7 @@ public class ChannelDriverThread extends
> }
>
> while (!shouldStop) {
> - Event<?> event = null;
> + Event event = null;
>
> try {
> event = source.next(context);
>
> Modified:
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Event.java
> URL:
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Event.java?rev=1159007&r1=1159006&r2=1159007&view=diff
>
> ==============================================================================
> ---
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Event.java
> (original)
> +++
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Event.java
> Thu Aug 18 03:05:33 2011
> @@ -2,14 +2,14 @@ package org.apache.flume;
>
> import java.util.Map;
>
> -public interface Event<T> {
> +public interface Event {
>
> public Map<String, String> getHeaders();
>
> public void setHeaders(Map<String, String> headers);
>
> - public T getBody();
> + public byte[] getBody();
>
> - public void setBody(T body);
> + public void setBody(byte[] body);
>
> }
>
> Modified:
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSink.java
> URL:
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSink.java?rev=1159007&r1=1159006&r2=1159007&view=diff
>
> ==============================================================================
> ---
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSink.java
> (original)
> +++
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSink.java
> Thu Aug 18 03:05:33 2011
> @@ -7,8 +7,8 @@ public interface EventSink {
> public void open(Context context) throws InterruptedException,
> LifecycleException;
>
> - public void append(Context context, Event<?> event)
> - throws InterruptedException, EventDeliveryException;
> + public void append(Context context, Event event) throws
> InterruptedException,
> + EventDeliveryException;
>
> public void close(Context context) throws InterruptedException,
> LifecycleException;
>
> Modified:
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSource.java
> URL:
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSource.java?rev=1159007&r1=1159006&r2=1159007&view=diff
>
> ==============================================================================
> ---
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSource.java
> (original)
> +++
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSource.java
> Thu Aug 18 03:05:33 2011
> @@ -7,7 +7,7 @@ public interface EventSource {
> public void open(Context context) throws InterruptedException,
> LifecycleException;
>
> - public Event<?> next(Context context) throws InterruptedException,
> + public Event next(Context context) throws InterruptedException,
> EventDeliveryException;
>
> public void close(Context context) throws InterruptedException,
>
> Modified:
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/EventBuilder.java
> URL:
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/EventBuilder.java?rev=1159007&r1=1159006&r2=1159007&view=diff
>
> ==============================================================================
> ---
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/EventBuilder.java
> (original)
> +++
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/EventBuilder.java
> Thu Aug 18 03:05:33 2011
> @@ -7,16 +7,16 @@ import org.apache.flume.Event;
>
> public class EventBuilder {
>
> - public static <T> Event<T> withBody(T body) {
> - Event<T> event = new SimpleEvent<T>();
> + public static Event withBody(byte[] body) {
> + Event event = new SimpleEvent();
>
> event.setBody(body);
>
> return event;
> }
>
> - public static <T> Event<T> withBody(T body, Map<String, String> headers)
> {
> - Event<T> event = new SimpleEvent<T>();
> + public static Event withBody(byte[] body, Map<String, String> headers) {
> + Event event = new SimpleEvent();
>
> event.setBody(body);
> event.setHeaders(new HashMap<String, String>(headers));
>
> Modified:
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/SimpleEvent.java
> URL:
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/SimpleEvent.java?rev=1159007&r1=1159006&r2=1159007&view=diff
>
> ==============================================================================
> ---
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/SimpleEvent.java
> (original)
> +++
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/SimpleEvent.java
> Thu Aug 18 03:05:33 2011
> @@ -5,10 +5,10 @@ import java.util.Map;
>
> import org.apache.flume.Event;
>
> -public class SimpleEvent<T> implements Event<T> {
> +public class SimpleEvent implements Event {
>
> private Map<String, String> headers;
> - private T body;
> + private byte[] body;
>
> public SimpleEvent() {
> headers = new HashMap<String, String>();
> @@ -26,12 +26,12 @@ public class SimpleEvent<T> implements E
> }
>
> @Override
> - public T getBody() {
> + public byte[] getBody() {
> return body;
> }
>
> @Override
> - public void setBody(T body) {
> + public void setBody(byte[] body) {
> this.body = body;
> }
>
>
> Modified:
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractEventSink.java
> URL:
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractEventSink.java?rev=1159007&r1=1159006&r2=1159007&view=diff
>
> ==============================================================================
> ---
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractEventSink.java
> (original)
> +++
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractEventSink.java
> Thu Aug 18 03:05:33 2011
> @@ -15,7 +15,7 @@ abstract public class AbstractEventSink
> }
>
> @Override
> - abstract public void append(Context context, Event<?> event)
> + abstract public void append(Context context, Event event)
> throws InterruptedException, EventDeliveryException;
>
> @Override
>
> Modified:
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
> URL:
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java?rev=1159007&r1=1159006&r2=1159007&view=diff
>
> ==============================================================================
> ---
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
> (original)
> +++
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
> Thu Aug 18 03:05:33 2011
> @@ -12,7 +12,7 @@ public class LoggerSink extends Abstract
> .getLogger(LoggerSink.class);
>
> @Override
> - public void append(Context context, Event<?> event)
> + public void append(Context context, Event event)
> throws InterruptedException, EventDeliveryException {
>
> logger.info("event:{}", event);
>
> Modified:
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
> URL:
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java?rev=1159007&r1=1159006&r2=1159007&view=diff
>
> ==============================================================================
> ---
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
> (original)
> +++
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
> Thu Aug 18 03:05:33 2011
> @@ -7,7 +7,7 @@ import org.apache.flume.EventDeliveryExc
> public class NullSink extends AbstractEventSink {
>
> @Override
> - public void append(Context context, Event<?> event)
> + public void append(Context context, Event event)
> throws InterruptedException, EventDeliveryException {
>
> /* We purposefully do absolutely nothing. */
>
> Modified:
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventSource.java
> URL:
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventSource.java?rev=1159007&r1=1159006&r2=1159007&view=diff
>
> ==============================================================================
> ---
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventSource.java
> (original)
> +++
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventSource.java
> Thu Aug 18 03:05:33 2011
> @@ -13,7 +13,7 @@ abstract public class AbstractEventSourc
> }
>
> @Override
> - abstract public Event<?> next(Context context) throws
> InterruptedException,
> + abstract public Event next(Context context) throws InterruptedException,
> EventDeliveryException;
>
> @Override
>
> Modified:
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
> URL:
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java?rev=1159007&r1=1159006&r2=1159007&view=diff
>
> ==============================================================================
> ---
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
> (original)
> +++
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
> Thu Aug 18 03:05:33 2011
> @@ -50,10 +50,10 @@ public class NetcatSource extends Abstra
> }
>
> @Override
> - public Event<?> next(Context context) throws InterruptedException,
> + public Event next(Context context) throws InterruptedException,
> EventDeliveryException {
>
> - Event<?> event = null;
> + Event event = null;
>
> counterGroup.incrementAndGet("next.calls");
>
> @@ -78,7 +78,7 @@ public class NetcatSource extends Abstra
>
> logger.debug("end of message");
>
> - event = EventBuilder.withBody(builder.toString());
> + event = EventBuilder.withBody(builder.toString().getBytes());
>
> channel.close();
>
>
> Modified:
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
> URL:
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java?rev=1159007&r1=1159006&r2=1159007&view=diff
>
> ==============================================================================
> ---
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
> (original)
> +++
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
> Thu Aug 18 03:05:33 2011
> @@ -10,12 +10,12 @@ public class SequenceGeneratorSource ext
> private long sequence;
>
> @Override
> - public Event<?> next(Context context) throws InterruptedException,
> + public Event next(Context context) throws InterruptedException,
> EventDeliveryException {
>
> - Event<Long> event = new SimpleEvent<Long>();
> + Event event = new SimpleEvent();
>
> - event.setBody(sequence++);
> + event.setBody(Long.valueOf(sequence++).toString().getBytes());
>
> return event;
> }
>
> Modified:
> incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java
> URL:
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java?rev=1159007&r1=1159006&r2=1159007&view=diff
>
> ==============================================================================
> ---
> incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java
> (original)
> +++
> incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java
> Thu Aug 18 03:05:33 2011
> @@ -28,7 +28,7 @@ public class TestLoggerSink {
> sink.open(context);
>
> for (int i = 0; i < 10; i++) {
> - sink.append(context, EventBuilder.withBody("Test " + i));
> + sink.append(context, EventBuilder.withBody(("Test " +
> i).getBytes()));
> }
>
> sink.close(context);
>
> Modified:
> incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
> URL:
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java?rev=1159007&r1=1159006&r2=1159007&view=diff
>
> ==============================================================================
> ---
> incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
> (original)
> +++
> incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
> Thu Aug 18 03:05:33 2011
> @@ -27,8 +27,8 @@ public class TestSequenceGeneratorSource
> source.open(context);
>
> for (long i = 0; i < 100; i++) {
> - Event<?> next = source.next(context);
> - long value = (Long) next.getBody();
> + Event next = source.next(context);
> + long value = Long.parseLong(new String(next.getBody()));
>
> Assert.assertEquals(i, value);
> }
>
> Modified:
> incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/util/TestEventBuilder.java
> URL:
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/util/TestEventBuilder.java?rev=1159007&r1=1159006&r2=1159007&view=diff
>
> ==============================================================================
> ---
> incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/util/TestEventBuilder.java
> (original)
> +++
> incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/util/TestEventBuilder.java
> Thu Aug 18 03:05:33 2011
> @@ -12,13 +12,14 @@ public class TestEventBuilder {
>
> @Test
> public void testBody() {
> - Event<String> e1 = EventBuilder.withBody("e1");
> + Event e1 = EventBuilder.withBody("e1".getBytes());
> Assert.assertNotNull(e1);
> - Assert.assertEquals("body is correct", "e1", e1.getBody());
> + Assert.assertArrayEquals("body is correct", "e1".getBytes(),
> e1.getBody());
>
> - Event<Long> e2 = EventBuilder.withBody(2L);
> + Event e2 =
> EventBuilder.withBody(Long.valueOf(2).toString().getBytes());
> Assert.assertNotNull(e2);
> - Assert.assertEquals("body is correct", Long.valueOf(2L),
> e2.getBody());
> + Assert.assertArrayEquals("body is correct",
> Long.valueOf(2L).toString()
> + .getBytes(), e2.getBody());
> }
>
> @Test
> @@ -28,10 +29,11 @@ public class TestEventBuilder {
> headers.put("one", "1");
> headers.put("two", "2");
>
> - Event<?> e1 = EventBuilder.withBody("e1", headers);
> + Event e1 = EventBuilder.withBody("e1".getBytes(), headers);
>
> Assert.assertNotNull(e1);
> - Assert.assertEquals("e1 has the proper body", "e1", e1.getBody());
> + Assert.assertArrayEquals("e1 has the proper body", "e1".getBytes(),
> + e1.getBody());
> Assert.assertEquals("e1 has the proper headers", 2,
> e1.getHeaders().size());
> Assert.assertEquals("e1 has a one key", "1",
> e1.getHeaders().get("one"));
> }
>
> Modified:
> incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java
> URL:
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java?rev=1159007&r1=1159006&r2=1159007&view=diff
>
> ==============================================================================
> ---
> incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java
> (original)
> +++
> incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java
> Thu Aug 18 03:05:33 2011
> @@ -7,7 +7,7 @@ import org.apache.flume.EventDeliveryExc
> public class FlakeySequenceGeneratorSource extends SequenceGeneratorSource
> {
>
> @Override
> - public Event<?> next(Context context) throws InterruptedException,
> + public Event next(Context context) throws InterruptedException,
> EventDeliveryException {
>
> if (Math.round(Math.random()) == 1) {
>
> Modified:
> incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
> URL:
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java?rev=1159007&r1=1159006&r2=1159007&view=diff
>
> ==============================================================================
> ---
> incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
> (original)
> +++
> incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
> Thu Aug 18 03:05:33 2011
> @@ -64,10 +64,10 @@ public class TestNetcatSource {
> for (int i = 0; i < 100; i++) {
> executor.submit(clientRequestRunnable);
>
> - Event<?> event = source.next(context);
> + Event event = source.next(context);
>
> Assert.assertNotNull(event);
> - Assert.assertEquals("Test message", event.getBody());
> + Assert.assertEquals("Test message".getBytes(), event.getBody());
> }
>
> executor.shutdown();
>
>
>
--
// Jonathan Hsieh (shay)
// Software Engineer, Cloudera
// jon@cloudera.com