You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2012/05/29 21:15:33 UTC
svn commit: r1343934 - in
/incubator/flume/trunk/flume-ng-legacy-sources/flume-thrift-source/src:
main/java/org/apache/flume/source/thriftLegacy/ThriftLegacySource.java
test/java/org/apache/flume/source/thriftLegacy/TestThriftLegacySource.java
Author: mpercy
Date: Tue May 29 19:15:32 2012
New Revision: 1343934
URL: http://svn.apache.org/viewvc?rev=1343934&view=rev
Log:
FLUME-1221. Thrift Legacy Source improperly converts Flume 0.9 event headers to 1.x format.
(Joe Crobak via Mike Percy)
Modified:
incubator/flume/trunk/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/flume/source/thriftLegacy/ThriftLegacySource.java
incubator/flume/trunk/flume-ng-legacy-sources/flume-thrift-source/src/test/java/org/apache/flume/source/thriftLegacy/TestThriftLegacySource.java
Modified: incubator/flume/trunk/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/flume/source/thriftLegacy/ThriftLegacySource.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/flume/source/thriftLegacy/ThriftLegacySource.java?rev=1343934&r1=1343933&r2=1343934&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/flume/source/thriftLegacy/ThriftLegacySource.java (original)
+++ incubator/flume/trunk/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/flume/source/thriftLegacy/ThriftLegacySource.java Tue May 29 19:15:32 2012
@@ -22,6 +22,7 @@ package org.apache.flume.source.thriftLe
import java.lang.InterruptedException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
@@ -64,6 +65,9 @@ public class ThriftLegacySource extends
private TServerTransport serverTransport;
private Thread thriftHandlerThread;
+ // Charset#decode is threadsafe.
+ private Charset UTF_8 = Charset.forName("UTF-8");
+
@SuppressWarnings("deprecation")
private class ThriftFlumeEventServerImpl
implements ThriftFlumeEventServer.Iface {
@@ -80,7 +84,8 @@ public class ThriftLegacySource extends
headers.put(PRIORITY, evt.getPriority().toString());
headers.put(NANOS, Long.toString(evt.getNanos()));
for (Entry<String, ByteBuffer> entry: evt.getFields().entrySet()) {
- headers.put(entry.getKey().toString(), entry.getValue().toString());
+ headers.put(entry.getKey().toString(),
+ UTF_8.decode(entry.getValue()).toString());
}
headers.put(OG_EVENT, "yes");
Modified: incubator/flume/trunk/flume-ng-legacy-sources/flume-thrift-source/src/test/java/org/apache/flume/source/thriftLegacy/TestThriftLegacySource.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-legacy-sources/flume-thrift-source/src/test/java/org/apache/flume/source/thriftLegacy/TestThriftLegacySource.java?rev=1343934&r1=1343933&r2=1343934&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-legacy-sources/flume-thrift-source/src/test/java/org/apache/flume/source/thriftLegacy/TestThriftLegacySource.java (original)
+++ incubator/flume/trunk/flume-ng-legacy-sources/flume-thrift-source/src/test/java/org/apache/flume/source/thriftLegacy/TestThriftLegacySource.java Tue May 29 19:15:32 2012
@@ -107,8 +107,7 @@ public class TestThriftLegacySource {
source.setChannelProcessor(new ChannelProcessor(rcs));
}
- @Test
- public void testLifecycle() throws InterruptedException {
+ private void bind() throws InterruptedException {
boolean bound = false;
for (int i = 0; i < 100 && !bound; i++) {
@@ -131,8 +130,10 @@ public class TestThriftLegacySource {
.assertTrue("Reached start or error", LifecycleController.waitForOneOf(
source, LifecycleState.START_OR_ERROR));
Assert.assertEquals("Server is started", LifecycleState.START,
- source.getLifecycleState());
+ source.getLifecycleState());
+ }
+ private void stop() throws InterruptedException {
source.stop();
Assert.assertTrue("Reached stop or error",
LifecycleController.waitForOneOf(source, LifecycleState.STOP_OR_ERROR));
@@ -141,31 +142,14 @@ public class TestThriftLegacySource {
}
@Test
- public void testRequest() throws InterruptedException, IOException {
- boolean bound = false;
- int i;
-
- for (i = 0; i < 100 && !bound; i++) {
- try {
- Context context = new Context();
-
- context.put("port", String.valueOf(selectedPort = 41414 + i));
- context.put("host", "0.0.0.0");
-
- Configurables.configure(source, context);
-
- source.start();
- bound = true;
- } catch (ChannelException e) {
- // Assume port in use, try another one
- }
- }
+ public void testLifecycle() throws InterruptedException {
+ bind();
+ stop();
+ }
- Assert
- .assertTrue("Reached start or error", LifecycleController.waitForOneOf(
- source, LifecycleState.START_OR_ERROR));
- Assert.assertEquals("Server is started", LifecycleState.START,
- source.getLifecycleState());
+ @Test
+ public void testRequest() throws InterruptedException, IOException {
+ bind();
Map flumeMap = new HashMap<CharSequence, ByteBuffer>();
ThriftFlumeEvent thriftEvent = new ThriftFlumeEvent(
@@ -185,12 +169,33 @@ public class TestThriftLegacySource {
transaction.commit();
transaction.close();
- source.stop();
+ stop();
+ }
- Assert.assertTrue("Reached stop or error",
- LifecycleController.waitForOneOf(source, LifecycleState.STOP_OR_ERROR));
- Assert.assertEquals("Server is stopped", LifecycleState.STOP,
- source.getLifecycleState());
+ @Test
+ public void testHeaders() throws InterruptedException, IOException {
+ bind();
+
+ Map flumeHeaders = new HashMap<CharSequence, ByteBuffer>();
+ flumeHeaders.put("hello", ByteBuffer.wrap("world".getBytes("UTF-8")));
+ ThriftFlumeEvent thriftEvent = new ThriftFlumeEvent(
+ 1, Priority.INFO, ByteBuffer.wrap("foo".getBytes()),
+ 0, "fooHost", flumeHeaders);
+ FlumeClient fClient = new FlumeClient("0.0.0.0", selectedPort);
+ fClient.append(thriftEvent);
+
+ // check if the event has arrived in the channel through OG thrift source
+ Transaction transaction = channel.getTransaction();
+ transaction.begin();
+
+ Event event = channel.take();
+ Assert.assertNotNull(event);
+ Assert.assertEquals("Event in channel has our header", "world",
+ event.getHeaders().get("hello"));
+ transaction.commit();
+ transaction.close();
+
+ stop();
}
}