You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/18 05:05:33 UTC
svn commit: r1159007 - in /incubator/flume/branches/flume-728:
flume-ng-core/src/main/java/org/apache/flume/
flume-ng-core/src/main/java/org/apache/flume/event/
flume-ng-core/src/main/java/org/apache/flume/sink/
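flume-ng-core/src/main/java/org/apache/flume/source/
flume-ng-core/src/test/java/org/apache/flume/sink/
flume-ng-core/src/test/java/org/apache/flume/source/
flume-ng-core/src/test/java/org/apache/flume/util/
flume-ng-node/src/test/java/org/apache/flume/source/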
Author: esammer
Date: Thu Aug 18 03:05:33 2011
New Revision: 1159007
URL: http://svn.apache.org/viewvc?rev=1159007&view=rev
Log:
- Switched event body back over to byte[].
Modified:
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelDriverThread.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Event.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSink.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSource.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/EventBuilder.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/SimpleEvent.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractEventSink.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventSource.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/util/TestEventBuilder.java
incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java
incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelDriverThread.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelDriverThread.java?rev=1159007&r1=1159006&r2=1159007&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelDriverThread.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelDriverThread.java Thu Aug 18 03:05:33 2011
@@ -121,7 +121,7 @@ public class ChannelDriverThread extends
}
while (!shouldStop) {
- Event<?> event = null;
+ Event event = null;
try {
event = source.next(context);
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Event.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Event.java?rev=1159007&r1=1159006&r2=1159007&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Event.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Event.java Thu Aug 18 03:05:33 2011
@@ -2,14 +2,14 @@ package org.apache.flume;
import java.util.Map;
-public interface Event<T> {
+public interface Event {
public Map<String, String> getHeaders();
public void setHeaders(Map<String, String> headers);
- public T getBody();
+ public byte[] getBody();
- public void setBody(T body);
+ public void setBody(byte[] body);
}
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSink.java?rev=1159007&r1=1159006&r2=1159007&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSink.java Thu Aug 18 03:05:33 2011
@@ -7,8 +7,8 @@ public interface EventSink {
public void open(Context context) throws InterruptedException,
LifecycleException;
- public void append(Context context, Event<?> event)
- throws InterruptedException, EventDeliveryException;
+ public void append(Context context, Event event) throws InterruptedException,
+ EventDeliveryException;
public void close(Context context) throws InterruptedException,
LifecycleException;
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSource.java?rev=1159007&r1=1159006&r2=1159007&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/EventSource.java Thu Aug 18 03:05:33 2011
@@ -7,7 +7,7 @@ public interface EventSource {
public void open(Context context) throws InterruptedException,
LifecycleException;
- public Event<?> next(Context context) throws InterruptedException,
+ public Event next(Context context) throws InterruptedException,
EventDeliveryException;
public void close(Context context) throws InterruptedException,
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/EventBuilder.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/EventBuilder.java?rev=1159007&r1=1159006&r2=1159007&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/EventBuilder.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/EventBuilder.java Thu Aug 18 03:05:33 2011
@@ -7,16 +7,16 @@ import org.apache.flume.Event;
public class EventBuilder {
- public static <T> Event<T> withBody(T body) {
- Event<T> event = new SimpleEvent<T>();
+ public static Event withBody(byte[] body) {
+ Event event = new SimpleEvent();
event.setBody(body);
return event;
}
- public static <T> Event<T> withBody(T body, Map<String, String> headers) {
- Event<T> event = new SimpleEvent<T>();
+ public static Event withBody(byte[] body, Map<String, String> headers) {
+ Event event = new SimpleEvent();
event.setBody(body);
event.setHeaders(new HashMap<String, String>(headers));
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/SimpleEvent.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/SimpleEvent.java?rev=1159007&r1=1159006&r2=1159007&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/SimpleEvent.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/event/SimpleEvent.java Thu Aug 18 03:05:33 2011
@@ -5,10 +5,10 @@ import java.util.Map;
import org.apache.flume.Event;
-public class SimpleEvent<T> implements Event<T> {
+public class SimpleEvent implements Event {
private Map<String, String> headers;
- private T body;
+ private byte[] body;
public SimpleEvent() {
headers = new HashMap<String, String>();
@@ -26,12 +26,12 @@ public class SimpleEvent<T> implements E
}
@Override
- public T getBody() {
+ public byte[] getBody() {
return body;
}
@Override
- public void setBody(T body) {
+ public void setBody(byte[] body) {
this.body = body;
}
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractEventSink.java?rev=1159007&r1=1159006&r2=1159007&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractEventSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractEventSink.java Thu Aug 18 03:05:33 2011
@@ -15,7 +15,7 @@ abstract public class AbstractEventSink
}
@Override
- abstract public void append(Context context, Event<?> event)
+ abstract public void append(Context context, Event event)
throws InterruptedException, EventDeliveryException;
@Override
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java?rev=1159007&r1=1159006&r2=1159007&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java Thu Aug 18 03:05:33 2011
@@ -12,7 +12,7 @@ public class LoggerSink extends Abstract
.getLogger(LoggerSink.class);
@Override
- public void append(Context context, Event<?> event)
+ public void append(Context context, Event event)
throws InterruptedException, EventDeliveryException {
logger.info("event:{}", event);
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java?rev=1159007&r1=1159006&r2=1159007&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java Thu Aug 18 03:05:33 2011
@@ -7,7 +7,7 @@ import org.apache.flume.EventDeliveryExc
public class NullSink extends AbstractEventSink {
@Override
- public void append(Context context, Event<?> event)
+ public void append(Context context, Event event)
throws InterruptedException, EventDeliveryException {
/* We purposefully do absolutely nothing. */
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventSource.java?rev=1159007&r1=1159006&r2=1159007&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventSource.java Thu Aug 18 03:05:33 2011
@@ -13,7 +13,7 @@ abstract public class AbstractEventSourc
}
@Override
- abstract public Event<?> next(Context context) throws InterruptedException,
+ abstract public Event next(Context context) throws InterruptedException,
EventDeliveryException;
@Override
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java?rev=1159007&r1=1159006&r2=1159007&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java Thu Aug 18 03:05:33 2011
@@ -50,10 +50,10 @@ public class NetcatSource extends Abstra
}
@Override
- public Event<?> next(Context context) throws InterruptedException,
+ public Event next(Context context) throws InterruptedException,
EventDeliveryException {
- Event<?> event = null;
+ Event event = null;
counterGroup.incrementAndGet("next.calls");
@@ -78,7 +78,7 @@ public class NetcatSource extends Abstra
logger.debug("end of message");
- event = EventBuilder.withBody(builder.toString());
+ event = EventBuilder.withBody(builder.toString().getBytes());
channel.close();
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java?rev=1159007&r1=1159006&r2=1159007&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java Thu Aug 18 03:05:33 2011
@@ -10,12 +10,12 @@ public class SequenceGeneratorSource ext
private long sequence;
@Override
- public Event<?> next(Context context) throws InterruptedException,
+ public Event next(Context context) throws InterruptedException,
EventDeliveryException {
- Event<Long> event = new SimpleEvent<Long>();
+ Event event = new SimpleEvent();
- event.setBody(sequence++);
+ event.setBody(Long.valueOf(sequence++).toString().getBytes());
return event;
}
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java?rev=1159007&r1=1159006&r2=1159007&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java Thu Aug 18 03:05:33 2011
@@ -28,7 +28,7 @@ public class TestLoggerSink {
sink.open(context);
for (int i = 0; i < 10; i++) {
- sink.append(context, EventBuilder.withBody("Test " + i));
+ sink.append(context, EventBuilder.withBody(("Test " + i).getBytes()));
}
sink.close(context);
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java?rev=1159007&r1=1159006&r2=1159007&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java Thu Aug 18 03:05:33 2011
@@ -27,8 +27,8 @@ public class TestSequenceGeneratorSource
source.open(context);
for (long i = 0; i < 100; i++) {
- Event<?> next = source.next(context);
- long value = (Long) next.getBody();
+ Event next = source.next(context);
+ long value = Long.parseLong(new String(next.getBody()));
Assert.assertEquals(i, value);
}
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/util/TestEventBuilder.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/util/TestEventBuilder.java?rev=1159007&r1=1159006&r2=1159007&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/util/TestEventBuilder.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/util/TestEventBuilder.java Thu Aug 18 03:05:33 2011
@@ -12,13 +12,14 @@ public class TestEventBuilder {
@Test
public void testBody() {
- Event<String> e1 = EventBuilder.withBody("e1");
+ Event e1 = EventBuilder.withBody("e1".getBytes());
Assert.assertNotNull(e1);
- Assert.assertEquals("body is correct", "e1", e1.getBody());
+ Assert.assertArrayEquals("body is correct", "e1".getBytes(), e1.getBody());
- Event<Long> e2 = EventBuilder.withBody(2L);
+ Event e2 = EventBuilder.withBody(Long.valueOf(2).toString().getBytes());
Assert.assertNotNull(e2);
- Assert.assertEquals("body is correct", Long.valueOf(2L), e2.getBody());
+ Assert.assertArrayEquals("body is correct", Long.valueOf(2L).toString()
+ .getBytes(), e2.getBody());
}
@Test
@@ -28,10 +29,11 @@ public class TestEventBuilder {
headers.put("one", "1");
headers.put("two", "2");
- Event<?> e1 = EventBuilder.withBody("e1", headers);
+ Event e1 = EventBuilder.withBody("e1".getBytes(), headers);
Assert.assertNotNull(e1);
- Assert.assertEquals("e1 has the proper body", "e1", e1.getBody());
+ Assert.assertArrayEquals("e1 has the proper body", "e1".getBytes(),
+ e1.getBody());
Assert.assertEquals("e1 has the proper headers", 2, e1.getHeaders().size());
Assert.assertEquals("e1 has a one key", "1", e1.getHeaders().get("one"));
}
Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java?rev=1159007&r1=1159006&r2=1159007&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java Thu Aug 18 03:05:33 2011
@@ -7,7 +7,7 @@ import org.apache.flume.EventDeliveryExc
public class FlakeySequenceGeneratorSource extends SequenceGeneratorSource {
@Override
- public Event<?> next(Context context) throws InterruptedException,
+ public Event next(Context context) throws InterruptedException,
EventDeliveryException {
if (Math.round(Math.random()) == 1) {
Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java?rev=1159007&r1=1159006&r2=1159007&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java Thu Aug 18 03:05:33 2011
@@ -64,10 +64,10 @@ public class TestNetcatSource {
for (int i = 0; i < 100; i++) {
executor.submit(clientRequestRunnable);
- Event<?> event = source.next(context);
+ Event event = source.next(context);
Assert.assertNotNull(event);
- Assert.assertEquals("Test message", event.getBody());
+ Assert.assertEquals("Test message".getBytes(), event.getBody());
}
executor.shutdown();