You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2015/07/25 15:39:11 UTC
[2/2] camel git commit: Add producer support for raw input data
Add producer support for raw input data
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b170152d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b170152d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b170152d
Branch: refs/heads/master
Commit: b170152df80611118a7aad628747be15ea7fe07c
Parents: 18e89e7
Author: Preben Asmussen <pr...@gmail.com>
Authored: Sat Jul 25 14:26:30 2015 +0200
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Sat Jul 25 15:34:27 2015 +0200
----------------------------------------------------------------------
.../component/splunk/SplunkConfiguration.java | 18 ++++-
.../camel/component/splunk/SplunkProducer.java | 7 +-
.../component/splunk/event/SplunkEvent.java | 6 +-
.../component/splunk/support/DataWriter.java | 4 +-
.../splunk/support/SplunkDataWriter.java | 14 ++--
.../splunk/support/SubmitDataWriter.java | 7 +-
.../camel/component/splunk/ProducerTest.java | 12 +++-
.../camel/component/splunk/RawProducerTest.java | 76 ++++++++++++++++++++
.../SplunkComponentConfigurationTest.java | 14 ++--
9 files changed, 134 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/b170152d/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConfiguration.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConfiguration.java
index 94132e8..087bf33 100644
--- a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConfiguration.java
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConfiguration.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.splunk;
import com.splunk.Service;
+
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriParams;
@@ -57,6 +58,8 @@ public class SplunkConfiguration {
private String source;
@UriParam(label = "producer")
private int tcpReceiverPort;
+ @UriParam(label = "producer", defaultValue = "false")
+ private boolean raw;
@UriParam(label = "consumer")
private int count;
@@ -147,6 +150,17 @@ public class SplunkConfiguration {
this.tcpReceiverPort = tcpReceiverPort;
}
+ public boolean isRaw() {
+ return raw;
+ }
+
+ /**
+ * Whether the message body should be written to Splunk as a raw String instead of being converted to a SplunkEvent
+ */
+ public void setRaw(boolean raw) {
+ this.raw = raw;
+ }
+
public String getSourceType() {
return sourceType;
}
@@ -260,7 +274,7 @@ public class SplunkConfiguration {
public boolean isStreaming() {
return streaming != null ? streaming : false;
}
-
+
/**
* Sets streaming mode.
* <p>
@@ -269,7 +283,7 @@ public class SplunkConfiguration {
public void setStreaming(boolean streaming) {
this.streaming = streaming;
}
-
+
public int getConnectionTimeout() {
return connectionTimeout;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/b170152d/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkProducer.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkProducer.java
index a98d036..139f68c 100644
--- a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkProducer.java
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkProducer.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.splunk;
import com.splunk.Args;
+
import org.apache.camel.Exchange;
import org.apache.camel.component.splunk.event.SplunkEvent;
import org.apache.camel.component.splunk.support.DataWriter;
@@ -46,7 +47,11 @@ public class SplunkProducer extends DefaultProducer {
if (!dataWriter.isConnected()) {
dataWriter.start();
}
- dataWriter.write(exchange.getIn().getMandatoryBody(SplunkEvent.class));
+ if (endpoint.getConfiguration().isRaw()) {
+ dataWriter.write(exchange.getIn().getMandatoryBody(String.class));
+ } else {
+ dataWriter.write(exchange.getIn().getMandatoryBody(SplunkEvent.class));
+ }
} catch (Exception e) {
if (endpoint.reset(e)) {
dataWriter.stop();
http://git-wip-us.apache.org/repos/asf/camel/blob/b170152d/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/event/SplunkEvent.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/event/SplunkEvent.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/event/SplunkEvent.java
index 1c19c66..10daad3 100644
--- a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/event/SplunkEvent.java
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/event/SplunkEvent.java
@@ -159,6 +159,10 @@ public class SplunkEvent implements Serializable {
*/
public static final String COMMON_VENDOR = "vendor";
+ /**
+ * Event break delimiter (a newline); appended to raw String payloads before they are written
+ */
+ public static final String LINEBREAK = "\n";
// ----------------------------------
// Update
// ----------------------------------
@@ -202,8 +206,6 @@ public class SplunkEvent implements Serializable {
private static final String THROWABLE_MESSAGE = "throwable_message";
private static final String THROWABLE_STACKTRACE_ELEMENTS = "stacktrace_elements";
- private static final String LINEBREAK = "\n";
-
private static final long serialVersionUID = 1L;
/**
http://git-wip-us.apache.org/repos/asf/camel/blob/b170152d/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/DataWriter.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/DataWriter.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/DataWriter.java
index 44fd851..fb83b59 100644
--- a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/DataWriter.java
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/DataWriter.java
@@ -20,7 +20,9 @@ import org.apache.camel.component.splunk.event.SplunkEvent;
public interface DataWriter {
void write(SplunkEvent data) throws Exception;
-
+
+ void write(String data) throws Exception;
+
void stop();
void start();
http://git-wip-us.apache.org/repos/asf/camel/blob/b170152d/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataWriter.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataWriter.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataWriter.java
index d0ca8b0..ee1a349 100644
--- a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataWriter.java
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataWriter.java
@@ -46,14 +46,18 @@ public abstract class SplunkDataWriter implements DataWriter {
protected abstract Socket createSocket(Service service) throws IOException;
public void write(SplunkEvent event) throws Exception {
- LOG.debug("writing event to splunk:" + event);
- doWrite(event);
+ doWrite(event.toString());
+ }
+
+ public void write(String event) throws Exception {
+ doWrite(event + SplunkEvent.LINEBREAK);
}
- protected void doWrite(SplunkEvent event) throws IOException {
+ protected void doWrite(String event) throws IOException {
+ LOG.debug("writing event to splunk:" + event);
OutputStream ostream = socket.getOutputStream();
- Writer writer = new OutputStreamWriter(ostream, "UTF8");
- writer.write(event.toString());
+ Writer writer = new OutputStreamWriter(ostream, "UTF-8");
+ writer.write(event);
writer.flush();
}
http://git-wip-us.apache.org/repos/asf/camel/blob/b170152d/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SubmitDataWriter.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SubmitDataWriter.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SubmitDataWriter.java
index c39336b..43c62b0 100644
--- a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SubmitDataWriter.java
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SubmitDataWriter.java
@@ -24,7 +24,6 @@ import com.splunk.Index;
import com.splunk.Receiver;
import com.splunk.Service;
import org.apache.camel.component.splunk.SplunkEndpoint;
-import org.apache.camel.component.splunk.event.SplunkEvent;
public class SubmitDataWriter extends SplunkDataWriter {
private String index;
@@ -34,13 +33,13 @@ public class SubmitDataWriter extends SplunkDataWriter {
}
@Override
- protected void doWrite(SplunkEvent event) throws IOException {
+ protected void doWrite(String event) throws IOException {
Index index = getIndex();
if (index != null) {
- index.submit(args, event.toString());
+ index.submit(args, event);
} else {
Receiver receiver = endpoint.getService().getReceiver();
- receiver.submit(args, event.toString());
+ receiver.submit(args, event);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/b170152d/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/ProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/ProducerTest.java b/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/ProducerTest.java
index dbbb02c..e372598 100644
--- a/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/ProducerTest.java
+++ b/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/ProducerTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.splunk;
+import org.apache.camel.CamelExecutionException;
import org.apache.camel.EndpointInject;
import org.apache.camel.Producer;
import org.apache.camel.builder.RouteBuilder;
@@ -48,7 +49,7 @@ public class ProducerTest extends SplunkMockTestSupport {
template.sendBody("direct:stream", splunkEvent);
assertMockEndpointsSatisfied();
Producer streamProducer = streamEndpoint.createProducer();
- assertIsInstanceOf(StreamDataWriter.class, ((SplunkProducer) streamProducer).getDataWriter());
+ assertIsInstanceOf(StreamDataWriter.class, ((SplunkProducer)streamProducer).getDataWriter());
}
@Test
@@ -62,7 +63,7 @@ public class ProducerTest extends SplunkMockTestSupport {
template.sendBody("direct:submit", splunkEvent);
assertMockEndpointsSatisfied();
Producer submitProducer = submitEndpoint.createProducer();
- assertIsInstanceOf(SubmitDataWriter.class, ((SplunkProducer) submitProducer).getDataWriter());
+ assertIsInstanceOf(SubmitDataWriter.class, ((SplunkProducer)submitProducer).getDataWriter());
}
@Test
@@ -76,7 +77,12 @@ public class ProducerTest extends SplunkMockTestSupport {
template.sendBody("direct:tcp", splunkEvent);
assertMockEndpointsSatisfied();
Producer tcpProducer = tcpEndpoint.createProducer();
- assertIsInstanceOf(TcpDataWriter.class, ((SplunkProducer) tcpProducer).getDataWriter());
+ assertIsInstanceOf(TcpDataWriter.class, ((SplunkProducer)tcpProducer).getDataWriter());
+ }
+
+ @Test(expected = CamelExecutionException.class)
+ public void testBodyWithoutRawOption() throws Exception {
+ template.sendBody("direct:tcp", "foobar");
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/b170152d/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/RawProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/RawProducerTest.java b/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/RawProducerTest.java
new file mode 100644
index 0000000..b90370d
--- /dev/null
+++ b/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/RawProducerTest.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.splunk;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+public class RawProducerTest extends SplunkMockTestSupport {
+ private static final String PAYLOAD = "{foo:1, bar:2}";
+
+ @EndpointInject(uri = "splunk://stream")
+ protected SplunkEndpoint streamEndpoint;
+
+ @EndpointInject(uri = "splunk://submit")
+ protected SplunkEndpoint submitEndpoint;
+
+ @EndpointInject(uri = "splunk://tcp")
+ protected SplunkEndpoint tcpEndpoint;
+
+ @Test
+ public void testStreamWriter() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:stream-result");
+ mock.setExpectedMessageCount(1);
+ mock.expectedBodiesReceived(PAYLOAD);
+ template.sendBody("direct:stream", PAYLOAD);
+ assertMockEndpointsSatisfied();
+ }
+
+ @Test
+ public void testSubmitWriter() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:submitresult");
+ mock.setExpectedMessageCount(1);
+ mock.expectedBodiesReceived(PAYLOAD);
+ template.sendBody("direct:submit", PAYLOAD);
+ assertMockEndpointsSatisfied();
+ }
+
+ @Test
+ public void testTcpWriter() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:tcpresult");
+ mock.setExpectedMessageCount(1);
+ mock.expectedBodiesReceived(PAYLOAD);
+ template.sendBody("direct:tcp", PAYLOAD);
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() {
+ from("direct:stream").to("splunk://stream?username=foo&password=bar&index=myindex&sourceType=SourceType&source=Source&raw=true").to("mock:stream-result");
+
+ from("direct:submit").to("splunk://submit?username=foo&password=bar&index=myindex&sourceType=testSource&source=test&raw=true").to("mock:submitresult");
+
+ from("direct:tcp").to("splunk://tcp?username=foo&password=bar&tcpReceiverPort=2222&index=myindex&sourceType=testSource&source=test&raw=true").to("mock:tcpresult");
+ }
+ };
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/b170152d/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/SplunkComponentConfigurationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/SplunkComponentConfigurationTest.java b/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/SplunkComponentConfigurationTest.java
index 06e9bd9..e36c197 100644
--- a/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/SplunkComponentConfigurationTest.java
+++ b/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/SplunkComponentConfigurationTest.java
@@ -35,6 +35,7 @@ public class SplunkComponentConfigurationTest extends CamelTestSupport {
assertEquals(Service.DEFAULT_SCHEME, endpoint.getConfiguration().getScheme());
assertEquals(5000, endpoint.getConfiguration().getConnectionTimeout());
assertFalse(endpoint.getConfiguration().isUseSunHttpsHandler());
+ assertFalse(endpoint.getConfiguration().isRaw());
}
@Test(expected = IllegalArgumentException.class)
@@ -47,9 +48,9 @@ public class SplunkComponentConfigurationTest extends CamelTestSupport {
public void createProducerEndpointWithMaximalConfiguration() throws Exception {
SplunkComponent component = context.getComponent("splunk", SplunkComponent.class);
- SplunkEndpoint endpoint = (SplunkEndpoint)component.createEndpoint("splunk://tcp?username=test&password=pw&host=myhost&port=3333&"
- + "tcpReceiverPort=4444&index=myindex&sourceType=testSource&"
- + "source=test&owner=me&app=fantasticapp&useSunHttpsHandler=true");
+ SplunkEndpoint endpoint = (SplunkEndpoint)component
+ .createEndpoint("splunk://tcp?username=test&password=pw&host=myhost&port=3333&" + "tcpReceiverPort=4444&index=myindex&sourceType=testSource&"
+ + "source=test&owner=me&app=fantasticapp&useSunHttpsHandler=true&raw=true");
assertEquals("myhost", endpoint.getConfiguration().getHost());
assertEquals(3333, endpoint.getConfiguration().getPort());
assertEquals("test", endpoint.getConfiguration().getUsername());
@@ -61,6 +62,7 @@ public class SplunkComponentConfigurationTest extends CamelTestSupport {
assertEquals("me", endpoint.getConfiguration().getOwner());
assertEquals("fantasticapp", endpoint.getConfiguration().getApp());
assertTrue(endpoint.getConfiguration().isUseSunHttpsHandler());
+ assertTrue(endpoint.getConfiguration().isRaw());
}
@Test
@@ -81,9 +83,9 @@ public class SplunkComponentConfigurationTest extends CamelTestSupport {
public void createConsumerEndpointWithMaximalConfiguration() throws Exception {
SplunkComponent component = context.getComponent("splunk", SplunkComponent.class);
- SplunkEndpoint endpoint = (SplunkEndpoint)component.createEndpoint("splunk://normal?username=test&password=pw&host=myhost&port=3333&delay=10s&"
- + "search=Splunk search query goes here&initEarliestTime=-1d" + "&latestTime=now&count=10&"
- + "owner=me&app=fantasticapp");
+ SplunkEndpoint endpoint = (SplunkEndpoint)component
+ .createEndpoint("splunk://normal?username=test&password=pw&host=myhost&port=3333&delay=10s&" + "search=Splunk search query goes here&initEarliestTime=-1d"
+ + "&latestTime=now&count=10&" + "owner=me&app=fantasticapp");
assertEquals("myhost", endpoint.getConfiguration().getHost());
assertEquals(3333, endpoint.getConfiguration().getPort());
assertEquals("test", endpoint.getConfiguration().getUsername());