You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2012/05/29 21:15:33 UTC

svn commit: r1343934 - in /incubator/flume/trunk/flume-ng-legacy-sources/flume-thrift-source/src: main/java/org/apache/flume/source/thriftLegacy/ThriftLegacySource.java test/java/org/apache/flume/source/thriftLegacy/TestThriftLegacySource.java

Author: mpercy
Date: Tue May 29 19:15:32 2012
New Revision: 1343934

URL: http://svn.apache.org/viewvc?rev=1343934&view=rev
Log:
FLUME-1221. Thrift Legacy Source improperly converts Flume 0.9 event headers to 1.x format.

(Joe Crobak via Mike Percy)

Modified:
    incubator/flume/trunk/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/flume/source/thriftLegacy/ThriftLegacySource.java
    incubator/flume/trunk/flume-ng-legacy-sources/flume-thrift-source/src/test/java/org/apache/flume/source/thriftLegacy/TestThriftLegacySource.java

Modified: incubator/flume/trunk/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/flume/source/thriftLegacy/ThriftLegacySource.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/flume/source/thriftLegacy/ThriftLegacySource.java?rev=1343934&r1=1343933&r2=1343934&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/flume/source/thriftLegacy/ThriftLegacySource.java (original)
+++ incubator/flume/trunk/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/flume/source/thriftLegacy/ThriftLegacySource.java Tue May 29 19:15:32 2012
@@ -22,6 +22,7 @@ package org.apache.flume.source.thriftLe
 import java.lang.InterruptedException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -64,6 +65,9 @@ public class ThriftLegacySource  extends
   private TServerTransport serverTransport;
   private Thread thriftHandlerThread;
 
+  // Charset#decode is threadsafe.
+  private Charset UTF_8 = Charset.forName("UTF-8");
+
   @SuppressWarnings("deprecation")
   private class ThriftFlumeEventServerImpl
         implements ThriftFlumeEventServer.Iface {
@@ -80,7 +84,8 @@ public class ThriftLegacySource  extends
       headers.put(PRIORITY, evt.getPriority().toString());
       headers.put(NANOS, Long.toString(evt.getNanos()));
       for (Entry<String, ByteBuffer> entry: evt.getFields().entrySet()) {
-        headers.put(entry.getKey().toString(), entry.getValue().toString());
+        headers.put(entry.getKey().toString(),
+          UTF_8.decode(entry.getValue()).toString());
       }
       headers.put(OG_EVENT, "yes");
 

Modified: incubator/flume/trunk/flume-ng-legacy-sources/flume-thrift-source/src/test/java/org/apache/flume/source/thriftLegacy/TestThriftLegacySource.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-legacy-sources/flume-thrift-source/src/test/java/org/apache/flume/source/thriftLegacy/TestThriftLegacySource.java?rev=1343934&r1=1343933&r2=1343934&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-legacy-sources/flume-thrift-source/src/test/java/org/apache/flume/source/thriftLegacy/TestThriftLegacySource.java (original)
+++ incubator/flume/trunk/flume-ng-legacy-sources/flume-thrift-source/src/test/java/org/apache/flume/source/thriftLegacy/TestThriftLegacySource.java Tue May 29 19:15:32 2012
@@ -107,8 +107,7 @@ public class TestThriftLegacySource {
     source.setChannelProcessor(new ChannelProcessor(rcs));
   }
 
-  @Test
-  public void testLifecycle() throws InterruptedException {
+  private void bind() throws InterruptedException {
     boolean bound = false;
 
     for (int i = 0; i < 100 && !bound; i++) {
@@ -131,8 +130,10 @@ public class TestThriftLegacySource {
         .assertTrue("Reached start or error", LifecycleController.waitForOneOf(
             source, LifecycleState.START_OR_ERROR));
     Assert.assertEquals("Server is started", LifecycleState.START,
-        source.getLifecycleState());
+            source.getLifecycleState());
+  }
 
+  private void stop() throws InterruptedException {
     source.stop();
     Assert.assertTrue("Reached stop or error",
         LifecycleController.waitForOneOf(source, LifecycleState.STOP_OR_ERROR));
@@ -141,31 +142,14 @@ public class TestThriftLegacySource {
   }
 
   @Test
-  public void testRequest() throws InterruptedException, IOException {
-    boolean bound = false;
-    int i;
-
-    for (i = 0; i < 100 && !bound; i++) {
-      try {
-        Context context = new Context();
-
-        context.put("port", String.valueOf(selectedPort = 41414 + i));
-        context.put("host", "0.0.0.0");
-
-        Configurables.configure(source, context);
-
-        source.start();
-        bound = true;
-      } catch (ChannelException e) {
-        // Assume port in use, try another one
-      }
-    }
+  public void testLifecycle() throws InterruptedException {
+    bind();
+    stop();
+  }
 
-    Assert
-        .assertTrue("Reached start or error", LifecycleController.waitForOneOf(
-            source, LifecycleState.START_OR_ERROR));
-    Assert.assertEquals("Server is started", LifecycleState.START,
-        source.getLifecycleState());
+  @Test
+  public void testRequest() throws InterruptedException, IOException {
+    bind();
 
     Map flumeMap = new HashMap<CharSequence, ByteBuffer>();
     ThriftFlumeEvent thriftEvent =  new ThriftFlumeEvent(
@@ -185,12 +169,33 @@ public class TestThriftLegacySource {
     transaction.commit();
     transaction.close();
 
-    source.stop();
+    stop();
+  }
 
-    Assert.assertTrue("Reached stop or error",
-        LifecycleController.waitForOneOf(source, LifecycleState.STOP_OR_ERROR));
-    Assert.assertEquals("Server is stopped", LifecycleState.STOP,
-        source.getLifecycleState());
+  @Test
+  public void testHeaders() throws InterruptedException, IOException {
+    bind();
+
+    Map flumeHeaders = new HashMap<CharSequence, ByteBuffer>();
+    flumeHeaders.put("hello", ByteBuffer.wrap("world".getBytes("UTF-8")));
+    ThriftFlumeEvent thriftEvent =  new ThriftFlumeEvent(
+        1, Priority.INFO, ByteBuffer.wrap("foo".getBytes()),
+        0, "fooHost", flumeHeaders);
+    FlumeClient fClient = new FlumeClient("0.0.0.0", selectedPort);
+    fClient.append(thriftEvent);
+
+    // check if the event has arrived in the channel through OG thrift source
+    Transaction transaction = channel.getTransaction();
+    transaction.begin();
+
+    Event event = channel.take();
+    Assert.assertNotNull(event);
+    Assert.assertEquals("Event in channel has our header", "world",
+        event.getHeaders().get("hello"));
+    transaction.commit();
+    transaction.close();
+
+    stop();
   }
 
 }