You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2015/07/25 15:39:11 UTC

[2/2] camel git commit: Add producer support for raw input data

Add producer support for raw input data


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b170152d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b170152d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b170152d

Branch: refs/heads/master
Commit: b170152df80611118a7aad628747be15ea7fe07c
Parents: 18e89e7
Author: Preben Asmussen <pr...@gmail.com>
Authored: Sat Jul 25 14:26:30 2015 +0200
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Sat Jul 25 15:34:27 2015 +0200

----------------------------------------------------------------------
 .../component/splunk/SplunkConfiguration.java   | 18 ++++-
 .../camel/component/splunk/SplunkProducer.java  |  7 +-
 .../component/splunk/event/SplunkEvent.java     |  6 +-
 .../component/splunk/support/DataWriter.java    |  4 +-
 .../splunk/support/SplunkDataWriter.java        | 14 ++--
 .../splunk/support/SubmitDataWriter.java        |  7 +-
 .../camel/component/splunk/ProducerTest.java    | 12 +++-
 .../camel/component/splunk/RawProducerTest.java | 76 ++++++++++++++++++++
 .../SplunkComponentConfigurationTest.java       | 14 ++--
 9 files changed, 134 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/b170152d/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConfiguration.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConfiguration.java
index 94132e8..087bf33 100644
--- a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConfiguration.java
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConfiguration.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.splunk;
 
 import com.splunk.Service;
