You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/18 05:05:33 UTC

svn commit: r1159007 - in /incubator/flume/branches/flume-728: flume-ng-core/src/main/java/org/apache/flume/ flume-ng-core/src/main/java/org/apache/flume/event/ flume-ng-core/src/main/java/org/apache/flume/sink/ flume-ng-core/src/main/java/org/apache/f...

Author: esammer
Date: Thu Aug 18 03:05:33 2011
New Revision: 1159007

URL: http://svn.apache.org/viewvc?rev=1159007&view=rev
Log:
- Switched event body back over to byte[].

Modified:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelDriverThread.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Event.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSource.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/EventBuilder.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/SimpleEvent.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractEventSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventSource.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/util/TestEventBuilder.java
    incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java
    incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelDriverThread.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelDriverThread.java?rev=1159007&r1=1159006&r2=1159007&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelDriverThread.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelDriverThread.java Thu Aug 18 03:05:33 2011
@@ -121,7 +121,7 @@ public class ChannelDriverThread extends
     }
 
     while (!shouldStop) {
-      Event<?> event = null;
+      Event event = null;
 
       try {
         event = source.next(context);

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Event.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Event.java?rev=1159007&r1=1159006&r2=1159007&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Event.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Event.java Thu Aug 18 03:05:33 2011
@@ -2,14 +2,14 @@ package org.apache.flume;
 
 import java.util.Map;
 
-public interface Event<T> {
+public interface Event {
 
   public Map<String, String> getHeaders();
 
   public void setHeaders(Map<String, String> headers);
 
-  public T getBody();
+  public byte[] getBody();
 
-  public void setBody(T body);
+  public void setBody(byte[] body);
 
 }

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSink.java?rev=1159007&r1=1159006&r2=1159007&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSink.java Thu Aug 18 03:05:33 2011
@@ -7,8 +7,8 @@ public interface EventSink {
   public void open(Context context) throws InterruptedException,
       LifecycleException;
 
-  public void append(Context context, Event<?> event)
-      throws InterruptedException, EventDeliveryException;
+  public void append(Context context, Event event) throws InterruptedException,
+      EventDeliveryException;
 
   public void close(Context context) throws InterruptedException,
       LifecycleException;

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSource.java?rev=1159007&r1=1159006&r2=1159007&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSource.java Thu Aug 18 03:05:33 2011
@@ -7,7 +7,7 @@ public interface EventSource {
   public void open(Context context) throws InterruptedException,
       LifecycleException;
 
-  public Event<?> next(Context context) throws InterruptedException,
+  public Event next(Context context) throws InterruptedException,
       EventDeliveryException;
 
   public void close(Context context) throws InterruptedException,

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/EventBuilder.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/EventBuilder.java?rev=1159007&r1=1159006&r2=1159007&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/EventBuilder.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/EventBuilder.java Thu Aug 18 03:05:33 2011
@@ -7,16 +7,16 @@ import org.apache.flume.Event;
 
 public class EventBuilder {
 
-  public static <T> Event<T> withBody(T body) {
-    Event<T> event = new SimpleEvent<T>();
+  public static Event withBody(byte[] body) {
+    Event event = new SimpleEvent();
 
     event.setBody(body);
 
     return event;
   }
 
-  public static <T> Event<T> withBody(T body, Map<String, String> headers) {
-    Event<T> event = new SimpleEvent<T>();
+  public static Event withBody(byte[] body, Map<String, String> headers) {
+    Event event = new SimpleEvent();
 
     event.setBody(body);
     event.setHeaders(new HashMap<String, String>(headers));

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/SimpleEvent.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/SimpleEvent.java?rev=1159007&r1=1159006&r2=1159007&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/SimpleEvent.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/SimpleEvent.java Thu Aug 18 03:05:33 2011
@@ -5,10 +5,10 @@ import java.util.Map;
 
 import org.apache.flume.Event;
 
-public class SimpleEvent<T> implements Event<T> {
+public class SimpleEvent implements Event {
 
   private Map<String, String> headers;
-  private T body;
+  private byte[] body;
 
   public SimpleEvent() {
     headers = new HashMap<String, String>();
@@ -26,12 +26,12 @@ public class SimpleEvent<T> implements E
   }
 
   @Override
-  public T getBody() {
+  public byte[] getBody() {
     return body;
   }
 
   @Override
-  public void setBody(T body) {
+  public void setBody(byte[] body) {
     this.body = body;
   }
 

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractEventSink.java?rev=1159007&r1=1159006&r2=1159007&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractEventSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractEventSink.java Thu Aug 18 03:05:33 2011
@@ -15,7 +15,7 @@ abstract public class AbstractEventSink 
   }
 
   @Override
-  abstract public void append(Context context, Event<?> event)
+  abstract public void append(Context context, Event event)
       throws InterruptedException, EventDeliveryException;
 
   @Override

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java?rev=1159007&r1=1159006&r2=1159007&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java Thu Aug 18 03:05:33 2011
@@ -12,7 +12,7 @@ public class LoggerSink extends Abstract
       .getLogger(LoggerSink.class);
 
   @Override
-  public void append(Context context, Event<?> event)
+  public void append(Context context, Event event)
       throws InterruptedException, EventDeliveryException {
 
     logger.info("event:{}", event);

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java?rev=1159007&r1=1159006&r2=1159007&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java Thu Aug 18 03:05:33 2011
@@ -7,7 +7,7 @@ import org.apache.flume.EventDeliveryExc
 public class NullSink extends AbstractEventSink {
 
   @Override
-  public void append(Context context, Event<?> event)
+  public void append(Context context, Event event)
       throws InterruptedException, EventDeliveryException {
 
     /* We purposefully do absolutely nothing. */

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventSource.java?rev=1159007&r1=1159006&r2=1159007&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventSource.java Thu Aug 18 03:05:33 2011
@@ -13,7 +13,7 @@ abstract public class AbstractEventSourc
   }
 
   @Override
-  abstract public Event<?> next(Context context) throws InterruptedException,
+  abstract public Event next(Context context) throws InterruptedException,
       EventDeliveryException;
 
   @Override

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java?rev=1159007&r1=1159006&r2=1159007&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java Thu Aug 18 03:05:33 2011
@@ -50,10 +50,10 @@ public class NetcatSource extends Abstra
   }
 
   @Override
-  public Event<?> next(Context context) throws InterruptedException,
+  public Event next(Context context) throws InterruptedException,
       EventDeliveryException {
 
-    Event<?> event = null;
+    Event event = null;
 
     counterGroup.incrementAndGet("next.calls");
 
@@ -78,7 +78,7 @@ public class NetcatSource extends Abstra
 
       logger.debug("end of message");
 
-      event = EventBuilder.withBody(builder.toString());
+      event = EventBuilder.withBody(builder.toString().getBytes());
 
       channel.close();
 

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java?rev=1159007&r1=1159006&r2=1159007&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java Thu Aug 18 03:05:33 2011
@@ -10,12 +10,12 @@ public class SequenceGeneratorSource ext
   private long sequence;
 
   @Override
-  public Event<?> next(Context context) throws InterruptedException,
+  public Event next(Context context) throws InterruptedException,
       EventDeliveryException {
 
-    Event<Long> event = new SimpleEvent<Long>();
+    Event event = new SimpleEvent();
 
-    event.setBody(sequence++);
+    event.setBody(Long.valueOf(sequence++).toString().getBytes());
 
     return event;
   }

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java?rev=1159007&r1=1159006&r2=1159007&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java Thu Aug 18 03:05:33 2011
@@ -28,7 +28,7 @@ public class TestLoggerSink {
     sink.open(context);
 
     for (int i = 0; i < 10; i++) {
-      sink.append(context, EventBuilder.withBody("Test " + i));
+      sink.append(context, EventBuilder.withBody(("Test " + i).getBytes()));
     }
 
     sink.close(context);

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java?rev=1159007&r1=1159006&r2=1159007&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java Thu Aug 18 03:05:33 2011
@@ -27,8 +27,8 @@ public class TestSequenceGeneratorSource
     source.open(context);
 
     for (long i = 0; i < 100; i++) {
-      Event<?> next = source.next(context);
-      long value = (Long) next.getBody();
+      Event next = source.next(context);
+      long value = Long.parseLong(new String(next.getBody()));
 
       Assert.assertEquals(i, value);
     }

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/util/TestEventBuilder.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/util/TestEventBuilder.java?rev=1159007&r1=1159006&r2=1159007&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/util/TestEventBuilder.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/util/TestEventBuilder.java Thu Aug 18 03:05:33 2011
@@ -12,13 +12,14 @@ public class TestEventBuilder {
 
   @Test
   public void testBody() {
-    Event<String> e1 = EventBuilder.withBody("e1");
+    Event e1 = EventBuilder.withBody("e1".getBytes());
     Assert.assertNotNull(e1);
-    Assert.assertEquals("body is correct", "e1", e1.getBody());
+    Assert.assertArrayEquals("body is correct", "e1".getBytes(), e1.getBody());
 
-    Event<Long> e2 = EventBuilder.withBody(2L);
+    Event e2 = EventBuilder.withBody(Long.valueOf(2).toString().getBytes());
     Assert.assertNotNull(e2);
-    Assert.assertEquals("body is correct", Long.valueOf(2L), e2.getBody());
+    Assert.assertArrayEquals("body is correct", Long.valueOf(2L).toString()
+        .getBytes(), e2.getBody());
   }
 
   @Test
@@ -28,10 +29,11 @@ public class TestEventBuilder {
     headers.put("one", "1");
     headers.put("two", "2");
 
-    Event<?> e1 = EventBuilder.withBody("e1", headers);
+    Event e1 = EventBuilder.withBody("e1".getBytes(), headers);
 
     Assert.assertNotNull(e1);
-    Assert.assertEquals("e1 has the proper body", "e1", e1.getBody());
+    Assert.assertArrayEquals("e1 has the proper body", "e1".getBytes(),
+        e1.getBody());
     Assert.assertEquals("e1 has the proper headers", 2, e1.getHeaders().size());
     Assert.assertEquals("e1 has a one key", "1", e1.getHeaders().get("one"));
   }

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java?rev=1159007&r1=1159006&r2=1159007&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java Thu Aug 18 03:05:33 2011
@@ -7,7 +7,7 @@ import org.apache.flume.EventDeliveryExc
 public class FlakeySequenceGeneratorSource extends SequenceGeneratorSource {
 
   @Override
-  public Event<?> next(Context context) throws InterruptedException,
+  public Event next(Context context) throws InterruptedException,
       EventDeliveryException {
 
     if (Math.round(Math.random()) == 1) {

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java?rev=1159007&r1=1159006&r2=1159007&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java Thu Aug 18 03:05:33 2011
@@ -64,10 +64,10 @@ public class TestNetcatSource {
     for (int i = 0; i < 100; i++) {
       executor.submit(clientRequestRunnable);
 
-      Event<?> event = source.next(context);
+      Event event = source.next(context);
 
       Assert.assertNotNull(event);
-      Assert.assertEquals("Test message", event.getBody());
+      Assert.assertArrayEquals("Test message".getBytes(), event.getBody());
     }
 
     executor.shutdown();