You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/09/17 09:35:01 UTC
[3/3] git commit: CAMEL-6584: camel-splunk component. Thanks to
Preben Asmussen for the contribution.
CAMEL-6584: camel-splunk component. Thanks to Preben Asmussen for the contribution.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e47197b9
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e47197b9
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e47197b9
Branch: refs/heads/master
Commit: e47197b96781e1a919fff4c2024d80f67fe1ea08
Parents: 92681f7
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Sep 17 09:05:44 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Sep 17 09:34:51 2013 +0200
----------------------------------------------------------------------
components/camel-splunk/pom.xml | 94 ++++
.../camel/component/splunk/ConsumerType.java | 31 ++
.../DefaultSplunkConfigurationFactory.java | 39 ++
.../camel/component/splunk/ProducerType.java | 30 +
.../camel/component/splunk/SplunkComponent.java | 45 ++
.../component/splunk/SplunkConfiguration.java | 260 +++++++++
.../splunk/SplunkConfigurationFactory.java | 25 +
.../camel/component/splunk/SplunkConsumer.java | 106 ++++
.../camel/component/splunk/SplunkEndpoint.java | 114 ++++
.../camel/component/splunk/SplunkProducer.java | 113 ++++
.../component/splunk/event/SplunkEvent.java | 545 +++++++++++++++++++
.../component/splunk/support/DataWriter.java | 27 +
.../splunk/support/SplunkDataReader.java | 326 +++++++++++
.../splunk/support/SplunkDataWriter.java | 81 +++
.../splunk/support/StreamDataWriter.java | 61 +++
.../splunk/support/SubmitDataWriter.java | 59 ++
.../component/splunk/support/TcpDataWriter.java | 50 ++
.../src/main/resources/META-INF/LICENSE.txt | 203 +++++++
.../src/main/resources/META-INF/NOTICE.txt | 11 +
.../services/org/apache/camel/component/splunk | 17 +
.../camel/component/splunk/ConsumerTest.java | 68 +++
.../apache/camel/component/splunk/Helper.java | 86 +++
.../camel/component/splunk/ProducerTest.java | 94 ++++
.../SplunkComponentConfigurationTest.java | 88 +++
.../component/splunk/SplunkMockTestSupport.java | 52 ++
.../splunk/integration/NormalSearchTest.java | 58 ++
.../splunk/integration/RealtimeSearchTest.java | 56 ++
.../splunk/integration/SavedSearchTest.java | 59 ++
.../splunk/integration/SplunkProducerTest.java | 82 +++
.../splunk/integration/SplunkTest.java | 42 ++
.../integration/TestFieldListSearchTest.java | 55 ++
.../src/test/resources/log4j.properties | 36 ++
.../test/resources/resultsreader_test_data.json | 40 ++
components/pom.xml | 1 +
34 files changed, 3054 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/e47197b9/components/camel-splunk/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-splunk/pom.xml b/components/camel-splunk/pom.xml
new file mode 100644
index 0000000..dc8566a
--- /dev/null
+++ b/components/camel-splunk/pom.xml
@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>components</artifactId>
+ <version>2.13-SNAPSHOT</version>
+ </parent>
+
+ <name>Camel :: Splunk</name>
+ <artifactId>camel-splunk</artifactId>
+ <description>Camel :: Splunk component</description>
+ <packaging>bundle</packaging>
+
+ <properties>
+ <camel.osgi.export.pkg>org.apache.camel.component.splunk.*</camel.osgi.export.pkg>
+ <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=splunk</camel.osgi.export.service>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-core</artifactId>
+ </dependency>
+
+ <!-- TODO: use SMX bundle version when it's released -->
+ <dependency>
+ <groupId>com.splunk</groupId>
+ <artifactId>splunk</artifactId>
+ <version>1.1.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <version>${jodatime2-bundle-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>${gson-version}</version>
+ </dependency>
+
+ <!-- testing -->
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-core</artifactId>
+ <version>${hamcrest-version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <!-- TODO: remove me when we use SMX bundle version for splunk -->
+ <repositories>
+ <repository>
+ <id>ext-release-local</id>
+ <url>http://splunk.artifactoryonline.com/splunk/ext-releases-local</url>
+ </repository>
+ </repositories>
+
+</project>
http://git-wip-us.apache.org/repos/asf/camel/blob/e47197b9/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/ConsumerType.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/ConsumerType.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/ConsumerType.java
new file mode 100644
index 0000000..fe5f10b
--- /dev/null
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/ConsumerType.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.splunk;
+
+public enum ConsumerType {
+ NORMAL, REALTIME, SAVEDSEARCH, UNKNOWN;
+
+ public static ConsumerType fromUri(String uri) {
+ for (ConsumerType consumerType : ConsumerType.values()) {
+ if (consumerType.name().equalsIgnoreCase(uri)) {
+ return consumerType;
+ }
+ }
+ return ConsumerType.UNKNOWN;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e47197b9/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/DefaultSplunkConfigurationFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/DefaultSplunkConfigurationFactory.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/DefaultSplunkConfigurationFactory.java
new file mode 100644
index 0000000..867b76d
--- /dev/null
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/DefaultSplunkConfigurationFactory.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.splunk;
+
+import java.util.Map;
+
+import org.apache.camel.util.ObjectHelper;
+
+public class DefaultSplunkConfigurationFactory implements SplunkConfigurationFactory {
+
+ public SplunkConfiguration parseMap(Map<String, Object> parameters) {
+ String host = (String) parameters.get("host");
+ String port = (String) parameters.get("port");
+ String username = (String) parameters.get("username");
+ String password = (String) parameters.get("password");
+ if (ObjectHelper.isEmpty(username) || ObjectHelper.isEmpty(password)) {
+ throw new IllegalArgumentException("Username and password has to be specified");
+ }
+ if (ObjectHelper.isNotEmpty(host) && ObjectHelper.isNotEmpty(port)) {
+ return new SplunkConfiguration(host, Integer.valueOf(port), username, password);
+ }
+ return new SplunkConfiguration(username, password);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e47197b9/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/ProducerType.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/ProducerType.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/ProducerType.java
new file mode 100644
index 0000000..e73def3
--- /dev/null
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/ProducerType.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.splunk;
+
+public enum ProducerType {
+ TCP, SUBMIT, STREAM, UNKNOWN;
+
+ public static ProducerType fromUri(String uri) {
+ for (ProducerType producerType : ProducerType.values()) {
+ if (producerType.name().equalsIgnoreCase(uri)) {
+ return producerType;
+ }
+ }
+ return ProducerType.UNKNOWN;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e47197b9/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkComponent.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkComponent.java
new file mode 100644
index 0000000..7f346e2
--- /dev/null
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkComponent.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.splunk;
+
+import java.util.Map;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.DefaultComponent;
+
+/**
+ * Represents the component that manages {@link SplunkEndpoint}.
+ * <p/>
+ * Endpoint parameters are turned into a {@link SplunkConfiguration} by a pluggable
+ * {@link SplunkConfigurationFactory}; a {@link DefaultSplunkConfigurationFactory} is used
+ * unless another one is set.
+ */
+public class SplunkComponent extends DefaultComponent {
+
+    private SplunkConfigurationFactory splunkConfigurationFactory = new DefaultSplunkConfigurationFactory();
+
+    /**
+     * Creates a {@link SplunkEndpoint} for the given URI.
+     *
+     * @param uri        the full endpoint URI
+     * @param remaining  the URI part after the scheme (not used here)
+     * @param parameters the URI options
+     * @return the new endpoint
+     * @throws Exception if the configuration cannot be built or the options cannot be applied
+     */
+    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        SplunkConfiguration configuration = splunkConfigurationFactory.parseMap(parameters);
+        // apply the URI options to the configuration object
+        setProperties(configuration, parameters);
+        return new SplunkEndpoint(uri, this, configuration);
+    }
+
+    public SplunkConfigurationFactory getSplunkConfigurationFactory() {
+        return splunkConfigurationFactory;
+    }
+
+    /**
+     * Sets the factory used to build endpoint configurations.
+     * <p/>
+     * The parameter is the {@link SplunkConfigurationFactory} interface (not the default
+     * implementation) so that custom factories can be plugged in and the setter type matches the getter.
+     *
+     * @param splunkConfigurationFactory the factory to use
+     */
+    public void setSplunkConfigurationFactory(SplunkConfigurationFactory splunkConfigurationFactory) {
+        this.splunkConfigurationFactory = splunkConfigurationFactory;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e47197b9/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConfiguration.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConfiguration.java
new file mode 100644
index 0000000..20de08c
--- /dev/null
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConfiguration.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.splunk;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import com.splunk.Service;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SplunkConfiguration {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SplunkConfiguration.class);
+
+ private String host = Service.DEFAULT_HOST;
+ private int port = Service.DEFAULT_PORT;
+ private String scheme = Service.DEFAULT_SCHEME;
+ private String app;
+ private String owner;
+ private String username;
+ private String password;
+ private int connectionTimeout = 5000;
+ private String index;
+ private String sourceType;
+ private String source;
+ private int tcpReceiverPort;
+
+ // consumer properties
+ private int count;
+ private String fieldList;
+ private String search;
+ private String savedSearch;
+ private String earliestTime;
+ private String latestTime;
+ private String initEarliestTime;
+
+ public SplunkConfiguration(final String host, final int port, final String username, final String password) {
+ this.host = host;
+ this.port = port;
+ this.username = username;
+ this.password = password;
+ }
+
+ public SplunkConfiguration(final String username, final String password) {
+ this(Service.DEFAULT_HOST, Service.DEFAULT_PORT, username, password);
+ }
+
+ public String getInitEarliestTime() {
+ return initEarliestTime;
+ }
+
+ public void setInitEarliestTime(String initEarliestTime) {
+ this.initEarliestTime = initEarliestTime;
+ }
+
+ public int getCount() {
+ return count;
+ }
+
+ public void setCount(int count) {
+ this.count = count;
+ }
+
+ public String getFieldList() {
+ return fieldList;
+ }
+
+ public void setFieldList(String fieldList) {
+ this.fieldList = fieldList;
+ }
+
+ public String getSearch() {
+ return search;
+ }
+
+ public void setSearch(String search) {
+ this.search = search;
+ }
+
+ public String getEarliestTime() {
+ return earliestTime;
+ }
+
+ public void setEarliestTime(String earliestTime) {
+ this.earliestTime = earliestTime;
+ }
+
+ public String getLatestTime() {
+ return latestTime;
+ }
+
+ public void setLatestTime(String latestTime) {
+ this.latestTime = latestTime;
+ }
+
+ public int getTcpReceiverPort() {
+ return tcpReceiverPort;
+ }
+
+ public void setTcpReceiverPort(int tcpReceiverPort) {
+ this.tcpReceiverPort = tcpReceiverPort;
+ }
+
+ public String getSourceType() {
+ return sourceType;
+ }
+
+ public void setSourceType(String sourceType) {
+ this.sourceType = sourceType;
+ }
+
+ public String getSource() {
+ return source;
+ }
+
+ public void setSource(String source) {
+ this.source = source;
+ }
+
+ public void setIndex(String index) {
+ this.index = index;
+ }
+
+ public String getIndex() {
+ return index;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ public String getScheme() {
+ return scheme;
+ }
+
+ public void setScheme(String scheme) {
+ this.scheme = scheme;
+ }
+
+ public String getApp() {
+ return app;
+ }
+
+ public void setApp(String app) {
+ this.app = app;
+ }
+
+ public String getOwner() {
+ return owner;
+ }
+
+ public void setOwner(String owner) {
+ this.owner = owner;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+
+ public void setUsername(String username) {
+ this.username = username;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public int getConnectionTimeout() {
+ return connectionTimeout;
+ }
+
+ public void setConnectionTimeout(int timeout) {
+ this.connectionTimeout = timeout;
+ }
+
+ public String getSavedSearch() {
+ return this.savedSearch;
+ }
+
+ public void setSavedSearch(String savedSearch) {
+ this.savedSearch = savedSearch;
+ }
+
+ /**
+  * Opens a connection to Splunk using the configured host, port, scheme, app, owner and credentials.
+  * <p/>
+  * The connect call runs on a dedicated worker thread so that it can be bounded by
+  * {@code connectionTimeout} (milliseconds); a value of zero or less waits without a limit.
+  *
+  * @return the connected {@link Service}
+  * @throws RuntimeException if connecting fails, times out or is interrupted; the underlying
+  *                          exception is attached as the cause
+  */
+ public Service createService() {
+     final Map<String, Object> args = new HashMap<String, Object>();
+     if (host != null) {
+         args.put("host", host);
+     }
+     if (port > 0) {
+         args.put("port", port);
+     }
+     if (scheme != null) {
+         args.put("scheme", scheme);
+     }
+     if (app != null) {
+         args.put("app", app);
+     }
+     if (owner != null) {
+         args.put("owner", owner);
+     }
+
+     args.put("username", username);
+     args.put("password", password);
+
+     // a fresh executor is created for every call, so it must be stopped before returning
+     ExecutorService executor = Executors.newSingleThreadExecutor();
+     try {
+         Future<Service> future = executor.submit(new Callable<Service>() {
+             public Service call() throws Exception {
+                 return Service.connect(args);
+             }
+         });
+         Service service;
+         if (connectionTimeout > 0) {
+             service = future.get(connectionTimeout, TimeUnit.MILLISECONDS);
+         } else {
+             service = future.get();
+         }
+         LOG.info("Successfully connected to Splunk");
+         return service;
+     } catch (Exception e) {
+         if (e instanceof InterruptedException) {
+             // restore the interrupt flag for callers further up the stack
+             Thread.currentThread().interrupt();
+         }
+         // keep the original exception as the cause instead of flattening it to its message
+         throw new RuntimeException(String.format("could not connect to Splunk Server @ %s:%d - %s", host, port, e.getMessage()), e);
+     } finally {
+         // interrupts a connect attempt that is still running and releases the worker thread
+         executor.shutdownNow();
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e47197b9/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConfigurationFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConfigurationFactory.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConfigurationFactory.java
new file mode 100644
index 0000000..d92d854
--- /dev/null
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConfigurationFactory.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.splunk;
+
+import java.util.Map;
+
+/**
+ * Strategy for building a {@link SplunkConfiguration} from a map of parameters.
+ */
+public interface SplunkConfigurationFactory {
+
+ /**
+  * Builds a configuration from the given parameters.
+  *
+  * @param parameters option names mapped to their values
+  * @return the resulting configuration
+  */
+ SplunkConfiguration parseMap(Map<String, Object> parameters);
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e47197b9/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConsumer.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConsumer.java
new file mode 100644
index 0000000..c3a9ce7
--- /dev/null
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConsumer.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.splunk;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.component.splunk.event.SplunkEvent;
+import org.apache.camel.component.splunk.support.SplunkDataReader;
+import org.apache.camel.impl.ScheduledBatchPollingConsumer;
+import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Splunk consumer.
+ */
+public class SplunkConsumer extends ScheduledBatchPollingConsumer {
+ private static final Logger LOG = LoggerFactory.getLogger(SplunkConsumer.class);
+ private SplunkDataReader dataReader;
+ private SplunkEndpoint endpoint;
+
+ public SplunkConsumer(SplunkEndpoint endpoint, Processor processor, ConsumerType consumerType) {
+ super(endpoint, processor);
+ this.endpoint = endpoint;
+ if (consumerType.equals(ConsumerType.NORMAL) || consumerType.equals(ConsumerType.REALTIME)) {
+ if (ObjectHelper.isEmpty(endpoint.getConfiguration().getSearch())) {
+ throw new RuntimeException("Missing option 'search' with normal or realtime search");
+ }
+ }
+ if (consumerType.equals(ConsumerType.SAVEDSEARCH) && ObjectHelper.isEmpty(endpoint.getConfiguration().getSavedSearch())) {
+ throw new RuntimeException("Missing option 'savedSearch' with saved search");
+ }
+ dataReader = new SplunkDataReader(endpoint, consumerType);
+ }
+
+ @Override
+ protected int poll() throws Exception {
+ try {
+ List<SplunkEvent> events = dataReader.read();
+ Queue<Exchange> exchanges = createExchanges(events);
+ return processBatch(CastUtils.cast(exchanges));
+ } catch (Exception e) {
+ if (endpoint.reconnectIfPossible(e)) {
+ return 0;
+ } else {
+ getExceptionHandler().handleException(e);
+ return 0;
+ }
+ }
+ }
+
+ protected Queue<Exchange> createExchanges(List<SplunkEvent> splunkEvents) {
+ LOG.trace("Received {} messages in this poll", splunkEvents.size());
+ Queue<Exchange> answer = new LinkedList<Exchange>();
+ for (SplunkEvent splunkEvent : splunkEvents) {
+ Exchange exchange = getEndpoint().createExchange();
+ Message message = exchange.getIn();
+ message.setBody(splunkEvent);
+ answer.add(exchange);
+ }
+ return answer;
+ }
+
+ @Override
+ public int processBatch(Queue<Object> exchanges) throws Exception {
+ int total = exchanges.size();
+
+ for (int index = 0; index < total && isBatchAllowed(); index++) {
+ Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll());
+ exchange.setProperty(Exchange.BATCH_INDEX, index);
+ exchange.setProperty(Exchange.BATCH_SIZE, total);
+ exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1);
+ try {
+ LOG.trace("Processing exchange [{}]...", exchange);
+ getProcessor().process(exchange);
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
+ if (exchange.getException() != null) {
+ getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
+ }
+ }
+ return total;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e47197b9/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkEndpoint.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkEndpoint.java
new file mode 100644
index 0000000..6e0d571
--- /dev/null
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkEndpoint.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.splunk;
+
+import java.net.SocketException;
+import java.util.regex.Pattern;
+import javax.net.ssl.SSLException;
+
+import com.splunk.HttpException;
+import com.splunk.Service;
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.ScheduledPollEndpoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents a Splunk endpoint.
+ * <p/>
+ * The first path segment of the endpoint URI selects the producer or consumer type. The
+ * {@link Service} connection is created on first use and kept until the endpoint stops or
+ * a reconnect is triggered.
+ */
+public class SplunkEndpoint extends ScheduledPollEndpoint {
+    private static final Logger LOG = LoggerFactory.getLogger(SplunkEndpoint.class);
+
+    // compiled once instead of on every splitUri() call
+    private static final Pattern SCHEME_PREFIX = Pattern.compile("splunk:(//)*");
+    private static final Pattern QUERY_SUFFIX = Pattern.compile("\\?.*");
+
+    private SplunkConfiguration configuration;
+    private Service service;
+
+    public SplunkEndpoint() {
+    }
+
+    public SplunkEndpoint(String uri, SplunkComponent component, SplunkConfiguration configuration) {
+        super(uri, component);
+        this.configuration = configuration;
+    }
+
+    /**
+     * Creates a producer whose type is taken from the first path segment of the endpoint URI.
+     *
+     * @throws IllegalArgumentException if the URI has no path segment to read the type from
+     */
+    public Producer createProducer() throws Exception {
+        String[] uriSplit = splitUri(getEndpointUri());
+        if (uriSplit.length > 0) {
+            ProducerType producerType = ProducerType.fromUri(uriSplit[0]);
+            return new SplunkProducer(this, producerType);
+        }
+        throw new IllegalArgumentException("Cannot create any producer with uri " + getEndpointUri() + ". A producer type was not provided (or an incorrect pairing was used).");
+    }
+
+    /**
+     * Creates a consumer whose type is taken from the first path segment of the endpoint URI.
+     *
+     * @throws IllegalArgumentException if the {@code initEarliestTime} option is not set, or
+     *                                  the URI has no path segment to read the type from
+     */
+    public Consumer createConsumer(Processor processor) throws Exception {
+        if (configuration.getInitEarliestTime() == null) {
+            // name the option exactly as it is written in the URI (setInitEarliestTime)
+            throw new IllegalArgumentException("Required initEarliestTime option could not be found");
+        }
+        String[] uriSplit = splitUri(getEndpointUri());
+        if (uriSplit.length > 0) {
+            ConsumerType consumerType = ConsumerType.fromUri(uriSplit[0]);
+            SplunkConsumer consumer = new SplunkConsumer(this, processor, consumerType);
+            configureConsumer(consumer);
+            return consumer;
+        }
+        throw new IllegalArgumentException("Cannot create any consumer with uri " + getEndpointUri() + ". A consumer type was not provided (or an incorrect pairing was used).");
+    }
+
+    public boolean isSingleton() {
+        return true;
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        // drop the cached connection so a restart creates a new one
+        service = null;
+        super.doStop();
+    }
+
+    /**
+     * Returns the Splunk connection, creating it from the configuration on first use.
+     */
+    public Service getService() {
+        if (service == null) {
+            this.service = configuration.createService();
+        }
+        return service;
+    }
+
+    /**
+     * Strips the {@code splunk:} scheme (with any {@code //}) and the query string from the
+     * URI and splits what is left on {@code /}.
+     */
+    private static String[] splitUri(String uri) {
+        uri = SCHEME_PREFIX.matcher(uri).replaceAll("");
+        uri = QUERY_SUFFIX.matcher(uri).replaceAll("");
+
+        return uri.split("/");
+    }
+
+    public SplunkConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    /**
+     * Re-creates the Splunk connection when the given failure is an HTTP 401 response, a
+     * socket error or an SSL error.
+     *
+     * @param e the exception raised while talking to Splunk
+     * @return {@code true} if a reconnect was performed, {@code false} if the exception is of another kind
+     */
+    public synchronized boolean reconnectIfPossible(Exception e) {
+        boolean unauthorized = e instanceof HttpException && ((HttpException) e).getStatus() == 401;
+        boolean connectionLost = e instanceof SocketException || e instanceof SSLException;
+        if (!unauthorized && !connectionLost) {
+            return false;
+        }
+        // try and reconnect
+        LOG.warn("Got exception from Splunk. Will try to reconnect");
+        this.service = null;
+        getService();
+        return true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e47197b9/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkProducer.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkProducer.java
new file mode 100644
index 0000000..bfea70c
--- /dev/null
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkProducer.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.splunk;
+
+import com.splunk.Args;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.splunk.event.SplunkEvent;
+import org.apache.camel.component.splunk.support.DataWriter;
+import org.apache.camel.component.splunk.support.StreamDataWriter;
+import org.apache.camel.component.splunk.support.SubmitDataWriter;
+import org.apache.camel.component.splunk.support.TcpDataWriter;
+import org.apache.camel.impl.DefaultProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Splunk producer.
+ */
+public class SplunkProducer extends DefaultProducer {
+ private static final transient Logger LOG = LoggerFactory.getLogger(SplunkProducer.class);
+ private SplunkEndpoint endpoint;
+ private DataWriter dataWriter;
+
+ public SplunkProducer(SplunkEndpoint endpoint, ProducerType producerType) {
+ super(endpoint);
+ this.endpoint = endpoint;
+ createWriter(producerType);
+ }
+
+ public void process(Exchange exchange) throws Exception {
+ try {
+ dataWriter.write(exchange.getIn().getMandatoryBody(SplunkEvent.class));
+ } catch (Exception e) {
+ if (endpoint.reconnectIfPossible(e)) {
+ dataWriter.start();
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ super.doStart();
+ dataWriter.start();
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ dataWriter.stop();
+ super.doStop();
+ }
+
+ private void createWriter(ProducerType producerType) {
+ switch (producerType) {
+ case TCP: {
+ LOG.debug("Creating TcpDataWriter");
+ dataWriter = new TcpDataWriter(endpoint, buildSplunkArgs());
+ ((TcpDataWriter) dataWriter).setPort(endpoint.getConfiguration().getTcpReceiverPort());
+ LOG.debug("TcpDataWriter created for endpoint {}", endpoint);
+ break;
+ }
+ case SUBMIT: {
+ LOG.debug("Creating SubmitDataWriter");
+ dataWriter = new SubmitDataWriter(endpoint, buildSplunkArgs());
+ ((SubmitDataWriter) dataWriter).setIndex(endpoint.getConfiguration().getIndex());
+ LOG.debug("SubmitDataWriter created for endpoint {}", endpoint);
+ break;
+ }
+ case STREAM: {
+ LOG.debug("Creating StreamDataWriter");
+ dataWriter = new StreamDataWriter(endpoint, buildSplunkArgs());
+ ((StreamDataWriter) dataWriter).setIndex(endpoint.getConfiguration().getIndex());
+ LOG.debug("StreamDataWriter created for endpoint {}", endpoint);
+ break;
+ }
+ case UNKNOWN: {
+ throw new RuntimeException("unknown producerType");
+ }
+ default: {
+ throw new RuntimeException("unknown producerType");
+ }
+ }
+ }
+
+ private Args buildSplunkArgs() {
+ Args args = new Args();
+ if (endpoint.getConfiguration().getSourceType() != null) {
+ args.put("sourcetype", endpoint.getConfiguration().getSourceType());
+ }
+ if (endpoint.getConfiguration().getSource() != null) {
+ args.put("source", endpoint.getConfiguration().getSource());
+ }
+ return args;
+ }
+
+ protected DataWriter getDataWriter() {
+ return dataWriter;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e47197b9/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/event/SplunkEvent.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/event/SplunkEvent.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/event/SplunkEvent.java
new file mode 100644
index 0000000..589a784
--- /dev/null
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/event/SplunkEvent.java
@@ -0,0 +1,545 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.splunk.event;
+
+import java.io.Serializable;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+@SuppressWarnings("serial")
+public class SplunkEvent implements Serializable {
+
+ // ----------------------------------
+ // Common event fields
+ // ----------------------------------
+
+ /**
+ * A device-specific classification provided as part of the event.
+ */
+ public static final String COMMON_CATEGORY = "category";
+ /**
+ * The number of times the record has been seen.
+ */
+ public static final String COMMON_COUNT = "count";
+ /**
+ * The free-form description of a particular event.
+ */
+ public static final String COMMON_DESC = "desc";
+ /**
+ * The name of a given DHCP pool on a DHCP server.
+ */
+ public static final String COMMON_DHCP_POOL = "dhcp_pool";
+ /**
+ * The amount of time the event lasted.
+ */
+ public static final String COMMON_DURATION = "duration";
+ /**
+ * The fully qualified domain name of the device transmitting or recording
+ * the log record.
+ */
+ public static final String COMMON_DVC_HOST = "dvc_host";
+ /**
+ * The IPv4 address of the device reporting the event.
+ */
+ public static final String COMMON_DVC_IP = "dvc_ip";
+ /**
+ * The IPv6 address of the device reporting the event.
+ */
+ public static final String COMMON_DVC_IP6 = "dvc_ip6";
+ /**
+ * The free-form description of the device's physical location.
+ */
+ public static final String COMMON_DVC_LOCATION = "dvc_location";
+ /**
+ * The MAC (layer 2) address of the device reporting the event.
+ */
+ public static final String COMMON_DVC_MAC = "dvc_mac";
+ /**
+ * The Windows NT domain of the device recording or transmitting the event.
+ */
+ public static final String COMMON_DVC_NT_DOMAIN = "dvc_nt_domain";
+ /**
+ * The Windows NT host name of the device recording or transmitting the
+ * event.
+ */
+ public static final String COMMON_DVC_NT_HOST = "dvc_nt_host";
+ /**
+ * Time at which the device recorded the event.
+ */
+ public static final String COMMON_DVC_TIME = "dvc_time";
+ /**
+ * The event's specified end time.
+ */
+ public static final String COMMON_END_TIME = "end_time";
+ /**
+ * A unique identifier that identifies the event. This is unique to the
+ * reporting device.
+ */
+ public static final String COMMON_EVENT_ID = "event_id";
+ /**
+ * The length of the datagram, event, message, or packet.
+ */
+ public static final String COMMON_LENGTH = "length";
+ /**
+ * The log-level that was set on the device and recorded in the event.
+ */
+ public static final String COMMON_LOG_LEVEL = "log_level";
+ /**
+ * The name of the event as reported by the device. The name should not
+ * contain information that's already being parsed into other fields from
+ * the event, such as IP addresses.
+ */
+ public static final String COMMON_NAME = "name";
+ /**
+ * An integer assigned by the device operating system to the process
+ * creating the record.
+ */
+ public static final String COMMON_PID = "pid";
+ /**
+ * An environment-specific assessment of the event's importance, based on
+ * elements such as event severity, business function of the affected
+ * system, or other locally defined variables.
+ */
+ public static final String COMMON_PRIORITY = "priority";
+ /**
+ * The product that generated the event.
+ */
+ public static final String COMMON_PRODUCT = "product";
+ /**
+ * The version of the product that generated the event.
+ */
+ public static final String COMMON_PRODUCT_VERSION = "product_version";
+ /**
+ * The result root cause, such as connection refused, timeout, crash, and so
+ * on.
+ */
+ public static final String COMMON_REASON = "reason";
+ /**
+ * The action result. Often is a binary choice: succeeded and failed,
+ * allowed and denied, and so on.
+ */
+ public static final String COMMON_RESULT = "result";
+ /**
+ * The severity (or priority) of an event as reported by the originating
+ * device.
+ */
+ public static final String COMMON_SEVERITY = "severity";
+ /**
+ * The event's specified start time.
+ */
+ public static final String COMMON_START_TIME = "start_time";
+ /**
+ * The transaction identifier.
+ */
+ public static final String COMMON_TRANSACTION_ID = "transaction_id";
+ /**
+ * A uniform record locator (a web address, in other words) included in a
+ * record.
+ */
+ public static final String COMMON_URL = "url";
+ /**
+ * The vendor who made the product that generated the event.
+ */
+ public static final String COMMON_VENDOR = "vendor";
+
+ // ----------------------------------
+ // Update
+ // ----------------------------------
+
+ /**
+ * The name of the installed update.
+ */
+ public static final String UPDATE_PACKAGE = "package";
+
+ /**
+ * default key value delimiter
+ */
+ protected static final String KVDELIM = "=";
+ /**
+ * default pair delimiter
+ */
+ protected static final String PAIRDELIM = " ";
+ /**
+ * default quote char
+ */
+ protected static final char QUOTE = '"';
+ /**
+ * default date format is using internal generated date
+ */
+ protected static final String DATEFORMATPATTERN = "yyyy-MM-dd\tHH:mm:ss:SSSZ";
+ /**
+ * Date Formatter
+ */
+ protected static final DateTimeFormatter DATE_FORMATTER = DateTimeFormat.forPattern(DATEFORMATPATTERN);
+
+ /**
+ * Event prefix fields
+ */
+ protected static final String PREFIX_NAME = "name";
+ protected static final String PREFIX_EVENT_ID = "event_id";
+
+ /**
+ * Java Throwable type fields
+ */
+ protected static final String THROWABLE_CLASS = "throwable_class";
+ protected static final String THROWABLE_MESSAGE = "throwable_message";
+ protected static final String THROWABLE_STACKTRACE_ELEMENTS = "stacktrace_elements";
+
+ protected static final String LINEBREAK = "\n";
+
+ /**
+ * Whether or not to put quotes around values
+ */
+ protected boolean quoteValues = true;
+
+ /**
+ * Whether or not to add a date to the event string
+ */
+ protected boolean useInternalDate = true;
+
+ /**
+ * Contents of the event message
+ */
+ private StringBuffer eventMessage;
+
+ /**
+  * Creates an event holding one key value pair per entry of the given map.
+  *
+  * @param data the map
+  */
+ public SplunkEvent(Map<String, String> data) {
+     this.eventMessage = new StringBuffer();
+     for (Map.Entry<String, String> entry : data.entrySet()) {
+         this.addPair(entry.getKey(), entry.getValue());
+     }
+ }
+
+ /**
+  * A Copy constructor. The message contents are copied into a buffer of
+  * their own, so pairs added to either event afterwards do not show up in
+  * the other one.
+  *
+  * @param splunkEvent the event to copy
+  */
+ public SplunkEvent(SplunkEvent splunkEvent) {
+     // copy the text, not the buffer reference, to avoid sharing mutable state
+     this.eventMessage = new StringBuffer(splunkEvent.eventMessage.toString());
+     this.quoteValues = splunkEvent.quoteValues;
+     this.useInternalDate = splunkEvent.useInternalDate;
+ }
+
+ /**
+ * Constructor to create a generic event
+ *
+ * @param eventName the event name
+ * @param eventID the event id
+ * @param useInternalDate whether or not to add a date to the event string
+ * @param quoteValues whether or not to put quotes around values
+ */
+ public SplunkEvent(String eventName, String eventID, boolean useInternalDate, boolean quoteValues) {
+
+ this.eventMessage = new StringBuffer();
+ this.quoteValues = quoteValues;
+ this.useInternalDate = useInternalDate;
+
+ addPair(PREFIX_NAME, eventName);
+ addPair(PREFIX_EVENT_ID, eventID);
+ }
+
+ /**
+ * Constructor to create a generic event with the default format
+ *
+ * @param eventName the event name
+ * @param eventID the event ID
+ */
+ public SplunkEvent(String eventName, String eventID) {
+ this(eventName, eventID, true, true);
+ }
+
+ /**
+ * Default constructor
+ */
+ public SplunkEvent() {
+ this.eventMessage = new StringBuffer();
+ }
+
+ /**
+  * Parses the event message back into its key value pairs.
+  * <p/>
+  * A value that starts with the quote char runs up to the next quote char
+  * that is followed by the pair delimiter (or the end of the message), so
+  * quoted values may contain the pair delimiter and the key value
+  * delimiter. Unquoted values run up to the next pair delimiter. The
+  * literal value <tt>null</tt> is returned as a <tt>null</tt> reference.
+  * An event without any pairs yields an empty map.
+  *
+  * @return the pairs of this event, keyed by field name
+  * @throws UnsupportedOperationException if an entry has no key value delimiter
+  */
+ public Map<String, String> getEventData() {
+     Map<String, String> eventData = new HashMap<String, String>();
+     String message = eventMessage.toString();
+     String quote = String.valueOf(QUOTE);
+     int length = message.length();
+     int pos = 0;
+
+     while (pos < length) {
+         // skip the delimiter(s) separating two pairs
+         if (message.startsWith(PAIRDELIM, pos)) {
+             pos += PAIRDELIM.length();
+             continue;
+         }
+
+         int kvIndex = message.indexOf(KVDELIM, pos);
+         int tokenEnd = message.indexOf(PAIRDELIM, pos);
+         if (tokenEnd < 0) {
+             tokenEnd = length;
+         }
+         // the key must be terminated by the key value delimiter before the next pair starts
+         if (kvIndex < 0 || kvIndex > tokenEnd) {
+             throw new UnsupportedOperationException(String.format("invalid event data [%s]", message.substring(pos, tokenEnd)));
+         }
+
+         String key = message.substring(pos, kvIndex).replace(quote, "");
+         int valueStart = kvIndex + KVDELIM.length();
+
+         // for a quoted value, look for the quote that really closes it
+         int closingQuote = -1;
+         if (valueStart < length && message.charAt(valueStart) == QUOTE) {
+             closingQuote = message.indexOf(QUOTE, valueStart + 1);
+             while (closingQuote >= 0 && closingQuote + 1 < length && !message.startsWith(PAIRDELIM, closingQuote + 1)) {
+                 closingQuote = message.indexOf(QUOTE, closingQuote + 1);
+             }
+         }
+
+         String value;
+         if (closingQuote >= 0) {
+             value = message.substring(valueStart + 1, closingQuote);
+             pos = closingQuote + 1;
+         } else {
+             int valueEnd = message.indexOf(PAIRDELIM, valueStart);
+             if (valueEnd < 0) {
+                 valueEnd = length;
+             }
+             value = message.substring(valueStart, valueEnd);
+             pos = valueEnd;
+         }
+
+         if ("null".equals(value)) {
+             value = null;
+         }
+         eventData.put(key, value);
+     }
+     return eventData;
+ }
+
+ /**
+ * Add a key value pair
+ */
+ public void addPair(String key, char value) {
+ addPair(key, String.valueOf(value));
+ }
+
+ /**
+ * Add a key value pair
+ */
+ public void addPair(String key, boolean value) {
+ addPair(key, String.valueOf(value));
+ }
+
+ /**
+ * Add a key value pair
+ */
+ public void addPair(String key, double value) {
+ addPair(key, String.valueOf(value));
+ }
+
+ /**
+ * Add a key value pair
+ */
+ public void addPair(String key, long value) {
+ addPair(key, String.valueOf(value));
+ }
+
+ /**
+ * Add a key value pair
+ */
+ public void addPair(String key, int value) {
+ addPair(key, String.valueOf(value));
+ }
+
+ /**
+  * Add a key value pair. The value is converted with
+  * {@link String#valueOf(Object)}, so a <tt>null</tt> value is written as
+  * the text <tt>null</tt> instead of failing.
+  */
+ public void addPair(String key, Object value) {
+     addPair(key, String.valueOf(value));
+ }
+
+ /**
+ * Utility method for formatting Throwable,Error,Exception objects in a more
+ * linear and Splunk friendly manner than printStackTrace
+ *
+ * @param throwable the Throwable object to add to the event
+ */
+ public void addThrowable(Throwable throwable) {
+ addThrowableObject(throwable, -1);
+ }
+
+ /**
+ * Utility method for formatting Throwable,Error,Exception objects in a more
+ * linear and Splunk friendly manner than printStackTrace
+ *
+ * @param throwable the Throwable object to add to the event
+ * @param stackTraceDepth maximum number of stacktrace elements to log
+ */
+ public void addThrowable(Throwable throwable, int stackTraceDepth) {
+ addThrowableObject(throwable, stackTraceDepth);
+ }
+
+ /**
+ * Internal private method for formatting Throwable,Error,Exception objects
+ * in a more linear and Splunk friendly manner than printStackTrace
+ *
+ * @param throwable the Throwable object to add to the event
+ * @param stackTraceDepth maximum number of stacktrace elements to log, -1
+ * for all
+ */
+
+ private void addThrowableObject(Throwable throwable, int stackTraceDepth) {
+ addPair(THROWABLE_CLASS, throwable.getClass().getCanonicalName());
+ addPair(THROWABLE_MESSAGE, throwable.getMessage());
+ StackTraceElement[] elements = throwable.getStackTrace();
+ StringBuilder sb = new StringBuilder();
+ int depth = 0;
+ for (StackTraceElement element : elements) {
+ depth++;
+ if (stackTraceDepth == -1 || stackTraceDepth >= depth) {
+ sb.append(element.toString()).append(",");
+ } else {
+ break;
+ }
+ }
+ addPair(THROWABLE_STACKTRACE_ELEMENTS, sb.toString());
+ }
+
+ /**
+ * Add a key value pair
+ */
+ public void addPair(String key, String value) {
+ if (quoteValues) {
+ this.eventMessage.append(key).append(KVDELIM).append(QUOTE).append(value).append(QUOTE).append(PAIRDELIM);
+ } else {
+ this.eventMessage.append(key).append(KVDELIM).append(value).append(PAIRDELIM);
+ }
+ }
+
+ /**
+  * Returns the completed event message: the current time (when the
+  * internal date is enabled) followed by all pairs, without the trailing
+  * pair delimiter and terminated by a line break. An event without any
+  * pairs and without the internal date yields just the line break.
+  */
+ @Override
+ public String toString() {
+     String event;
+     if (useInternalDate) {
+         event = DATE_FORMATTER.print(System.currentTimeMillis()) + PAIRDELIM + this.eventMessage;
+     } else {
+         event = eventMessage.toString();
+     }
+
+     // trim off the trailing pair delim char(s); an empty event has none to trim
+     if (event.endsWith(PAIRDELIM)) {
+         event = event.substring(0, event.length() - PAIRDELIM.length());
+     }
+     return event + LINEBREAK;
+ }
+
+ public void setCommonCategory(String commonCategory) {
+ addPair(COMMON_CATEGORY, commonCategory);
+ }
+
+ public void setCommonCount(String commonCount) {
+ addPair(COMMON_COUNT, commonCount);
+ }
+
+ public void setCommonDesc(String commonDesc) {
+ addPair(COMMON_DESC, commonDesc);
+ }
+
+ public void setCommonDhcpPool(String commonDhcpPool) {
+ addPair(COMMON_DHCP_POOL, commonDhcpPool);
+ }
+
+ public void setCommonDuration(long commonDuration) {
+ addPair(COMMON_DURATION, commonDuration);
+ }
+
+ public void setCommonDvcHost(String commonDvcHost) {
+ addPair(COMMON_DVC_HOST, commonDvcHost);
+ }
+
+ public void setCommonDvcIp(String commonDvcIp) {
+ addPair(COMMON_DVC_IP, commonDvcIp);
+ }
+
+ public void setCommonDvcIp6(String commonDvcIp6) {
+ addPair(COMMON_DVC_IP6, commonDvcIp6);
+ }
+
+ public void setCommonDvcLocation(String commonDvcLocation) {
+ addPair(COMMON_DVC_LOCATION, commonDvcLocation);
+ }
+
+ public void setCommonDvcMac(String commonDvcMac) {
+ addPair(COMMON_DVC_MAC, commonDvcMac);
+ }
+
+ public void setCommonDvcNtDomain(String commonDvcNtDomain) {
+ addPair(COMMON_DVC_NT_DOMAIN, commonDvcNtDomain);
+ }
+
+ public void setCommonDvcNtHost(String commonDvcNtHost) {
+ addPair(COMMON_DVC_NT_HOST, commonDvcNtHost);
+ }
+
+ public void setCommonDvcTime(long commonDvcTime) {
+ addPair(COMMON_DVC_TIME, commonDvcTime);
+ }
+
+ public void setCommonEndTime(long commonEndTime) {
+ addPair(COMMON_END_TIME, commonEndTime);
+ }
+
+ public void setCommonEventId(long commonEventId) {
+ addPair(COMMON_EVENT_ID, commonEventId);
+ }
+
+ public void setCommonLength(long commonLength) {
+ addPair(COMMON_LENGTH, commonLength);
+ }
+
+ public void setCommonLogLevel(String commonLogLevel) {
+ addPair(COMMON_LOG_LEVEL, commonLogLevel);
+ }
+
+ public void setCommonName(String commonName) {
+ addPair(COMMON_NAME, commonName);
+ }
+
+ public void setCommonPid(long commonPid) {
+ addPair(COMMON_PID, commonPid);
+ }
+
+ public void setCommonPriority(long commonPriority) {
+ addPair(COMMON_PRIORITY, commonPriority);
+ }
+
+ public void setCommonProduct(String commonProduct) {
+ addPair(COMMON_PRODUCT, commonProduct);
+ }
+
+ public void setCommonProductVersion(long commonProductVersion) {
+ addPair(COMMON_PRODUCT_VERSION, commonProductVersion);
+ }
+
+ public void setCommonReason(String commonReason) {
+ addPair(COMMON_REASON, commonReason);
+ }
+
+ public void setCommonResult(String commonResult) {
+ addPair(COMMON_RESULT, commonResult);
+ }
+
+ public void setCommonSeverity(String commonSeverity) {
+ addPair(COMMON_SEVERITY, commonSeverity);
+ }
+
+ public void setCommonStartTime(long commonStartTime) {
+ addPair(COMMON_START_TIME, commonStartTime);
+ }
+
+ public void setCommonTransactionId(String commonTransactionId) {
+ addPair(COMMON_TRANSACTION_ID, commonTransactionId);
+ }
+
+ public void setCommonUrl(String commonUrl) {
+ addPair(COMMON_URL, commonUrl);
+ }
+
+ public void setCommonVendor(String commonVendor) {
+ addPair(COMMON_VENDOR, commonVendor);
+ }
+
+ public void setUpdatePackage(String updatePackage) {
+ addPair(UPDATE_PACKAGE, updatePackage);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e47197b9/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/DataWriter.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/DataWriter.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/DataWriter.java
new file mode 100644
index 0000000..49cccca
--- /dev/null
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/DataWriter.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.splunk.support;
+
+import org.apache.camel.component.splunk.event.SplunkEvent;
+
+/**
+ * Contract for the writers a Splunk producer uses to send events.
+ */
+public interface DataWriter {
+ /**
+ * Writes the given event.
+ *
+ * @param data the event to write
+ * @throws Exception if the event could not be written
+ */
+ void write(SplunkEvent data) throws Exception;
+
+ /**
+ * Stops the writer.
+ */
+ void stop();
+
+ /**
+ * Starts the writer.
+ */
+ void start();
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e47197b9/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataReader.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataReader.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataReader.java
new file mode 100644
index 0000000..f3a0afd
--- /dev/null
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataReader.java
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.splunk.support;
+
+import java.io.InputStream;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.splunk.Args;
+import com.splunk.Job;
+import com.splunk.ResultsReader;
+import com.splunk.ResultsReaderJson;
+import com.splunk.SavedSearch;
+import com.splunk.SavedSearchCollection;
+import com.splunk.Service;
+import org.apache.camel.component.splunk.ConsumerType;
+import org.apache.camel.component.splunk.SplunkEndpoint;
+import org.apache.camel.component.splunk.event.SplunkEvent;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SplunkDataReader {
+ private static final Logger LOG = LoggerFactory.getLogger(SplunkDataReader.class);
+ private static final String DATE_FORMAT = "MM/dd/yy HH:mm:ss:SSS";
+ private static final String SPLUNK_TIME_FORMAT = "%m/%d/%y %H:%M:%S:%3N";
+
+ private transient Calendar lastSuccessfulReadTime;
+ private SplunkEndpoint endpoint;
+ private ConsumerType consumerType;
+
+ public SplunkDataReader(SplunkEndpoint endpoint, ConsumerType consumerType) {
+ this.endpoint = endpoint;
+ this.consumerType = consumerType;
+ }
+
+ public int getCount() {
+ return endpoint.getConfiguration().getCount();
+ }
+
+ public String getFieldList() {
+ return endpoint.getConfiguration().getFieldList();
+ }
+
+ public String getSearch() {
+ return endpoint.getConfiguration().getSearch();
+ }
+
+ public String getEarliestTime() {
+ return endpoint.getConfiguration().getEarliestTime();
+ }
+
+ public String getLatestTime() {
+ return endpoint.getConfiguration().getLatestTime();
+ }
+
+ public String getInitEarliestTime() {
+ return endpoint.getConfiguration().getInitEarliestTime();
+ }
+
+ private String getSavedSearch() {
+ return endpoint.getConfiguration().getSavedSearch();
+ }
+
+ /**
+  * Runs the search that corresponds to the consumer type of this reader
+  * and returns the events it produced.
+  *
+  * @throws RuntimeException if the consumer type has no matching search
+  */
+ public List<SplunkEvent> read() throws Exception {
+     switch (consumerType) {
+     case NORMAL:
+         return nonBlockingSearch();
+     case REALTIME:
+         return realtimeSearch();
+     case SAVEDSEARCH:
+         return savedSearch();
+     default:
+         throw new RuntimeException("Unknown search mode " + consumerType);
+     }
+ }
+
+ /**
+ * Get the earliestTime of range search.
+ *
+ * @param startTime the time where search start
+ * @param realtime if this is realtime search
+ * @return The time of last successful read if not realtime; Time difference
+ * between last successful read and start time;
+ */
+ private String calculateEarliestTime(Calendar startTime, boolean realtime) {
+ String result;
+ if (realtime) {
+ result = calculateEarliestTimeForRealTime(startTime);
+ } else {
+ DateFormat df = new SimpleDateFormat(DATE_FORMAT);
+ result = df.format(lastSuccessfulReadTime.getTime());
+ }
+ return result;
+ }
+
+ /**
+ * Gets earliest time for realtime search
+ */
+ private String calculateEarliestTimeForRealTime(Calendar startTime) {
+ String result;
+ long diff = startTime.getTimeInMillis() - lastSuccessfulReadTime.getTimeInMillis();
+ result = "-" + diff / 1000 + "s";
+ return result;
+ }
+
+ private void populateArgs(Args queryArgs, Calendar startTime, boolean realtime) {
+ String earliestTime = getEarliestTime(startTime, realtime);
+ if (ObjectHelper.isNotEmpty(earliestTime)) {
+ queryArgs.put("earliest_time", earliestTime);
+ }
+
+ String latestTime = getLatestTime(startTime, realtime);
+ if (ObjectHelper.isNotEmpty(latestTime)) {
+ queryArgs.put("latest_time", latestTime);
+ }
+
+ queryArgs.put("time_format", SPLUNK_TIME_FORMAT);
+
+ if (ObjectHelper.isNotEmpty(getFieldList())) {
+ queryArgs.put("field_list", getFieldList());
+ }
+ }
+
+ /**
+  * Resolves the latest time of a search: the configured value when there
+  * is one, otherwise "rt" for realtime searches and the formatted start
+  * time for all others.
+  */
+ private String getLatestTime(Calendar startTime, boolean realtime) {
+     String configured = getLatestTime();
+     if (ObjectHelper.isNotEmpty(configured)) {
+         return configured;
+     }
+     if (realtime) {
+         return "rt";
+     }
+     DateFormat formatter = new SimpleDateFormat(DATE_FORMAT);
+     return formatter.format(startTime.getTime());
+ }
+
+ /**
+  * Resolves the earliest time of a search. Before the first successful
+  * read the configured initial earliest time is used; afterwards the
+  * configured earliest time wins, and without one the value is calculated
+  * from the last successful read (prefixed with "rt" for realtime).
+  */
+ private String getEarliestTime(Calendar startTime, boolean realtime) {
+     if (lastSuccessfulReadTime == null) {
+         return getInitEarliestTime();
+     }
+
+     String configured = getEarliestTime();
+     if (ObjectHelper.isNotEmpty(configured)) {
+         return configured;
+     }
+
+     String calculated = calculateEarliestTime(startTime, realtime);
+     if (calculated == null) {
+         return null;
+     }
+     return realtime ? "rt" + calculated : calculated;
+ }
+
+ /**
+  * Dispatches the configured saved search, waits for the job to finish
+  * and returns its events. The start of this call is remembered as the
+  * last successful read time.
+  *
+  * @throws RuntimeException if no saved search with the configured name exists
+  */
+ private List<SplunkEvent> savedSearch() throws Exception {
+     LOG.trace("saved search start");
+
+     Args queryArgs = new Args();
+     queryArgs.put("app", "search");
+     if (ObjectHelper.isNotEmpty(endpoint.getConfiguration().getOwner())) {
+         queryArgs.put("owner", endpoint.getConfiguration().getOwner());
+     }
+     if (ObjectHelper.isNotEmpty(endpoint.getConfiguration().getApp())) {
+         queryArgs.put("app", endpoint.getConfiguration().getApp());
+     }
+
+     Calendar startTime = Calendar.getInstance();
+     String latestTime = getLatestTime(startTime, false);
+     String earliestTime = getEarliestTime(startTime, false);
+
+     Service service = endpoint.getService();
+     SavedSearchCollection savedSearches = service.getSavedSearches(queryArgs);
+     SavedSearch search = null;
+     for (SavedSearch s : savedSearches.values()) {
+         if (s.getName().equals(getSavedSearch())) {
+             search = s;
+             break;
+         }
+     }
+     // fail with a clear message instead of a NullPointerException further down
+     if (search == null) {
+         throw new RuntimeException("Saved search '" + getSavedSearch() + "' was not found");
+     }
+
+     Map<String, String> args = new HashMap<String, String>();
+     args.put("force_dispatch", "true");
+     // only pass time bounds that are actually set, as the other searches do
+     if (ObjectHelper.isNotEmpty(earliestTime)) {
+         args.put("dispatch.earliest_time", earliestTime);
+     }
+     if (ObjectHelper.isNotEmpty(latestTime)) {
+         args.put("dispatch.latest_time", latestTime);
+     }
+     Job job = search.dispatch(args);
+
+     while (!job.isDone()) {
+         Thread.sleep(2000);
+     }
+     List<SplunkEvent> data = extractData(job, false);
+     this.lastSuccessfulReadTime = startTime;
+     return data;
+ }
+
+ private List<SplunkEvent> nonBlockingSearch() throws Exception {
+ LOG.debug("non block search start");
+
+ Args queryArgs = new Args();
+ queryArgs.put("exec_mode", "normal");
+ Calendar startTime = Calendar.getInstance();
+ populateArgs(queryArgs, startTime, false);
+
+ List<SplunkEvent> data = runQuery(queryArgs, false);
+ lastSuccessfulReadTime = startTime;
+ return data;
+ }
+
+ private List<SplunkEvent> realtimeSearch() throws Exception {
+ LOG.trace("realtime search start");
+
+ Args queryArgs = new Args();
+ // queryArgs.put("exec_mode", "normal");
+ queryArgs.put("search_mode", "realtime");
+ Calendar startTime = Calendar.getInstance();
+ populateArgs(queryArgs, startTime, true);
+
+ List<SplunkEvent> data = runQuery(queryArgs, true);
+ lastSuccessfulReadTime = startTime;
+ return data;
+ }
+
+ /**
+  * Creates a search job for the configured search and polls it every two
+  * seconds until it is ready (realtime) or done (all other searches),
+  * then extracts its events.
+  */
+ private List<SplunkEvent> runQuery(Args queryArgs, boolean realtime) throws Exception {
+     Job job = endpoint.getService().getJobs().create(getSearch(), queryArgs);
+     while (realtime ? !job.isReady() : !job.isDone()) {
+         Thread.sleep(2000);
+     }
+     return extractData(job, realtime);
+ }
+
+ /**
+  * Reads the events of the given job. Realtime jobs are read from the
+  * results preview, all others from the results. When a positive count is
+  * configured and the job holds at least that many results, they are
+  * fetched page by page; otherwise a single request is made.
+  *
+  * @param job      the job to read from
+  * @param realtime whether the job belongs to a realtime search
+  * @return the events read, in the order they were returned
+  */
+ private List<SplunkEvent> extractData(Job job, boolean realtime) throws Exception {
+     List<SplunkEvent> result = new ArrayList<SplunkEvent>();
+     // the result count is not looked up for realtime jobs
+     int total = realtime ? 0 : job.getResultCount();
+     int pageSize = getCount();
+
+     // a non-positive page size cannot be paged over, so read everything at once
+     if (pageSize <= 0 || total < pageSize) {
+         Args outputArgs = new Args();
+         outputArgs.put("output_mode", "json");
+         readEvents(job, outputArgs, realtime, result);
+     } else {
+         for (int offset = 0; offset < total; offset += pageSize) {
+             Args outputArgs = new Args();
+             outputArgs.put("output_mode", "json");
+             outputArgs.put("count", pageSize);
+             outputArgs.put("offset", offset);
+             readEvents(job, outputArgs, realtime, result);
+         }
+     }
+     return result;
+ }
+
+ /**
+  * Performs one results request and appends its events to the given list,
+  * closing the reader and the stream even when reading fails.
+  */
+ private void readEvents(Job job, Args outputArgs, boolean realtime, List<SplunkEvent> result) throws Exception {
+     InputStream stream = realtime ? job.getResultsPreview(outputArgs) : job.getResults(outputArgs);
+     ResultsReader resultsReader = null;
+     try {
+         resultsReader = new ResultsReaderJson(stream);
+         HashMap<String, String> data;
+         while ((data = resultsReader.getNextEvent()) != null) {
+             result.add(new SplunkEvent(data));
+         }
+     } finally {
+         try {
+             if (resultsReader != null) {
+                 resultsReader.close();
+             }
+         } finally {
+             IOHelper.close(stream);
+         }
+     }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e47197b9/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataWriter.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataWriter.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataWriter.java
new file mode 100644
index 0000000..46dba18
--- /dev/null
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataWriter.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.splunk.support;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.net.Socket;
+
+import com.splunk.Args;
+import com.splunk.Service;
+import org.apache.camel.component.splunk.SplunkEndpoint;
+import org.apache.camel.component.splunk.event.SplunkEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for {@link DataWriter} implementations that send
+ * {@link SplunkEvent}s to Splunk.
+ *
+ * The socket used for writing is obtained from {@link #createSocket(Service)}
+ * when the writer is started and is closed again when the writer is stopped.
+ */
+public abstract class SplunkDataWriter implements DataWriter {
+    protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+    protected Socket socket;
+    protected SplunkEndpoint endpoint;
+    protected Args args;
+
+    /**
+     * @param endpoint the endpoint whose {@link Service} is used to create the socket
+     * @param args     the arguments made available to subclasses via {@link #getArgs()}
+     */
+    public SplunkDataWriter(SplunkEndpoint endpoint, Args args) {
+        this.endpoint = endpoint;
+        this.args = args;
+    }
+
+    /**
+     * Creates the socket that events are written to.
+     *
+     * @param service the Splunk service taken from the endpoint
+     * @return the socket to write to; subclasses that do not write through a
+     *         socket may return <tt>null</tt> and override
+     *         {@link #doWrite(SplunkEvent, Socket)} instead
+     * @throws IOException if the socket cannot be created
+     */
+    protected abstract Socket createSocket(Service service) throws IOException;
+
+    /**
+     * Writes the given event by delegating to {@link #doWrite(SplunkEvent, Socket)}
+     * with the socket created in {@link #start()}.
+     *
+     * @param event the event to write
+     * @throws Exception if the event cannot be written
+     */
+    public void write(SplunkEvent event) throws Exception {
+        // parameterized message: event.toString() is only evaluated when debug is enabled
+        logger.debug("writing event to splunk:{}", event);
+        doWrite(event, socket);
+    }
+
+    /**
+     * Writes the string form of the event to the socket's output stream and
+     * flushes it.
+     *
+     * @param event  the event to write
+     * @param socket the socket to write to
+     * @throws IOException           if writing to the socket fails
+     * @throws IllegalStateException if no socket is available
+     */
+    protected void doWrite(SplunkEvent event, Socket socket) throws IOException {
+        if (socket == null) {
+            // fail with a meaningful message instead of a bare NullPointerException
+            throw new IllegalStateException("no socket available for writing, the writer is not started");
+        }
+        OutputStream ostream = socket.getOutputStream();
+        Writer writer = new OutputStreamWriter(ostream, "UTF8");
+        writer.write(event.toString());
+        writer.flush();
+    }
+
+    public Args getArgs() {
+        return args;
+    }
+
+    /**
+     * Creates the socket from the endpoint's service. Any failure is rethrown
+     * wrapped in a {@link RuntimeException}.
+     */
+    @Override
+    public synchronized void start() {
+        try {
+            socket = createSocket(endpoint.getService());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Closes the socket, if one was created. Any failure is rethrown wrapped
+     * in a {@link RuntimeException}.
+     */
+    @Override
+    public synchronized void stop() {
+        Socket current = socket;
+        // drop the reference first so a closed (or failed-to-close) socket is never reused
+        socket = null;
+        try {
+            if (current != null) {
+                current.close();
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e47197b9/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/StreamDataWriter.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/StreamDataWriter.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/StreamDataWriter.java
new file mode 100644
index 0000000..ffc4102
--- /dev/null
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/StreamDataWriter.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.splunk.support;
+
+import java.io.IOException;
+import java.net.Socket;
+
+import com.splunk.Args;
+import com.splunk.Index;
+import com.splunk.Receiver;
+import com.splunk.Service;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.splunk.SplunkEndpoint;
+
+/**
+ * {@link SplunkDataWriter} that writes events to a socket obtained by
+ * attaching either to a named index or, when no index is set, to the
+ * service's receiver.
+ */
+public class StreamDataWriter extends SplunkDataWriter {
+    private String index;
+
+    public StreamDataWriter(SplunkEndpoint endpoint, Args args) {
+        super(endpoint, args);
+    }
+
+    /**
+     * @param index name of the index to attach to; <tt>null</tt> attaches to
+     *              the service's receiver instead
+     */
+    public void setIndex(String index) {
+        this.index = index;
+    }
+
+    /**
+     * Attaches to the configured index, or to the service's receiver when no
+     * index is configured, and enables TCP_NODELAY on the resulting socket.
+     *
+     * @param service the Splunk service to attach to
+     * @return the attached socket
+     * @throws IOException           if attaching or configuring the socket fails
+     * @throws RuntimeCamelException if the configured index cannot be found
+     */
+    @Override
+    protected Socket createSocket(Service service) throws IOException {
+        Socket attached;
+        if (index != null) {
+            Index indexObject = service.getIndexes().get(index);
+            if (indexObject == null) {
+                throw new RuntimeCamelException(String.format("cannot find index [%s]", index));
+            }
+            attached = indexObject.attach(args);
+        } else {
+            Receiver receiver = service.getReceiver();
+            attached = receiver.attach(args);
+        }
+
+        try {
+            attached.setTcpNoDelay(true);
+        } catch (IOException e) {
+            // the socket is never handed to the caller on this path, so close it here
+            closeQuietly(attached);
+            throw e;
+        }
+        // parameterized message: nothing is formatted unless trace is enabled
+        logger.trace("created a socket on {}", attached.getRemoteSocketAddress());
+        return attached;
+    }
+
+    /**
+     * Closes the socket, ignoring any failure so the original error is the one reported.
+     */
+    private static void closeQuietly(Socket socket) {
+        try {
+            socket.close();
+        } catch (IOException ignored) {
+            // nothing useful can be done; the caller rethrows the original exception
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e47197b9/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SubmitDataWriter.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SubmitDataWriter.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SubmitDataWriter.java
new file mode 100644
index 0000000..1a96569
--- /dev/null
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SubmitDataWriter.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.splunk.support;
+
+import java.io.IOException;
+import java.net.Socket;
+
+import com.splunk.Args;
+import com.splunk.Index;
+import com.splunk.Receiver;
+import com.splunk.Service;
+import org.apache.camel.component.splunk.SplunkEndpoint;
+import org.apache.camel.component.splunk.event.SplunkEvent;
+
+/**
+ * {@link SplunkDataWriter} that submits each event through the Splunk
+ * service instead of writing to a socket; {@link #createSocket(Service)}
+ * therefore returns <tt>null</tt>.
+ */
+public class SubmitDataWriter extends SplunkDataWriter {
+    private String index;
+
+    public SubmitDataWriter(SplunkEndpoint endpoint, Args args) {
+        super(endpoint, args);
+    }
+
+    /**
+     * Submits the string form of the event to the configured index, or to the
+     * service's receiver when no index is resolved. The socket argument is not used.
+     *
+     * NOTE(review): an index name that is set but not found by the service also
+     * ends up on the receiver path - confirm this fallback is intended.
+     */
+    protected void doWrite(SplunkEvent event, Socket socket) throws IOException {
+        Index target = getIndex();
+        if (target == null) {
+            Receiver receiver = endpoint.getService().getReceiver();
+            receiver.submit(args, event.toString());
+            return;
+        }
+        target.submit(args, event.toString());
+    }
+
+    /**
+     * No socket is needed for submitting, so none is created.
+     *
+     * @return always <tt>null</tt>
+     */
+    @Override
+    protected Socket createSocket(Service service) throws IOException {
+        return null;
+    }
+
+    /**
+     * @param index name of the index to submit to; <tt>null</tt> submits via
+     *              the service's receiver
+     */
+    public void setIndex(String index) {
+        this.index = index;
+    }
+
+    /**
+     * Looks up the configured index on the endpoint's service.
+     *
+     * @return the index, or <tt>null</tt> when no index name is set
+     */
+    private Index getIndex() {
+        if (index == null) {
+            return null;
+        }
+        return endpoint.getService().getIndexes().get(index);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e47197b9/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/TcpDataWriter.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/TcpDataWriter.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/TcpDataWriter.java
new file mode 100644
index 0000000..511d39b
--- /dev/null
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/TcpDataWriter.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.splunk.support;
+
+import java.io.IOException;
+import java.net.Socket;
+
+import com.splunk.Args;
+import com.splunk.Input;
+import com.splunk.Service;
+import org.apache.camel.component.splunk.SplunkEndpoint;
+
+/**
+ * {@link SplunkDataWriter} that writes events to a socket opened on a
+ * configured port of the Splunk service.
+ */
+public class TcpDataWriter extends SplunkDataWriter {
+    private int port;
+
+    public TcpDataWriter(SplunkEndpoint endpoint, Args args) {
+        super(endpoint, args);
+    }
+
+    /**
+     * @param port the port to open; it is also used, as a string, to look up
+     *             the matching input on the service
+     */
+    public void setPort(int port) {
+        this.port = port;
+    }
+
+    /**
+     * Verifies that the service has an enabled input registered under the
+     * port number and then opens a socket on that port.
+     *
+     * @param service the Splunk service to open the socket on
+     * @return the opened socket
+     * @throws IOException      if opening the socket fails
+     * @throws RuntimeException if no input exists for the port or the input is disabled
+     */
+    @Override
+    protected Socket createSocket(Service service) throws IOException {
+        String inputName = String.valueOf(port);
+        Input tcpInput = service.getInputs().get(inputName);
+        if (tcpInput == null) {
+            throw new RuntimeException("no input defined for port " + port);
+        }
+        if (tcpInput.isDisabled()) {
+            throw new RuntimeException(String.format("input on port %d is disabled", port));
+        }
+        return service.open(port);
+    }
+}