+
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
@@ -57,6 +58,8 @@ public class SplunkConfiguration {
     private String source;
     @UriParam(label = "producer")
     private int tcpReceiverPort;
+    @UriParam(label = "producer", defaultValue = "false")
+    private boolean raw;
 
     @UriParam(label = "consumer")
     private int count;
@@ -147,6 +150,17 @@ public class SplunkConfiguration {
         this.tcpReceiverPort = tcpReceiverPort;
     }
 
+    public boolean isRaw() {
+        return raw;
+    }
+
+    /**
+     * Should the payload be inserted raw
+     */
+    public void setRaw(boolean raw) {
+        this.raw = raw;
+    }
+
     public String getSourceType() {
         return sourceType;
     }
@@ -260,7 +274,7 @@ public class SplunkConfiguration {
     public boolean isStreaming() {
         return streaming != null ? streaming : false;
     }
-    
+
     /**
      * Sets streaming mode.
      * <p>
@@ -269,7 +283,7 @@ public class SplunkConfiguration {
     public void setStreaming(boolean streaming) {
         this.streaming = streaming;
     }
-    
+
     public int getConnectionTimeout() {
         return connectionTimeout;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/b170152d/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkProducer.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkProducer.java
index a98d036..139f68c 100644
--- a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkProducer.java
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkProducer.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.splunk;
 
 import com.splunk.Args;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.component.splunk.event.SplunkEvent;
 import org.apache.camel.component.splunk.support.DataWriter;
@@ -46,7 +47,11 @@ public class SplunkProducer extends DefaultProducer {
             if (!dataWriter.isConnected()) {
                 dataWriter.start();
             }
-            dataWriter.write(exchange.getIn().getMandatoryBody(SplunkEvent.class));
+            if (endpoint.getConfiguration().isRaw()) {
+                dataWriter.write(exchange.getIn().getMandatoryBody(String.class));
+            } else {
+                dataWriter.write(exchange.getIn().getMandatoryBody(SplunkEvent.class));
+            }
         } catch (Exception e) {
             if (endpoint.reset(e)) {
                 dataWriter.stop();

http://git-wip-us.apache.org/repos/asf/camel/blob/b170152d/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/event/SplunkEvent.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/event/SplunkEvent.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/event/SplunkEvent.java
index 1c19c66..10daad3 100644
--- a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/event/SplunkEvent.java
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/event/SplunkEvent.java
@@ -159,6 +159,10 @@ public class SplunkEvent implements Serializable {
      */
     public static final String COMMON_VENDOR = "vendor";
 
+    /**
+     * Event break delimiter
+     */
+    public static final String LINEBREAK = "\n";
     // ----------------------------------
     // Update
     // ----------------------------------
@@ -202,8 +206,6 @@ public class SplunkEvent implements Serializable {
     private static final String THROWABLE_MESSAGE = "throwable_message";
     private static final String THROWABLE_STACKTRACE_ELEMENTS = "stacktrace_elements";
 
-    private static final String LINEBREAK = "\n";
-
     private static final long serialVersionUID = 1L;
 
     /**

http://git-wip-us.apache.org/repos/asf/camel/blob/b170152d/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/DataWriter.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/DataWriter.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/DataWriter.java
index 44fd851..fb83b59 100644
--- a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/DataWriter.java
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/DataWriter.java
@@ -20,7 +20,9 @@ import org.apache.camel.component.splunk.event.SplunkEvent;
 
 public interface DataWriter {
     void write(SplunkEvent data) throws Exception;
-
+    
+    void write(String data) throws Exception;
+    
     void stop();
 
     void start();

http://git-wip-us.apache.org/repos/asf/camel/blob/b170152d/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataWriter.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataWriter.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataWriter.java
index d0ca8b0..ee1a349 100644
--- a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataWriter.java
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataWriter.java
@@ -46,14 +46,18 @@ public abstract class SplunkDataWriter implements DataWriter {
     protected abstract Socket createSocket(Service service) throws IOException;
 
     public void write(SplunkEvent event) throws Exception {
-        LOG.debug("writing event to splunk:" + event);
-        doWrite(event);
+        doWrite(event.toString());
+    }
+
+    public void write(String event) throws Exception {
+        doWrite(event + SplunkEvent.LINEBREAK);
     }
 
-    protected void doWrite(SplunkEvent event) throws IOException {
+    protected void doWrite(String event) throws IOException {
+        LOG.debug("writing event to splunk:" + event);
         OutputStream ostream = socket.getOutputStream();
-        Writer writer = new OutputStreamWriter(ostream, "UTF8");
-        writer.write(event.toString());
+        Writer writer = new OutputStreamWriter(ostream, "UTF-8");
+        writer.write(event);
         writer.flush();
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/b170152d/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SubmitDataWriter.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SubmitDataWriter.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SubmitDataWriter.java
index c39336b..43c62b0 100644
--- a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SubmitDataWriter.java
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SubmitDataWriter.java
@@ -24,7 +24,6 @@ import com.splunk.Index;
 import com.splunk.Receiver;
 import com.splunk.Service;
 import org.apache.camel.component.splunk.SplunkEndpoint;
-import org.apache.camel.component.splunk.event.SplunkEvent;
 
 public class SubmitDataWriter extends SplunkDataWriter {
     private String index;
@@ -34,13 +33,13 @@ public class SubmitDataWriter extends SplunkDataWriter {
     }
 
     @Override
-    protected void doWrite(SplunkEvent event) throws IOException {
+    protected void doWrite(String event) throws IOException {
         Index index = getIndex();
         if (index != null) {
-            index.submit(args, event.toString());
+            index.submit(args, event);
         } else {
             Receiver receiver = endpoint.getService().getReceiver();
-            receiver.submit(args, event.toString());
+            receiver.submit(args, event);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/b170152d/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/ProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/ProducerTest.java b/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/ProducerTest.java
index dbbb02c..e372598 100644
--- a/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/ProducerTest.java
+++ b/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/ProducerTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.splunk;
 
+import org.apache.camel.CamelExecutionException;
 import org.apache.camel.EndpointInject;
 import org.apache.camel.Producer;
 import org.apache.camel.builder.RouteBuilder;
@@ -48,7 +49,7 @@ public class ProducerTest extends SplunkMockTestSupport {
         template.sendBody("direct:stream", splunkEvent);
         assertMockEndpointsSatisfied();
         Producer streamProducer = streamEndpoint.createProducer();
-        assertIsInstanceOf(StreamDataWriter.class, ((SplunkProducer) streamProducer).getDataWriter());
+        assertIsInstanceOf(StreamDataWriter.class, ((SplunkProducer)streamProducer).getDataWriter());
     }
 
     @Test
@@ -62,7 +63,7 @@ public class ProducerTest extends SplunkMockTestSupport {
         template.sendBody("direct:submit", splunkEvent);
         assertMockEndpointsSatisfied();
         Producer submitProducer = submitEndpoint.createProducer();
-        assertIsInstanceOf(SubmitDataWriter.class, ((SplunkProducer) submitProducer).getDataWriter());
+        assertIsInstanceOf(SubmitDataWriter.class, ((SplunkProducer)submitProducer).getDataWriter());
     }
 
     @Test
@@ -76,7 +77,12 @@ public class ProducerTest extends SplunkMockTestSupport {
         template.sendBody("direct:tcp", splunkEvent);
         assertMockEndpointsSatisfied();
         Producer tcpProducer = tcpEndpoint.createProducer();
-        assertIsInstanceOf(TcpDataWriter.class, ((SplunkProducer) tcpProducer).getDataWriter());
+        assertIsInstanceOf(TcpDataWriter.class, ((SplunkProducer)tcpProducer).getDataWriter());
+    }
+
+    @Test(expected = CamelExecutionException.class)
+    public void testBodyWithoutRawOption() throws Exception {
+        template.sendBody("direct:tcp", "foobar");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/b170152d/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/RawProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/RawProducerTest.java b/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/RawProducerTest.java
new file mode 100644
index 0000000..b90370d
--- /dev/null
+++ b/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/RawProducerTest.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.splunk;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+public class RawProducerTest extends SplunkMockTestSupport {
+    private static final String PAYLOAD = "{foo:1, bar:2}";
+
+    @EndpointInject(uri = "splunk://stream")
+    protected SplunkEndpoint streamEndpoint;
+
+    @EndpointInject(uri = "splunk://submit")
+    protected SplunkEndpoint submitEndpoint;
+
+    @EndpointInject(uri = "splunk://tcp")
+    protected SplunkEndpoint tcpEndpoint;
+
+    @Test
+    public void testStreamWriter() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:stream-result");
+        mock.setExpectedMessageCount(1);
+        mock.expectedBodiesReceived(PAYLOAD);
+        template.sendBody("direct:stream", PAYLOAD);
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testSubmitWriter() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:submitresult");
+        mock.setExpectedMessageCount(1);
+        mock.expectedBodiesReceived(PAYLOAD);
+        template.sendBody("direct:submit", PAYLOAD);
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testTcpWriter() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:tcpresult");
+        mock.setExpectedMessageCount(1);
+        mock.expectedBodiesReceived(PAYLOAD);
+        template.sendBody("direct:tcp", PAYLOAD);
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:stream").to("splunk://stream?username=foo&password=bar&index=myindex&sourceType=SourceType&source=Source&raw=true").to("mock:stream-result");
+
+                from("direct:submit").to("splunk://submit?username=foo&password=bar&index=myindex&sourceType=testSource&source=test&raw=true").to("mock:submitresult");
+
+                from("direct:tcp").to("splunk://tcp?username=foo&password=bar&tcpReceiverPort=2222&index=myindex&sourceType=testSource&source=test&raw=true").to("mock:tcpresult");
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b170152d/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/SplunkComponentConfigurationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/SplunkComponentConfigurationTest.java b/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/SplunkComponentConfigurationTest.java
index 06e9bd9..e36c197 100644
--- a/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/SplunkComponentConfigurationTest.java
+++ b/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/SplunkComponentConfigurationTest.java
@@ -35,6 +35,7 @@ public class SplunkComponentConfigurationTest extends CamelTestSupport {
         assertEquals(Service.DEFAULT_SCHEME, endpoint.getConfiguration().getScheme());
         assertEquals(5000, endpoint.getConfiguration().getConnectionTimeout());
         assertFalse(endpoint.getConfiguration().isUseSunHttpsHandler());
+        assertFalse(endpoint.getConfiguration().isRaw());
     }
 
     @Test(expected = IllegalArgumentException.class)
@@ -47,9 +48,9 @@ public class SplunkComponentConfigurationTest extends CamelTestSupport {
     public void createProducerEndpointWithMaximalConfiguration() throws Exception {
         SplunkComponent component = context.getComponent("splunk", SplunkComponent.class);
 
-        SplunkEndpoint endpoint = (SplunkEndpoint)component.createEndpoint("splunk://tcp?username=test&password=pw&host=myhost&port=3333&"
-                                                                           + "tcpReceiverPort=4444&index=myindex&sourceType=testSource&"
-                                                                           + "source=test&owner=me&app=fantasticapp&useSunHttpsHandler=true");
+        SplunkEndpoint endpoint = (SplunkEndpoint)component
+            .createEndpoint("splunk://tcp?username=test&password=pw&host=myhost&port=3333&" + "tcpReceiverPort=4444&index=myindex&sourceType=testSource&"
+                            + "source=test&owner=me&app=fantasticapp&useSunHttpsHandler=true&raw=true");
         assertEquals("myhost", endpoint.getConfiguration().getHost());
         assertEquals(3333, endpoint.getConfiguration().getPort());
         assertEquals("test", endpoint.getConfiguration().getUsername());
@@ -61,6 +62,7 @@ public class SplunkComponentConfigurationTest extends CamelTestSupport {
         assertEquals("me", endpoint.getConfiguration().getOwner());
         assertEquals("fantasticapp", endpoint.getConfiguration().getApp());
         assertTrue(endpoint.getConfiguration().isUseSunHttpsHandler());
+        assertTrue(endpoint.getConfiguration().isRaw());
     }
 
     @Test
@@ -81,9 +83,9 @@ public class SplunkComponentConfigurationTest extends CamelTestSupport {
     public void createConsumerEndpointWithMaximalConfiguration() throws Exception {
         SplunkComponent component = context.getComponent("splunk", SplunkComponent.class);
 
-        SplunkEndpoint endpoint = (SplunkEndpoint)component.createEndpoint("splunk://normal?username=test&password=pw&host=myhost&port=3333&delay=10s&"
-                                                                           + "search=Splunk search query goes here&initEarliestTime=-1d" + "&latestTime=now&count=10&"
-                                                                           + "owner=me&app=fantasticapp");
+        SplunkEndpoint endpoint = (SplunkEndpoint)component
+            .createEndpoint("splunk://normal?username=test&password=pw&host=myhost&port=3333&delay=10s&" + "search=Splunk search query goes here&initEarliestTime=-1d"
+                            + "&latestTime=now&count=10&" + "owner=me&app=fantasticapp");
         assertEquals("myhost", endpoint.getConfiguration().getHost());
         assertEquals(3333, endpoint.getConfiguration().getPort());
         assertEquals("test", endpoint.getConfiguration().getUsername());