You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2014/09/12 20:51:39 UTC

git commit: FLUME-2426. Support interceptors in the Embedded Agent

Repository: flume
Updated Branches:
  refs/heads/trunk 4e08bf7d3 -> 59f0b4df9


FLUME-2426. Support interceptors in the Embedded Agent

(Johny Rufus via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/59f0b4df
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/59f0b4df
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/59f0b4df

Branch: refs/heads/trunk
Commit: 59f0b4df97231acdc0b9769dccb3211c502b36d3
Parents: 4e08bf7
Author: Hari Shreedharan <hs...@apache.org>
Authored: Fri Sep 12 11:50:57 2014 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Fri Sep 12 11:50:57 2014 -0700

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeDeveloperGuide.rst     |  8 ++++++-
 .../flume/agent/embedded/EmbeddedAgent.java     |  4 +++-
 .../flume/agent/embedded/TestEmbeddedAgent.java | 24 +++++++++++++++++++-
 .../TestEmbeddedAgentConfiguration.java         |  4 ++++
 4 files changed, 37 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/59f0b4df/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst b/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst
index ec6a735..e3b60e6 100644
--- a/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst
@@ -450,7 +450,7 @@ sources, sinks, and channels are allowed. Specifically the source used
 is a special embedded source and events should be send to the source
 via the put, putAll methods on the EmbeddedAgent object. Only File Channel
 and Memory Channel are allowed as channels while Avro Sink is the only
-supported sink.
+supported sink. Interceptors are also supported by the embedded agent.
 
 Note: The embedded agent has a dependency on hadoop-core.jar.
 
@@ -470,6 +470,8 @@ channel.*             --                Configuration options for the channel ty
 sink.*                --                Configuration options for the sink. See AvroSink user guide for an exhaustive list, however note AvroSink requires at least hostname and port.
 **processor.type**    --                Either ``failover`` or ``load_balance`` which correspond to FailoverSinksProcessor and LoadBalancingSinkProcessor respectively.
 processor.*           --                Configuration options for the sink processor selected. See FailoverSinksProcessor and LoadBalancingSinkProcessor user guide for an exhaustive list.
+source.interceptors   --                Space-separated list of interceptors
+source.interceptors.* --                Configuration options for individual interceptors specified in the source.interceptors property
 ====================  ================  ==============================================
 
 Below is an example of how to use the agent:
@@ -487,6 +489,10 @@ Below is an example of how to use the agent:
     properties.put("sink2.hostname", "collector2.apache.org");
     properties.put("sink2.port",  "5565");
     properties.put("processor.type", "load_balance");
+    properties.put("source.interceptors", "i1");
+    properties.put("source.interceptors.i1.type", "static");
+    properties.put("source.interceptors.i1.key", "key1");
+    properties.put("source.interceptors.i1.value", "value1");
 
     EmbeddedAgent agent = new EmbeddedAgent("myagent");
 

http://git-wip-us.apache.org/repos/asf/flume/blob/59f0b4df/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java
----------------------------------------------------------------------
diff --git a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java
index d02f440..32c9f18 100644
--- a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java
+++ b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java
@@ -111,7 +111,8 @@ public class EmbeddedAgent {
       throw new IllegalStateException("Cannot be started before being " +
           "configured");
     }
-    doStart();
+    // This check needs to be done before doStart(),
+    // as doStart() accesses sourceRunner.getSource()
     Source source = Preconditions.checkNotNull(sourceRunner.getSource(),
         "Source runner returned null source");
     if(source instanceof EmbeddedSource) {
@@ -120,6 +121,7 @@ public class EmbeddedAgent {
       throw new IllegalStateException("Unknown source type: " + source.
           getClass().getName());
     }
+    doStart();
     state = State.STARTED;
   }
   /**

http://git-wip-us.apache.org/repos/asf/flume/blob/59f0b4df/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgent.java
----------------------------------------------------------------------
diff --git a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgent.java b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgent.java
index 0d644c6..975ba8d 100644
--- a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgent.java
+++ b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgent.java
@@ -61,7 +61,6 @@ public class TestEmbeddedAgent {
   private Map<String, String> headers;
   private byte[] body;
 
-
   @Before
   public void setUp() throws Exception {
     headers = Maps.newHashMap();
@@ -93,6 +92,7 @@ public class TestEmbeddedAgent {
 
     agent = new EmbeddedAgent("test-" + serialNumber.incrementAndGet());
   }
+
   @After
   public void tearDown() throws Exception {
     if(agent != null) {
@@ -110,6 +110,7 @@ public class TestEmbeddedAgent {
       }
     }
   }
+
   @Test(timeout = 30000L)
   public void testPut() throws Exception {
     agent.configure(properties);
@@ -124,6 +125,7 @@ public class TestEmbeddedAgent {
     Assert.assertArrayEquals(body, event.getBody());
     Assert.assertEquals(headers, event.getHeaders());
   }
+
   @Test(timeout = 30000L)
   public void testPutAll() throws Exception {
     List<Event> events = Lists.newArrayList();
@@ -141,7 +143,27 @@ public class TestEmbeddedAgent {
     Assert.assertEquals(headers, event.getHeaders());
   }
 
+  @Test(timeout = 30000L)
+  public void testPutWithInterceptors() throws Exception {
+    properties.put("source.interceptors", "i1");
+    properties.put("source.interceptors.i1.type", "static");
+    properties.put("source.interceptors.i1.key", "key2");
+    properties.put("source.interceptors.i1.value", "value2");
+
+    agent.configure(properties);
+    agent.start();
+    agent.put(EventBuilder.withBody(body, headers));
 
+    Event event;
+    while((event = eventCollector.poll()) == null) {
+      Thread.sleep(500L);
+    }
+    Assert.assertNotNull(event);
+    Assert.assertArrayEquals(body, event.getBody());
+    Map<String, String> newHeaders = new HashMap<String, String>(headers);
+    newHeaders.put("key2", "value2");
+    Assert.assertEquals(newHeaders, event.getHeaders());
+  }
 
   static class EventCollector implements AvroSourceProtocol {
     private final Queue<AvroFlumeEvent> eventQueue =

http://git-wip-us.apache.org/repos/asf/flume/blob/59f0b4df/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java
index f70d0b1..f4a9a58 100644
--- a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java
+++ b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java
@@ -46,6 +46,8 @@ public class TestEmbeddedAgentConfiguration {
     properties.put("sink2.hostname", "sink2.host");
     properties.put("sink2.port", "2");
     properties.put("processor.type", "load_balance");
+    properties.put("source.interceptors", "i1");
+    properties.put("source.interceptors.i1.type", "timestamp");
   }
 
 
@@ -91,6 +93,8 @@ public class TestEmbeddedAgentConfiguration {
     expected.put("test1.sources.source-test1.channels", "channel-test1");
     expected.put("test1.sources.source-test1.type", EmbeddedAgentConfiguration.
         SOURCE_TYPE_EMBEDDED);
+    expected.put("test1.sources.source-test1.interceptors", "i1");
+    expected.put("test1.sources.source-test1.interceptors.i1.type", "timestamp");
     Assert.assertEquals(expected, actual);
   }