You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flume.apache.org by Jonathan Hsieh <jo...@cloudera.com> on 2011/08/18 19:21:10 UTC

Use ByteBuffer? Re: svn commit: r1159007 - in /incubator/flume/branches/flume-728: flume-ng-core/src/main/java/org/apache/flume/ flume-ng-core/src/main/java/org/apache/flume/event/ flume-ng-core/src/main/java/org/apache/flume/sink/ flume-ng-core/src/

I've been scanning a few of the high level ones things -- will drop some
suggestions/thoughts here and there.

Have you considered using ByteBuffer's instead of byte[]'s.  This was
suggested way back when Flume started -- at I chose against this because
byte[]'s were easier to understand.  If chosen in this go-round you could
just use the simple ByteBuffer.allocate() initially.  Later on this would
enable less traumatic changes for efficient serializations of data,
network/disk data copies, etc.

Jon.

On Wed, Aug 17, 2011 at 8:05 PM, <es...@apache.org> wrote:

> Author: esammer
> Date: Thu Aug 18 03:05:33 2011
> New Revision: 1159007
>
> URL: http://svn.apache.org/viewvc?rev=1159007&view=rev
> Log:
> - Switched event body back over to byte[].
>
> Modified:
>
>  incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelDriverThread.java
>
>  incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Event.java
>
>  incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSink.java
>
>  incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSource.java
>
>  incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/EventBuilder.java
>
>  incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/SimpleEvent.java
>
>  incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractEventSink.java
>
>  incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
>
>  incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
>
>  incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventSource.java
>
>  incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
>
>  incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
>
>  incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java
>
>  incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
>
>  incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/util/TestEventBuilder.java
>
>  incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java
>
>  incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
>
> Modified:
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelDriverThread.java
> URL:
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelDriverThread.java?rev=1159007&r1=1159006&r2=1159007&view=diff
>
> ==============================================================================
> ---
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelDriverThread.java
> (original)
> +++
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelDriverThread.java
> Thu Aug 18 03:05:33 2011
> @@ -121,7 +121,7 @@ public class ChannelDriverThread extends
>     }
>
>     while (!shouldStop) {
> -      Event<?> event = null;
> +      Event event = null;
>
>       try {
>         event = source.next(context);
>
> Modified:
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Event.java
> URL:
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Event.java?rev=1159007&r1=1159006&r2=1159007&view=diff
>
> ==============================================================================
> ---
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Event.java
> (original)
> +++
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Event.java
> Thu Aug 18 03:05:33 2011
> @@ -2,14 +2,14 @@ package org.apache.flume;
>
>  import java.util.Map;
>
> -public interface Event<T> {
> +public interface Event {
>
>   public Map<String, String> getHeaders();
>
>   public void setHeaders(Map<String, String> headers);
>
> -  public T getBody();
> +  public byte[] getBody();
>
> -  public void setBody(T body);
> +  public void setBody(byte[] body);
>
>  }
>
> Modified:
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSink.java
> URL:
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSink.java?rev=1159007&r1=1159006&r2=1159007&view=diff
>
> ==============================================================================
> ---
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSink.java
> (original)
> +++
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSink.java
> Thu Aug 18 03:05:33 2011
> @@ -7,8 +7,8 @@ public interface EventSink {
>   public void open(Context context) throws InterruptedException,
>       LifecycleException;
>
> -  public void append(Context context, Event<?> event)
> -      throws InterruptedException, EventDeliveryException;
> +  public void append(Context context, Event event) throws
> InterruptedException,
> +      EventDeliveryException;
>
>   public void close(Context context) throws InterruptedException,
>       LifecycleException;
>
> Modified:
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSource.java
> URL:
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSource.java?rev=1159007&r1=1159006&r2=1159007&view=diff
>
> ==============================================================================
> ---
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSource.java
> (original)
> +++
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSource.java
> Thu Aug 18 03:05:33 2011
> @@ -7,7 +7,7 @@ public interface EventSource {
>   public void open(Context context) throws InterruptedException,
>       LifecycleException;
>
> -  public Event<?> next(Context context) throws InterruptedException,
> +  public Event next(Context context) throws InterruptedException,
>       EventDeliveryException;
>
>   public void close(Context context) throws InterruptedException,
>
> Modified:
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/EventBuilder.java
> URL:
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/EventBuilder.java?rev=1159007&r1=1159006&r2=1159007&view=diff
>
> ==============================================================================
> ---
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/EventBuilder.java
> (original)
> +++
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/EventBuilder.java
> Thu Aug 18 03:05:33 2011
> @@ -7,16 +7,16 @@ import org.apache.flume.Event;
>
>  public class EventBuilder {
>
> -  public static <T> Event<T> withBody(T body) {
> -    Event<T> event = new SimpleEvent<T>();
> +  public static Event withBody(byte[] body) {
> +    Event event = new SimpleEvent();
>
>     event.setBody(body);
>
>     return event;
>   }
>
> -  public static <T> Event<T> withBody(T body, Map<String, String> headers)
> {
> -    Event<T> event = new SimpleEvent<T>();
> +  public static Event withBody(byte[] body, Map<String, String> headers) {
> +    Event event = new SimpleEvent();
>
>     event.setBody(body);
>     event.setHeaders(new HashMap<String, String>(headers));
>
> Modified:
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/SimpleEvent.java
> URL:
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/SimpleEvent.java?rev=1159007&r1=1159006&r2=1159007&view=diff
>
> ==============================================================================
> ---
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/SimpleEvent.java
> (original)
> +++
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/SimpleEvent.java
> Thu Aug 18 03:05:33 2011
> @@ -5,10 +5,10 @@ import java.util.Map;
>
>  import org.apache.flume.Event;
>
> -public class SimpleEvent<T> implements Event<T> {
> +public class SimpleEvent implements Event {
>
>   private Map<String, String> headers;
> -  private T body;
> +  private byte[] body;
>
>   public SimpleEvent() {
>     headers = new HashMap<String, String>();
> @@ -26,12 +26,12 @@ public class SimpleEvent<T> implements E
>   }
>
>   @Override
> -  public T getBody() {
> +  public byte[] getBody() {
>     return body;
>   }
>
>   @Override
> -  public void setBody(T body) {
> +  public void setBody(byte[] body) {
>     this.body = body;
>   }
>
>
> Modified:
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractEventSink.java
> URL:
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractEventSink.java?rev=1159007&r1=1159006&r2=1159007&view=diff
>
> ==============================================================================
> ---
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractEventSink.java
> (original)
> +++
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractEventSink.java
> Thu Aug 18 03:05:33 2011
> @@ -15,7 +15,7 @@ abstract public class AbstractEventSink
>   }
>
>   @Override
> -  abstract public void append(Context context, Event<?> event)
> +  abstract public void append(Context context, Event event)
>       throws InterruptedException, EventDeliveryException;
>
>   @Override
>
> Modified:
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
> URL:
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java?rev=1159007&r1=1159006&r2=1159007&view=diff
>
> ==============================================================================
> ---
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
> (original)
> +++
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
> Thu Aug 18 03:05:33 2011
> @@ -12,7 +12,7 @@ public class LoggerSink extends Abstract
>       .getLogger(LoggerSink.class);
>
>   @Override
> -  public void append(Context context, Event<?> event)
> +  public void append(Context context, Event event)
>       throws InterruptedException, EventDeliveryException {
>
>     logger.info("event:{}", event);
>
> Modified:
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
> URL:
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java?rev=1159007&r1=1159006&r2=1159007&view=diff
>
> ==============================================================================
> ---
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
> (original)
> +++
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
> Thu Aug 18 03:05:33 2011
> @@ -7,7 +7,7 @@ import org.apache.flume.EventDeliveryExc
>  public class NullSink extends AbstractEventSink {
>
>   @Override
> -  public void append(Context context, Event<?> event)
> +  public void append(Context context, Event event)
>       throws InterruptedException, EventDeliveryException {
>
>     /* We purposefully do absolutely nothing. */
>
> Modified:
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventSource.java
> URL:
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventSource.java?rev=1159007&r1=1159006&r2=1159007&view=diff
>
> ==============================================================================
> ---
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventSource.java
> (original)
> +++
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventSource.java
> Thu Aug 18 03:05:33 2011
> @@ -13,7 +13,7 @@ abstract public class AbstractEventSourc
>   }
>
>   @Override
> -  abstract public Event<?> next(Context context) throws
> InterruptedException,
> +  abstract public Event next(Context context) throws InterruptedException,
>       EventDeliveryException;
>
>   @Override
>
> Modified:
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
> URL:
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java?rev=1159007&r1=1159006&r2=1159007&view=diff
>
> ==============================================================================
> ---
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
> (original)
> +++
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
> Thu Aug 18 03:05:33 2011
> @@ -50,10 +50,10 @@ public class NetcatSource extends Abstra
>   }
>
>   @Override
> -  public Event<?> next(Context context) throws InterruptedException,
> +  public Event next(Context context) throws InterruptedException,
>       EventDeliveryException {
>
> -    Event<?> event = null;
> +    Event event = null;
>
>     counterGroup.incrementAndGet("next.calls");
>
> @@ -78,7 +78,7 @@ public class NetcatSource extends Abstra
>
>       logger.debug("end of message");
>
> -      event = EventBuilder.withBody(builder.toString());
> +      event = EventBuilder.withBody(builder.toString().getBytes());
>
>       channel.close();
>
>
> Modified:
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
> URL:
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java?rev=1159007&r1=1159006&r2=1159007&view=diff
>
> ==============================================================================
> ---
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
> (original)
> +++
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
> Thu Aug 18 03:05:33 2011
> @@ -10,12 +10,12 @@ public class SequenceGeneratorSource ext
>   private long sequence;
>
>   @Override
> -  public Event<?> next(Context context) throws InterruptedException,
> +  public Event next(Context context) throws InterruptedException,
>       EventDeliveryException {
>
> -    Event<Long> event = new SimpleEvent<Long>();
> +    Event event = new SimpleEvent();
>
> -    event.setBody(sequence++);
> +    event.setBody(Long.valueOf(sequence++).toString().getBytes());
>
>     return event;
>   }
>
> Modified:
> incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java
> URL:
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java?rev=1159007&r1=1159006&r2=1159007&view=diff
>
> ==============================================================================
> ---
> incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java
> (original)
> +++
> incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java
> Thu Aug 18 03:05:33 2011
> @@ -28,7 +28,7 @@ public class TestLoggerSink {
>     sink.open(context);
>
>     for (int i = 0; i < 10; i++) {
> -      sink.append(context, EventBuilder.withBody("Test " + i));
> +      sink.append(context, EventBuilder.withBody(("Test " +
> i).getBytes()));
>     }
>
>     sink.close(context);
>
> Modified:
> incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
> URL:
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java?rev=1159007&r1=1159006&r2=1159007&view=diff
>
> ==============================================================================
> ---
> incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
> (original)
> +++
> incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
> Thu Aug 18 03:05:33 2011
> @@ -27,8 +27,8 @@ public class TestSequenceGeneratorSource
>     source.open(context);
>
>     for (long i = 0; i < 100; i++) {
> -      Event<?> next = source.next(context);
> -      long value = (Long) next.getBody();
> +      Event next = source.next(context);
> +      long value = Long.parseLong(new String(next.getBody()));
>
>       Assert.assertEquals(i, value);
>     }
>
> Modified:
> incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/util/TestEventBuilder.java
> URL:
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/util/TestEventBuilder.java?rev=1159007&r1=1159006&r2=1159007&view=diff
>
> ==============================================================================
> ---
> incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/util/TestEventBuilder.java
> (original)
> +++
> incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/util/TestEventBuilder.java
> Thu Aug 18 03:05:33 2011
> @@ -12,13 +12,14 @@ public class TestEventBuilder {
>
>   @Test
>   public void testBody() {
> -    Event<String> e1 = EventBuilder.withBody("e1");
> +    Event e1 = EventBuilder.withBody("e1".getBytes());
>     Assert.assertNotNull(e1);
> -    Assert.assertEquals("body is correct", "e1", e1.getBody());
> +    Assert.assertArrayEquals("body is correct", "e1".getBytes(),
> e1.getBody());
>
> -    Event<Long> e2 = EventBuilder.withBody(2L);
> +    Event e2 =
> EventBuilder.withBody(Long.valueOf(2).toString().getBytes());
>     Assert.assertNotNull(e2);
> -    Assert.assertEquals("body is correct", Long.valueOf(2L),
> e2.getBody());
> +    Assert.assertArrayEquals("body is correct",
> Long.valueOf(2L).toString()
> +        .getBytes(), e2.getBody());
>   }
>
>   @Test
> @@ -28,10 +29,11 @@ public class TestEventBuilder {
>     headers.put("one", "1");
>     headers.put("two", "2");
>
> -    Event<?> e1 = EventBuilder.withBody("e1", headers);
> +    Event e1 = EventBuilder.withBody("e1".getBytes(), headers);
>
>     Assert.assertNotNull(e1);
> -    Assert.assertEquals("e1 has the proper body", "e1", e1.getBody());
> +    Assert.assertArrayEquals("e1 has the proper body", "e1".getBytes(),
> +        e1.getBody());
>     Assert.assertEquals("e1 has the proper headers", 2,
> e1.getHeaders().size());
>     Assert.assertEquals("e1 has a one key", "1",
> e1.getHeaders().get("one"));
>   }
>
> Modified:
> incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java
> URL:
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java?rev=1159007&r1=1159006&r2=1159007&view=diff
>
> ==============================================================================
> ---
> incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java
> (original)
> +++
> incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java
> Thu Aug 18 03:05:33 2011
> @@ -7,7 +7,7 @@ import org.apache.flume.EventDeliveryExc
>  public class FlakeySequenceGeneratorSource extends SequenceGeneratorSource
> {
>
>   @Override
> -  public Event<?> next(Context context) throws InterruptedException,
> +  public Event next(Context context) throws InterruptedException,
>       EventDeliveryException {
>
>     if (Math.round(Math.random()) == 1) {
>
> Modified:
> incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
> URL:
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java?rev=1159007&r1=1159006&r2=1159007&view=diff
>
> ==============================================================================
> ---
> incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
> (original)
> +++
> incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
> Thu Aug 18 03:05:33 2011
> @@ -64,10 +64,10 @@ public class TestNetcatSource {
>     for (int i = 0; i < 100; i++) {
>       executor.submit(clientRequestRunnable);
>
> -      Event<?> event = source.next(context);
> +      Event event = source.next(context);
>
>       Assert.assertNotNull(event);
> -      Assert.assertEquals("Test message", event.getBody());
> +      Assert.assertEquals("Test message".getBytes(), event.getBody());
>     }
>
>     executor.shutdown();
>
>
>


-- 
// Jonathan Hsieh (shay)
// Software Engineer, Cloudera
// jon@cloudera.com